DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
publisher_base.py
Go to the documentation of this file.
1import sys
2from abc import ABC, abstractmethod
3
4from google.protobuf.descriptor import FieldDescriptor
5from google.protobuf.message import Message as Msg
6from google.protobuf.timestamp_pb2 import Timestamp
7
8from opmonlib.conf import OpMonConf
9from opmonlib.opmon_entry_pb2 import OpMonEntry, OpMonId, OpMonValue
10from opmonlib.utils import (
11 LogLevelError,
12 logging_log_levels,
13 oks_log_levels,
14 oks_to_logging_map,
15)
16
17
19 """Base class for OpMon publishers."""
20
@abstractmethod
def __init__(self) -> None:
    """Set up the state shared by every publisher.

    Stores a protobuf ``Timestamp`` in ``self.ts`` and initialises
    ``self.substructure`` to ``None``.
    """
    self.substructure = None
    self.ts = Timestamp()
27
def __post_init__(self) -> None:
    """Check that the publisher was given everything it needs.

    The checks run in a fixed order: logger, configuration, configuration
    type, publisher, default topic. The first failing check raises.

    Raises:
        AttributeError: If ``log``, ``conf``, ``publisher`` or
            ``default_topic`` is missing or falsy.
        TypeError: If ``conf`` is not an ``OpMonConf`` instance.
    """
    # getattr() with a default is used so that an attribute which was never
    # assigned yields the descriptive message below rather than the
    # interpreter's generic "object has no attribute" text.
    if not getattr(self, "log", None):
        err_msg = "OpMon publisher must have a logger."
        # A bare string cannot be raised; wrap it in a real exception type.
        raise AttributeError(err_msg)

    conf = getattr(self, "conf", None)
    if not conf:
        err_msg = "OpMon publisher must have a configuration."
        raise AttributeError(err_msg)
    if not isinstance(conf, OpMonConf):
        err_msg = "OpMon configuration must be of type OpMonConf."
        raise TypeError(err_msg)

    if not getattr(self, "publisher", None):
        err_msg = "OpMon publisher must have a publisher"
        raise AttributeError(err_msg)
    if not getattr(self, "default_topic", None):
        err_msg = "OpMon publisher must have a default_topic"
        raise AttributeError(err_msg)
46
@abstractmethod
def publish(
    self,
    message: Msg,
    custom_origin: dict[str, str] | None = None,
    level: int | str | None = None,
) -> None:
    """Publish an OpMonEntry to the relevant location.

    Abstract: concrete publishers supply the implementation.

    Args:
        message: The protobuf message to publish.
        custom_origin: Optional extra origin key/value pairs.
        level: Optional level for the entry, as an int or a str.
            NOTE(review): presumably one of the values accepted by
            ``log_level_to_int`` -- confirm against the implementations.
    """
56
def check_publisher(self) -> None:
    """Validate that the publisher has a valid method to send messages.

    Returns normally when ``self.publisher`` is truthy. Otherwise an error
    is reported through ``self.log`` and the process is terminated with
    exit status 1.

    Raises:
        SystemExit: If ``self.publisher`` is falsy.
    """
    if self.publisher:
        return
    missing_producer_err_str = (
        "Improperly initialized OpMonProducer used, nothing will be published."
    )
    self.log.error(missing_producer_err_str)
    sys.exit(1)
66
def extract_topic(self, message: Msg) -> str:
    """Return the topic the message should be published on.

    The publisher is validated first via ``check_publisher``. ``message`` is
    not inspected here; the publisher's ``default_topic`` is always returned.
    """
    self.check_publisher()
    topic = self.default_topic
    return topic
71
def make_origin(self, session: str, app: str) -> OpMonId:
    """Build the OpMonId for the given session and application.

    The publisher's own ``substructure`` is used for the third field.
    """
    origin_fields = {
        "session": session,
        "application": app,
        "substructure": self.substructure,
    }
    return OpMonId(**origin_fields)
75
77 self, custom_origin: dict[str, str] | None = None
78 ) -> dict[str, str]:
79 """Validate that each custom_origin entry is a str."""
80 if custom_origin is None:
81 return None
82 for key, value in custom_origin.items():
83 if not isinstance(value, str):
84 try:
85 custom_origin[key] = str(value)
86 except TypeError:
87 msg = "%s is not a string and cannot be converted to one.", key
88 raise TypeError(msg) from None
89 return custom_origin
90
def make_data(self, message: Msg, top_block: str = "") -> dict:
    """Map each message entry to the correct data type.

    Walks the fields of ``message`` and returns a flat dict. Nested message
    fields are expanded recursively, with their field name and a trailing
    ``"."`` prepended to the keys of their own fields. Repeated fields are
    skipped.

    Args:
        message: The protobuf message to flatten.
        top_block: Prefix prepended to every key produced at this level.

    Returns:
        A dict mapping prefixed field names to the values produced by
        ``to_map``.
    """
    message_dict = {}
    for name, descriptor in message.DESCRIPTOR.fields_by_name.items():
        if descriptor.label == FieldDescriptor.LABEL_REPEATED:
            continue  # Repeated values not supported in influxdb
        if descriptor.cpp_type == FieldDescriptor.CPPTYPE_MESSAGE:
            # Build the nested prefix as a separate string. Appending to
            # top_block itself would leak this field's name into the keys
            # of every sibling field that follows it.
            nested_prefix = f"{top_block}{name}."
            message_dict.update(
                self.make_data(getattr(message, name), nested_prefix)
            )
        else:
            message_dict[top_block + name] = self.to_map(
                value=getattr(message, name), field_type=descriptor.cpp_type
            )
    return message_dict
