DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
ERSKafkaLogHandler.ERSKafkaLogHandler Class Reference
Inheritance diagram for ERSKafkaLogHandler.ERSKafkaLogHandler:
[legend]
Collaboration diagram for ERSKafkaLogHandler.ERSKafkaLogHandler:
[legend]

Public Member Functions

 __init__ (self, str session="Unknown", str kafka_address="monkafka.cern.ch:30092", str kafka_topic="ers_stream", str|None app_name=None)
 
None emit (self, logging.LogRecord record)
 

Public Attributes

str session = session
 
str kafka_address = kafka_address
 
str kafka_topic = kafka_topic
 
 app_name = app_name
 
 publisher
 

Static Protected Member Functions

SeverityLevel _convert_logging_level_to_ers_level (int level)
 

Detailed Description

A logging handler that sends log messages to the ERS system via Kafka.

Note 1: You need to have Kafka to use this correctly, to see the message in Grafana, you will need to have the full ERS stack running.
Note 2: IMPORTANT!! you MUST NOT use this handler on the root logger. Use it on a logger that you have created yourself (this is because the root logger is used by Kafka, and it creates a circular dependency).

Example:
```python
import logging
from erskafka.ERSKafkaLogHandler import ERSKafkaLogHandler

logger = logging.getLogger("my_logger") # NOTE, THIS IS NOT THE ROOT LOGGER!!!
# logger = logging.getLogger() # FORBIDDEN!!! Will should raise an exception
logger.setLevel(logging.DEBUG)
handler = ERSKafkaLogHandler(session="test")
logger.addHandler(handler)
logger.debug("This is a debug message")
```

Definition at line 9 of file ERSKafkaLogHandler.py.

Constructor & Destructor Documentation

◆ __init__()

ERSKafkaLogHandler.ERSKafkaLogHandler.__init__ ( self,
str session = "Unknown",
str kafka_address = "monkafka.cern.ch:30092",
str kafka_topic = "ers_stream",
str | None app_name = None )

Definition at line 29 of file ERSKafkaLogHandler.py.

35 ):
36 super().__init__()
37 os.environ['DUNEDAQ_PARTITION'] = session
38 self.session:str = session
39 self.kafka_address:str = kafka_address
40 self.kafka_topic:str = kafka_topic
41 self.app_name = app_name
42
43 self.publisher = ERSPublisher(
44 bootstrap = kafka_address,
45 topic = kafka_topic,
46 )
47

Member Function Documentation

◆ _convert_logging_level_to_ers_level()

SeverityLevel ERSKafkaLogHandler.ERSKafkaLogHandler._convert_logging_level_to_ers_level ( int level)
staticprotected

Definition at line 49 of file ERSKafkaLogHandler.py.

49 def _convert_logging_level_to_ers_level(level:int) -> SeverityLevel:
50 match level:
51 case logging.DEBUG:
52 return SeverityLevel.DEBUG
53 case logging.INFO:
54 return SeverityLevel.INFO
55 case logging.WARNING:
56 return SeverityLevel.WARNING
57 case logging.ERROR:
58 return SeverityLevel.ERROR
59 case logging.CRITICAL:
60 return SeverityLevel.FATAL
61 case _:
62 return SeverityLevel.INFO
63

◆ emit()

None ERSKafkaLogHandler.ERSKafkaLogHandler.emit ( self,
logging.LogRecord record )

Definition at line 64 of file ERSKafkaLogHandler.py.

64 def emit(self, record:logging.LogRecord) -> None:
65 ers_level = ERSKafkaLogHandler._convert_logging_level_to_ers_level(record.levelno)
66
67 if record.name == 'root':
68 raise ERSKafkaLogHandlerOnRootLogger('To avoid all sorts of undesired behaviours this logger cannot be use on the root logger. Use logging.getLogger("some_name").addHandler(ERSKafkaLogHandler()) instead.')
69
70 success = self.publisher.publish(
71 record.msg,
72 severity = ers_level.name,
73 context_kwargs = dict(
74 package_name = str(record.module),
75 application_name = self.app_name if self.app_name else str(record.name),
76 line_number = record.lineno,
77 file_name = str(record.pathname),
78 function_name = str(record.funcName),
79 process_id=record.process,
80 thread_id=0 # TODO: have better way of handling this. It should come from drunc etc.
81 )
82 )
83
84 #! publish returns a future, so a flush ensures the message is sent out
85 self.publisher.producer.flush()
86 if not success:
87 print(f'WARNING! Failed to publish: {record.msg} to Kafka')

Member Data Documentation

◆ app_name

ERSKafkaLogHandler.ERSKafkaLogHandler.app_name = app_name

Definition at line 41 of file ERSKafkaLogHandler.py.

◆ kafka_address

str ERSKafkaLogHandler.ERSKafkaLogHandler.kafka_address = kafka_address

Definition at line 39 of file ERSKafkaLogHandler.py.

◆ kafka_topic

str ERSKafkaLogHandler.ERSKafkaLogHandler.kafka_topic = kafka_topic

Definition at line 40 of file ERSKafkaLogHandler.py.

◆ publisher

ERSKafkaLogHandler.ERSKafkaLogHandler.publisher
Initial value:
= ERSPublisher(
bootstrap = kafka_address,
topic = kafka_topic,
)

Definition at line 43 of file ERSKafkaLogHandler.py.

◆ session

str ERSKafkaLogHandler.ERSKafkaLogHandler.session = session

Definition at line 38 of file ERSKafkaLogHandler.py.


The documentation for this class was generated from the following file: