DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
OpMonPublisher.py
Go to the documentation of this file.
1#!/usr/bin/env python3
2
3import logging
4import sys
5
6from google.protobuf.message import Message as Msg
7from kafka import KafkaProducer
8from opmonlib.conf import OpMonConf
9from opmonlib.opmon_entry_pb2 import OpMonEntry
10from opmonlib.publisher_base import OpMonPublisherBase
11from opmonlib.utils import logging_log_level_from_str, setup_rich_handler
12
13
class OpMonPublisher(OpMonPublisherBase):
    """Tool for publishing operational monitoring metrics to kafka."""

    def __init__(self, conf: OpMonConf) -> None:
        """Construct the object to publish OpMon metrics to kafka.

        Args:
            conf: Publisher configuration; must have ``opmon_type == "stream"``.
                  An empty ``bootstrap`` leaves the object constructed but
                  without a live Kafka producer.
        """
        super().__init__()
        self.log = logging.getLogger("OpMonPublisher")
        self.conf = conf
        # The configured level may arrive as a level *name*; normalize it to
        # the numeric logging level before using it for comparisons below.
        if isinstance(self.conf.level, str):
            self.conf.level = logging_log_level_from_str(self.conf.level)
        self.log.setLevel(self.conf.level)
        self.log.addHandler(setup_rich_handler())

        # Only the "stream" flavour of opmon publishes to Kafka.
        if self.conf.opmon_type != "stream":
            self.log.error("Type must be stream to publish to kafka.")
            sys.exit(1)

        if self.conf.bootstrap == "":
            # Best-effort mode: no broker configured, so skip producer setup.
            self.log.warning(
                "There is no bootstrap provided, not initializing publisher to topic %s",
                self.conf.default_topic,
            )
            # NOTE(review): this attribute is named opmon_producer while the
            # live producer below is stored as self.publisher — confirm which
            # one check_publisher() inspects.
            self.opmon_producer = None
            return

        # NOTE(review): uses conf.topic here but conf.default_topic in the
        # warning above — confirm against OpMonConf which field is intended.
        self.default_topic = "monitoring." + self.conf.topic
        self.publisher = KafkaProducer(
            bootstrap_servers=conf.bootstrap,
            # Protobuf messages serialize themselves; keys are stringified.
            value_serializer=lambda v: v.SerializeToString(),
            key_serializer=lambda k: str(k).encode("utf-8"),
        )

        super().__post_init__()
        return

    def extract_key(self, opmon_entry: OpMonEntry) -> str:
        """Extract the Kafka partition key from the OpMonEntry.

        The key is ``session[.application[.sub1.sub2...]]/measurement``.

        Raises whatever check_publisher() raises when no publisher is live.
        """
        self.check_publisher()
        key = str(opmon_entry.origin.session)
        if opmon_entry.origin.application != "":
            key += "." + opmon_entry.origin.application
        if opmon_entry.origin.substructure:
            for substructure_id in opmon_entry.origin.substructure:
                key += "." + substructure_id
        key += "/" + str(opmon_entry.measurement)
        return key

    def publish(
        self,
        message: Msg,
        custom_origin: dict[str, str] | None = None,
        level: int | str | None = None,
    ) -> None:
        """Send an OpMonEntry to Kafka.

        Args:
            message: A protobuf message holding the metric data.
            custom_origin: Optional extra origin fields merged into the entry.
            level: Severity of this metric; entries below the configured
                   level are dropped. Defaults to the configured level.
        """
        if not isinstance(message, Msg):
            self.log.error("Passed message needs to be of type google.protobuf.message")
            return
        # Note: a falsy level (None, 0/NOTSET, "") falls back to the config.
        if not level:
            level = self.conf.level
        if isinstance(level, str):
            level = logging_log_level_from_str(level)
        if level < self.conf.level:
            return

        metric = self.to_entry(message=message, custom_origin=custom_origin)

        if len(metric.data) == 0:
            # Bug fix: Message instances have no __name__ attribute; the
            # protobuf type name lives on the descriptor.
            self.log.warning("OpMonEntry of type %s has no data", message.DESCRIPTOR.name)
            return

        target_topic = self.extract_topic(message)
        target_key = self.extract_key(metric)

        self.publisher.send(target_topic, value=metric, key=target_key)
        return
None publish(self, Msg message, dict[str, str]|None custom_origin=None, int|str|None level=None)
str extract_key(self, OpMonEntry opmon_entry)
None __init__(self, OpMonConf conf)
OpMonEntry to_entry(self, Msg message, dict[str, str]|None custom_origin)
Factory couldn't find std::string alg_name: invalid configuration error
Definition Issues.hpp:34