DUNE-DAQ
DUNE Trigger and Data Acquisition software
OpMonPublisher.OpMonPublisher Class Reference
Inheritance diagram for OpMonPublisher.OpMonPublisher:
Collaboration diagram for OpMonPublisher.OpMonPublisher:

Public Member Functions

None __init__ (self, OpMonConf conf)
 
str extract_key (self, OpMonEntry opmon_entry)
 
None publish (self, Msg message, dict[str, str]|None custom_origin=None, int|str|None level=None)
 
- Public Member Functions inherited from opmonlib.publisher_base.OpMonPublisherBase
None __post_init__ (self)
 
None check_publisher (self)
 
str extract_topic (self, Msg message)
 
OpMonId make_origin (self, str session, str app)
 
dict[str, str] validate_custom_origin (self, dict[str, str]|None custom_origin=None)
 
dict make_data (self, Msg message, str top_block="")
 
OpMonValue to_map (self, int|float|bool|str value, int field_type)
 
OpMonEntry to_entry (self, Msg message, dict[str, str]|None custom_origin)
 
int log_level_to_int (self, str|int level)
 
str log_level_to_str (self, str|int level)
 

Public Attributes

 log = logging.getLogger("OpMonPublisher")
 
 conf = conf
 
 opmon_producer = None
 
str default_topic = "monitoring." + self.conf.topic
 
 publisher
 
- Public Attributes inherited from opmonlib.publisher_base.OpMonPublisherBase
 ts = Timestamp()
 
 substructure = None
 
 log
 
 conf
 
 publisher = "OpMon configuration must be of type OpMonConf."
 
 default_topic
 
tuple publisher
 

Detailed Description

Tool for publishing operational monitoring metrics to Kafka.

Definition at line 14 of file OpMonPublisher.py.
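
For context, a minimal usage sketch is shown below. The import paths, the OpMonConf keyword arguments, and the protobuf message type MyCounters are illustrative assumptions; only the configuration fields opmon_type, bootstrap, topic and level are taken from the constructor code documented further down.

import logging

from opmonlib.OpMonPublisher import OpMonPublisher  # assumed import path
from opmonlib.conf import OpMonConf                  # assumed import path
from my_metrics_pb2 import MyCounters                # hypothetical protobuf message

conf = OpMonConf(
    opmon_type="stream",            # anything other than "stream" aborts the constructor
    bootstrap="kafka-broker:9092",  # Kafka bootstrap server(s)
    topic="dunedaq_opmon",          # default_topic becomes "monitoring.dunedaq_opmon"
    level=logging.INFO,
)

publisher = OpMonPublisher(conf)
publisher.publish(MyCounters(n_events=42))  # serialized and sent to Kafka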

Constructor & Destructor Documentation

◆ __init__()

None OpMonPublisher.OpMonPublisher.__init__ ( self,
OpMonConf conf )
Construct the object to publish OpMon metrics to kafka.

Reimplemented from opmonlib.publisher_base.OpMonPublisherBase.

Definition at line 17 of file OpMonPublisher.py.

17 def __init__(self, conf: OpMonConf) -> None:
18 """Construct the object to publish OpMon metrics to kafka."""
19 super().__init__()
20 self.log = logging.getLogger("OpMonPublisher")
21 self.conf = conf
22 if isinstance(self.conf.level, str):
23 self.conf.level = logging_log_level_from_str(self.conf.level)
24 self.log.setLevel(self.conf.level)
25 self.log.addHandler(setup_rich_handler())
26
27 if self.conf.opmon_type != "stream":
28 self.log.error("Type must be stream to publish to kafka.")
29 sys.exit(1)
30
31 if self.conf.bootstrap == "":
32 self.log.warning(
33 "There is no boostrap provided, not initializing publisher to topic %s",
34 self.conf.default_topic,
35 )
36 self.opmon_producer = None
37 return
38
39 self.default_topic = "monitoring." + self.conf.topic
40 self.publisher = KafkaProducer(
41 bootstrap_servers=conf.bootstrap,
42 value_serializer=lambda v: v.SerializeToString(),
43 key_serializer=lambda k: str(k).encode("utf-8"),
44 )
45
46 super().__post_init__()
47 return
48
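For reference, an illustrative stand-in listing the configuration attributes the constructor reads; the real OpMonConf class lives elsewhere in opmonlib and this dataclass is only an assumption for documentation purposes.

from dataclasses import dataclass

@dataclass
class FakeOpMonConf:  # hypothetical stand-in for opmonlib's OpMonConf
    opmon_type: str = "stream"    # anything else triggers an error and sys.exit(1)
    bootstrap: str = ""           # empty string: no KafkaProducer is created, opmon_producer stays None
    topic: str = "dunedaq_opmon"  # prefixed with "monitoring." to form default_topic
    level: int | str = "INFO"     # string levels are converted with logging_log_level_from_str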

Member Function Documentation

◆ extract_key()

str OpMonPublisher.OpMonPublisher.extract_key ( self,
OpMonEntry opmon_entry )
Extract the key from the OpMonEntry.

Definition at line 49 of file OpMonPublisher.py.

49 def extract_key(self, opmon_entry: OpMonEntry) -> str:
50 """Extract the key from the OpMonEntry."""
51 self.check_publisher()
52 key = str(opmon_entry.origin.session)
53 if opmon_entry.origin.application != "":
54 key += "." + opmon_entry.origin.application
55 if opmon_entry.origin.substructure:
56 for substructure_id in opmon_entry.origin.substructure:
57 key += "." + substructure_id
58 key += "/" + str(opmon_entry.measurement)
59 return key
60
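
A worked example of the key format, using hypothetical origin values; only the fields read by extract_key() are shown.

from types import SimpleNamespace

# Hypothetical entry carrying only the fields extract_key() reads.
entry = SimpleNamespace(
    origin=SimpleNamespace(
        session="np04_session",
        application="dataflow0",
        substructure=["trb0"],
    ),
    measurement="dunedaq.opmon.TRBInfo",
)

# For this entry, extract_key() would return:
#   "np04_session.dataflow0.trb0/dunedaq.opmon.TRBInfo"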

◆ publish()

None OpMonPublisher.OpMonPublisher.publish ( self,
Msg message,
dict[str, str] | None custom_origin = None,
int | str | None level = None )
Send an OpMonEntry to Kafka.

Reimplemented from opmonlib.publisher_base.OpMonPublisherBase.

Definition at line 61 of file OpMonPublisher.py.

61 def publish(
62 self,
63 message: Msg,
64 custom_origin: dict[str, str] | None = None,
65 level: int | str | None = None,
66 ) -> None:
67 """Send an OpMonEntry to Kafka."""
68 if not isinstance(message, Msg):
69 self.log.error("Passed message needs to be of type google.protobuf.message")
70 return
71 if not level:
72 level = self.conf.level
73 if isinstance(level, str):
74 level = logging_log_level_from_str(level)
75 if level < self.conf.level:
76 return
77
78 metric = self.to_entry(message=message, custom_origin=custom_origin)
79
80 if len(metric.data) == 0:
81 self.log.warning("OpMonEntry of type %s has no data", type(message).__name__)
82 return
83
84 target_topic = self.extract_topic(message)
85 target_key = self.extract_key(metric)
86
87 self.publisher.send(target_topic, value=metric, key=target_key)
88 return
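
Continuing the hypothetical MyCounters/publisher objects from the usage sketch in the Detailed Description, a call site could look like the following; the custom_origin keys are assumptions.

publisher.publish(
    MyCounters(n_events=42),
    custom_origin={"element": "tp_stream_0"},  # forwarded to to_entry()
    level=logging.DEBUG,                       # entries below conf.level are silently dropped
)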

Member Data Documentation

◆ conf

OpMonPublisher.OpMonPublisher.conf = conf

Definition at line 21 of file OpMonPublisher.py.

◆ default_topic

str OpMonPublisher.OpMonPublisher.default_topic = "monitoring." + self.conf.topic

Definition at line 39 of file OpMonPublisher.py.

◆ log

OpMonPublisher.OpMonPublisher.log = logging.getLogger("OpMonPublisher")

Definition at line 20 of file OpMonPublisher.py.

◆ opmon_producer

OpMonPublisher.OpMonPublisher.opmon_producer = None

Definition at line 36 of file OpMonPublisher.py.

◆ publisher

OpMonPublisher.OpMonPublisher.publisher
Initial value:
= KafkaProducer(
bootstrap_servers=conf.bootstrap,
value_serializer=lambda v: v.SerializeToString(),
key_serializer=lambda k: str(k).encode("utf-8"),
)

Definition at line 40 of file OpMonPublisher.py.
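
The value serializer relies on the protobuf SerializeToString() method, so the value passed to publisher.send() is expected to be a protobuf message; keys are stringified and UTF-8 encoded. Roughly, the producer applies the following to each record before sending (illustration only, kafka-python does this internally):

raw_value = metric.SerializeToString()     # protobuf wire-format bytes
raw_key = str(target_key).encode("utf-8")  # e.g. b"np04_session.dataflow0/..."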


The documentation for this class was generated from the following file:
OpMonPublisher.py