DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
ERSKafkaLogHandler.py
Go to the documentation of this file.
1from erskafka.ERSPublisher import ERSPublisher, ERSException, SeverityLevel # import the custom exception and the SeverityLevel enum
2#import erskafka.ERSPublisher as erspub
3import logging
4import os
5
7 pass
8
9class ERSKafkaLogHandler(logging.Handler):
10 '''
11 A logging handler that sends log messages to the ERS system via Kafka.
12
13 Note 1: You need to have Kafka to use this correctly, to see the message in Grafana, you will need to have the full ERS stack running.
14 Note 2: IMPORTANT!! you MUST NOT use this handler on the root logger. Use it on a logger that you have created yourself (this is because the root logger is used by Kafka, and it creates a circular dependency).
15
16 Example:
17 ```python
18 import logging
19 from erskafka.ERSKafkaLogHandler import ERSKafkaLogHandler
20
21 logger = logging.getLogger("my_logger") # NOTE, THIS IS NOT THE ROOT LOGGER!!!
22 # logger = logging.getLogger() # FORBIDDEN!!! Will should raise an exception
23 logger.setLevel(logging.DEBUG)
24 handler = ERSKafkaLogHandler(session="test")
25 logger.addHandler(handler)
26 logger.debug("This is a debug message")
27 ```
28 '''
30 self,
31 session:str="Unknown",
32 kafka_address:str="monkafka.cern.ch:30092",
33 kafka_topic:str="ers_stream",
34 ):
35 super().__init__()
36 os.environ['DUNEDAQ_PARTITION'] = session
37 self.session:str = session
38 self.kafka_address:str = kafka_address
39 self.kafka_topic:str = kafka_topic
40
41 self.publisher = ERSPublisher(
42 bootstrap = kafka_address,
43 topic = kafka_topic,
44 )
45
46 @staticmethod
47 def _convert_logging_level_to_ers_level(level:int) -> SeverityLevel:
48 match level:
49 case logging.DEBUG:
50 return SeverityLevel.DEBUG
51 case logging.INFO:
52 return SeverityLevel.INFO
53 case logging.WARNING:
54 return SeverityLevel.WARNING
55 case logging.ERROR:
56 return SeverityLevel.ERROR
57 case logging.CRITICAL:
58 return SeverityLevel.FATAL
59 case _:
60 return SeverityLevel.INFO
61
62 def emit(self, record:logging.LogRecord) -> None:
63 ers_level = ERSKafkaLogHandler._convert_logging_level_to_ers_level(record.levelno)
64
65 if record.name == 'root':
66 raise ERSKafkaLogHandlerOnRootLogger('To avoid all sorts of undesired behaviours this logger cannot be use on the root logger. Use logging.getLogger("some_name").addHandler(ERSKafkaLogHandler()) instead.')
67
68 success = self.publisher.publish(
69 record.msg,
70 severity = ers_level.name,
71 context_kwargs = dict(
72 package_name = str(record.module),
73 application_name = str(record.name),
74 line_number = record.lineno,
75 file_name = str(record.pathname),
76 function_name = str(record.funcName),
77 )
78 )
79 if not success:
80 print(f'WARNING! Failed to publish: {record.msg} to Kafka')
None emit(self, logging.LogRecord record)
__init__(self, str session="Unknown", str kafka_address="monkafka.cern.ch:30092", str kafka_topic="ers_stream")
SeverityLevel _convert_logging_level_to_ers_level(int level)