DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
ERSKafkaLogHandler.py
Go to the documentation of this file.
1from erskafka.ERSPublisher import ERSPublisher, ERSException, SeverityLevel # import the custom exception and the SeverityLevel enum
2#import erskafka.ERSPublisher as erspub
3import logging
4import os
5
class ERSKafkaLogHandlerOnRootLogger(Exception):
    '''Raised when an ERSKafkaLogHandler is attached to (or used via) the root logger.'''
    pass
8
9class ERSKafkaLogHandler(logging.Handler):
10 '''
11 A logging handler that sends log messages to the ERS system via Kafka.
12
13 Note 1: You need to have Kafka to use this correctly, to see the message in Grafana, you will need to have the full ERS stack running.
14 Note 2: IMPORTANT!! you MUST NOT use this handler on the root logger. Use it on a logger that you have created yourself (this is because the root logger is used by Kafka, and it creates a circular dependency).
15
16 Example:
17 ```python
18 import logging
19 from erskafka.ERSKafkaLogHandler import ERSKafkaLogHandler
20
21 logger = logging.getLogger("my_logger") # NOTE, THIS IS NOT THE ROOT LOGGER!!!
22 # logger = logging.getLogger() # FORBIDDEN!!! Will should raise an exception
23 logger.setLevel(logging.DEBUG)
24 handler = ERSKafkaLogHandler(session="test")
25 logger.addHandler(handler)
26 logger.debug("This is a debug message")
27 ```
28 '''
30 self,
31 session:str="Unknown",
32 kafka_address:str="monkafka.cern.ch:30092",
33 kafka_topic:str="ers_stream",
34 app_name:str | None = None,
35 ):
36 super().__init__()
37 os.environ['DUNEDAQ_PARTITION'] = session
38 self.session:str = session
39 self.kafka_address:str = kafka_address
40 self.kafka_topic:str = kafka_topic
41 self.app_name = app_name
42
43 self.publisher = ERSPublisher(
44 bootstrap = kafka_address,
45 topic = kafka_topic,
46 )
47
48 @staticmethod
49 def _convert_logging_level_to_ers_level(level:int) -> SeverityLevel:
50 match level:
51 case logging.DEBUG:
52 return SeverityLevel.DEBUG
53 case logging.INFO:
54 return SeverityLevel.INFO
55 case logging.WARNING:
56 return SeverityLevel.WARNING
57 case logging.ERROR:
58 return SeverityLevel.ERROR
59 case logging.CRITICAL:
60 return SeverityLevel.FATAL
61 case _:
62 return SeverityLevel.INFO
63
64 def emit(self, record:logging.LogRecord) -> None:
65 ers_level = ERSKafkaLogHandler._convert_logging_level_to_ers_level(record.levelno)
66
67 if record.name == 'root':
68 raise ERSKafkaLogHandlerOnRootLogger('To avoid all sorts of undesired behaviours this logger cannot be use on the root logger. Use logging.getLogger("some_name").addHandler(ERSKafkaLogHandler()) instead.')
69
70 success = self.publisher.publish(
71 record.msg,
72 severity = ers_level.name,
73 context_kwargs = dict(
74 package_name = str(record.module),
75 application_name = self.app_name if self.app_name else str(record.name),
76 line_number = record.lineno,
77 file_name = str(record.pathname),
78 function_name = str(record.funcName),
79 process_id=record.process,
80 thread_id=0 # TODO: have better way of handling this. It should come from drunc etc.
81 )
82 )
83
84 #! publish returns a future, so a flush ensures the message is sent out
85 self.publisher.producer.flush()
86 if not success:
87 print(f'WARNING! Failed to publish: {record.msg} to Kafka')
None emit(self, logging.LogRecord record)
SeverityLevel _convert_logging_level_to_ers_level(int level)
__init__(self, str session="Unknown", str kafka_address="monkafka.cern.ch:30092", str kafka_topic="ers_stream", str|None app_name=None)