107
def to_map(self, value: int | float | bool | str, field_type: int) -> OpMonValue:
    """Wrap a scalar in an OpMonValue, filling the slot for its type.

    ``field_type`` is compared against the ``FieldDescriptor.CPPTYPE_*``
    constants to pick which OpMonValue field receives ``value``. When it
    matches none of them, an empty OpMonValue is returned.
    """
    slot_by_cpp_type = {
        FieldDescriptor.CPPTYPE_INT32: "int4_value",
        FieldDescriptor.CPPTYPE_INT64: "int8_value",
        FieldDescriptor.CPPTYPE_UINT32: "uint4_value",
        FieldDescriptor.CPPTYPE_UINT64: "uint8_value",
        FieldDescriptor.CPPTYPE_DOUBLE: "double_value",
        FieldDescriptor.CPPTYPE_FLOAT: "float_value",
        FieldDescriptor.CPPTYPE_BOOL: "boolean_value",
        FieldDescriptor.CPPTYPE_STRING: "string_value",
    }
    formatted_opmonvalue = OpMonValue()
    slot = slot_by_cpp_type.get(field_type)
    if slot is not None:  # Unknown types are ignored.
        setattr(formatted_opmonvalue, slot, value)
    return formatted_opmonvalue
131
def to_entry(
    self, message: Msg, custom_origin: dict[str, str] | None
) -> OpMonEntry:
    """Pack all the data that needs to be published to an OpMonEntry.

    Args:
        message: The protobuf message whose fields become the entry data.
        custom_origin: Optional extra origin key/value pairs; passed through
            ``validate_custom_origin`` before being stored.

    Returns:
        An OpMonEntry with the current time, the origin built from the
        configured session and application, the validated custom origin,
        the message's full descriptor name as the measurement, and the
        flattened message data.
    """
    # Refresh the shared Timestamp so the entry carries the current time.
    self.ts.GetCurrentTime()
    return OpMonEntry(
        time=self.ts,
        origin=self.make_origin(self.conf.session, self.conf.application),
        custom_origin=self.validate_custom_origin(custom_origin),
        measurement=message.DESCRIPTOR.full_name,
        data=self.make_data(message),
    )
144
def log_level_to_int(self, level: str | int) -> int:
    """Translate a level into the matching python logging level number.

    An int is looked up first among the values of ``oks_log_levels`` and
    then among the values of ``logging_log_levels``. A str is looked up
    first among the keys of ``oks_log_levels`` and then, upper-cased, among
    the keys of ``logging_log_levels``.

    Raises:
        LogLevelError: If the level matches none of the known levels.
    """
    if isinstance(level, int):
        # The first OKS name carrying this number wins.
        for oks_level_name, oks_value in oks_log_levels.items():
            if oks_value == level:
                return logging_log_levels[oks_to_logging_map[oks_level_name]]
        if level in logging_log_levels.values():
            return level
    elif isinstance(level, str):
        if level in oks_log_levels:
            return logging_log_levels[oks_to_logging_map[level]]
        upper_name = level.upper()
        if upper_name in logging_log_levels:
            return logging_log_levels[upper_name]
    raise LogLevelError(level)
161
def log_level_to_str(self, level: str | int) -> str | None:
    """Convert the log level to the equivalent level in python logging as a str.

    A str is looked up first among the keys of ``oks_log_levels``, then
    among the keys of ``logging_log_levels`` (as given, then upper-cased,
    matching what ``log_level_to_int`` accepts). An int is looked up first
    among the values of ``oks_log_levels`` and then among the values of
    ``logging_log_levels``.

    Returns:
        The python logging level name, or ``None`` when the level matches
        none of the known levels.
    """
    if isinstance(level, str):
        if level in oks_log_levels:
            return oks_to_logging_map[level]
        if level in logging_log_levels:
            return level
        # Accept e.g. "info" as well as "INFO", as log_level_to_int does.
        upper_name = level.upper()
        if upper_name in logging_log_levels:
            return upper_name
    elif isinstance(level, int):
        # The first OKS name carrying this number wins.
        for oks_level_name, oks_value in oks_log_levels.items():
            if oks_value == level:
                return oks_to_logging_map[oks_level_name]
        for logging_name, logging_value in logging_log_levels.items():
            if logging_value == level:
                return logging_name
    return None
OpMonEntry to_entry(self, Msg message, dict[str, str]|None custom_origin)
OpMonValue to_map(self, int|float|bool|str value, int field_type)
None publish(self, Msg message, dict[str, str]|None custom_origin=None, int|str|None level=None)
dict make_data(self, Msg message, str top_block="")
OpMonId make_origin(self, str session, str app)
dict[str, str] validate_custom_origin(self, dict[str, str]|None custom_origin=None)
Factory couldn t std::string alg_name Invalid configuration error
Definition Issues.hpp:34