DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
publisher.py
Go to the documentation of this file.
1import logging
2import sys
3
4from google.protobuf.json_format import MessageToJson
5from google.protobuf.message import Message as Msg
6
7from opmonlib.conf import OpMonConf
8from opmonlib.publisher_base import OpMonPublisherBase
9from opmonlib.utils import (
10 LoggingFormatter,
11 extract_opmon_file_path,
12 full_log_format,
13 setup_rich_handler,
14)
15
16
18 """Publish operational monitoring metrics to file or stream."""
19
21 self,
22 conf: OpMonConf,
23 log_level: int | str = logging.INFO,
24 rich_handler: bool = False,
25 ) -> None:
26 """Construct the object to publish OpMon metrics to stdout."""
27 super().__init__()
28 self.loglog = logging.getLogger("OpMonPublisher")
29 if not isinstance(log_level, int):
30 log_level = self.log_level_to_int(log_level)
31 self.loglog.setLevel(log_level)
32 self.loglog.addHandler(setup_rich_handler())
33
34 self.confconf = conf
35 self.confconf.level = self.log_level_to_int(self.confconf.level)
36
37 if self.confconf.opmon_type == "stdout":
38 if rich_handler:
39 handler = setup_rich_handler()
40 else:
41 handler = logging.StreamHandler(sys.stdout)
42 handler.setFormatter(LoggingFormatter(fmt=full_log_format))
43 elif self.confconf.opmon_type == "file":
44 self.confconf.path = extract_opmon_file_path(self.confconf.path)
45 handler = logging.FileHandler(self.confconf.path)
46 elif self.confconf.opmon_type == "stream":
47 self.loglog.error("Type must not be stream to use file or stdout handling.")
48 sys.exit(1)
49 else:
50 self.loglog.error("Unsupported OpMon type.")
51 sys.exit(1)
52
53 self.default_topicdefault_topic = "monitoring." + self.confconf.topic
55 self.publisherpublisherpublisher.addHandler(handler)
56 self.publisherpublisherpublisher.setLevel(self.confconf.level)
57
58 super().__post_init__()
59 return
60
62 self, logger: logging.Logger, level: int | str, message: str
63 ) -> None:
64 """Log the metric with the appropriate level."""
65 method = getattr(logger, self.log_level_to_str(level).lower(), logger.info)
66 method(message)
67 return
68
70 self,
71 message: Msg,
72 custom_origin: dict[str, str] | None = None,
73 level: int | str | None = None,
74 ) -> None:
75 """Publish the message to either a file or the terminal."""
76 if not isinstance(message, Msg):
77 self.loglog.error("Passed message needs to be of type google.protobuf.message")
78 return
79
80 if not level:
81 level = self.confconf.level
82 if not isinstance(level, int):
83 level = self.log_level_to_int(level)
84 if level < self.confconf.level:
85 return
86
87 metric = self.to_entry(message=message, custom_origin=custom_origin)
88 target_topic = self.extract_topic(message)
89 publishing_logger = logging.getLogger(f"{self.publisher.name}.{target_topic}")
90 self.publish_message(publishing_logger, level, MessageToJson(metric))
91 return
None publish(self, Msg message, dict[str, str]|None custom_origin=None, int|str|None level=None)
Definition publisher.py:74
None __init__(self, OpMonConf conf, int|str log_level=logging.INFO, bool rich_handler=False)
Definition publisher.py:25
None publish_message(self, logging.Logger logger, int|str level, str message)
Definition publisher.py:63
OpMonEntry to_entry(self, Msg message, dict[str, str]|None custom_origin)
Factory couldn t std::string alg_name Invalid configuration error
Definition Issues.hpp:34