DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
opmonlib.publisher_base.OpMonPublisherBase Class Reference
Inheritance diagram for opmonlib.publisher_base.OpMonPublisherBase:
[legend]
Collaboration diagram for opmonlib.publisher_base.OpMonPublisherBase:
[legend]

Public Member Functions

None __init__ (self)
 
None __post_init__ (self)
 
None publish (self, Msg message, dict[str, str]|None custom_origin=None, int|str|None level=None)
 
None check_publisher (self)
 
str extract_topic (self, Msg message)
 
OpMonId make_origin (self, str session, str app)
 
dict[str, str] validate_custom_origin (self, dict[str, str]|None custom_origin=None)
 
dict make_data (self, Msg message, str top_block="")
 
OpMonValue to_map (self, int|float|bool|str value, int field_type)
 
OpMonEntry to_entry (self, Msg message, dict[str, str]|None custom_origin)
 
int log_level_to_int (self, str|int level)
 
str log_level_to_str (self, str|int level)
 

Public Attributes

 ts = Timestamp()
 
 substructure = None
 
 log
 
 conf
 
 publisher = "OpMon configuration must be of type OpMonConf."
 
 default_topic
 
tuple publisher
 

Detailed Description

Base class for OpMon publishers.

Definition at line 18 of file publisher_base.py.

Constructor & Destructor Documentation

◆ __init__()

None opmonlib.publisher_base.OpMonPublisherBase.__init__ ( self)
Construct the publisher.

Reimplemented in opmonlib.publisher.OpMonPublisher, and OpMonPublisher.OpMonPublisher.

Definition at line 22 of file publisher_base.py.

22 def __init__(self) -> None:
23 """Construct the publisher."""
24 self.ts = Timestamp()
25 self.substructure = None
26 pass
27

Member Function Documentation

◆ __post_init__()

None opmonlib.publisher_base.OpMonPublisherBase.__post_init__ ( self)
Perform post-init checks.

Definition at line 28 of file publisher_base.py.

28 def __post_init__(self) -> None:
29 """Perform post-init checks."""
30 if not self.log:
31 err_msg = "OpMon publisher must have a logger."
32 raise (err_msg)
33 if not self.conf:
34 err_msg = "OpMon publisher must have a configuration."
35 raise AttributeError(err_msg)
36 if not isinstance(self.conf, OpMonConf):
37 err_msg = "OpMon configuration must be of type OpMonConf."
38 raise TypeError(err_msg)
39 if not self.publisher:
40 err_msg = "OpMon publisher must have a publisher"
41 raise AttributeError(err_msg)
42 if not self.default_topic:
43 err_msg = "OpMon publisher must have a default_topic"
44 raise AttributeError(err_msg)
45 return
46

◆ check_publisher()

None opmonlib.publisher_base.OpMonPublisherBase.check_publisher ( self)
Validate that the publisher has a valid method to send messages.

Definition at line 57 of file publisher_base.py.

57 def check_publisher(self) -> None:
58 """Validate that the publisher has a valid method to send messages."""
59 if not self.publisher:
60 missing_producer_err_str = (
61 "Improperly initialized OpMonProducer used, nothing will be published."
62 )
63 self.log.error(missing_producer_err_str)
64 sys.exit(1)
65 return
66
Factory couldn t std::string alg_name Invalid configuration error
Definition Issues.hpp:34

◆ extract_topic()

str opmonlib.publisher_base.OpMonPublisherBase.extract_topic ( self,
Msg message )
Extract the target topic from the message.

Definition at line 67 of file publisher_base.py.

67 def extract_topic(self, message: Msg) -> str:
68 """Extract the target topic from the message."""
69 self.check_publisher()
70 return self.default_topic
71

◆ log_level_to_int()

int opmonlib.publisher_base.OpMonPublisherBase.log_level_to_int ( self,
str | int level )
Convert the log level to the equivalent level in python logging as an int.

Definition at line 145 of file publisher_base.py.

145 def log_level_to_int(self, level: str | int) -> int:
146 """Convert the log level to the equivalent level in python logging as an int."""
147 if isinstance(level, int):
148 if level in oks_log_levels.values():
149 oks_level_name = next(
150 k for k, v in oks_log_levels.items() if v == level
151 )
152 return logging_log_levels[oks_to_logging_map[oks_level_name]]
153 if level in logging_log_levels.values():
154 return level
155 elif isinstance(level, str):
156 if level in oks_log_levels.keys():
157 return logging_log_levels[oks_to_logging_map[level]]
158 if level.upper() in logging_log_levels.keys():
159 return logging_log_levels[level.upper()]
160 raise LogLevelError(level)
161

◆ log_level_to_str()

str opmonlib.publisher_base.OpMonPublisherBase.log_level_to_str ( self,
str | int level )
Convert the log level to the equivalent level in python logging as a str.

Definition at line 162 of file publisher_base.py.

162 def log_level_to_str(self, level: str | int) -> str:
163 """Convert the log level to the equivalent level in python logging as a str."""
164 if isinstance(level, str):
165 if level in oks_log_levels.keys():
166 return oks_to_logging_map[level]
167 if level in logging_log_levels.keys():
168 return level
169 elif isinstance(level, int):
170 if level in oks_log_levels.values():
171 oks_level_name = next(
172 k for k, v in oks_log_levels.items() if v == level
173 )
174 return oks_to_logging_map[oks_level_name]
175 if level in logging_log_levels.values():
176 return next(k for k, v in logging_log_levels.items() if v == level)
177 return None

◆ make_data()

dict opmonlib.publisher_base.OpMonPublisherBase.make_data ( self,
Msg message,
str top_block = "" )
Map each message entry to the correct data type.

Definition at line 91 of file publisher_base.py.

91 def make_data(self, message: Msg, top_block: str = "") -> dict:
92 """Map each message entry to the correct data type."""
93 message_dict = {}
94 for name, descriptor in message.DESCRIPTOR.fields_by_name.items():
95 if descriptor.label == FieldDescriptor.LABEL_REPEATED:
96 continue # Repeated values not supported in influxdb
97 if descriptor.cpp_type == FieldDescriptor.CPPTYPE_MESSAGE:
98 top_block += name + "."
99 message_dict = message_dict | self.make_data(
100 getattr(message, name), top_block
101 )
102 else:
103 message_dict[top_block + name] = self.to_map(
104 value=getattr(message, name), field_type=descriptor.cpp_type
105 )
106 return message_dict
107

◆ make_origin()

OpMonId opmonlib.publisher_base.OpMonPublisherBase.make_origin ( self,
str session,
str app )
Construct and return the OpMonId.

Definition at line 72 of file publisher_base.py.

72 def make_origin(self, session: str, app: str) -> OpMonId:
73 """Construct and return the OpMonId."""
74 return OpMonId(session=session, application=app, substructure=self.substructure)
75

◆ publish()

None opmonlib.publisher_base.OpMonPublisherBase.publish ( self,
Msg message,
dict[str, str] | None custom_origin = None,
int | str | None level = None )
Publish an OpMonEntry to the relevant location.

Reimplemented in opmonlib.publisher.OpMonPublisher, and OpMonPublisher.OpMonPublisher.

Definition at line 48 of file publisher_base.py.

53 ) -> None:
54 """Publish an OpMonEntry to the relevant location."""
55 pass
56

◆ to_entry()

OpMonEntry opmonlib.publisher_base.OpMonPublisherBase.to_entry ( self,
Msg message,
dict[str, str] | None custom_origin )
Pack all the data that needs to be published to an OpMonEntry.

Definition at line 132 of file publisher_base.py.

134 ) -> OpMonEntry:
135 """Pack all the data that needs to be published to an OpMonEntry."""
136 self.ts.GetCurrentTime()
137 return OpMonEntry(
138 time=self.ts,
139 origin=self.make_origin(self.conf.session, self.conf.application),
140 custom_origin=self.validate_custom_origin(custom_origin),
141 measurement=message.DESCRIPTOR.full_name,
142 data=self.make_data(message),
143 )
144

◆ to_map()

OpMonValue opmonlib.publisher_base.OpMonPublisherBase.to_map ( self,
int | float | bool | str value,
int field_type )
Map the data entry to the correct protobuf format.

Definition at line 108 of file publisher_base.py.

108 def to_map(self, value: int | float | bool | str, field_type: int) -> OpMonValue:
109 """Map the data entry to the correct protobuf format."""
110 formatted_opmonvalue = OpMonValue()
111 match field_type:
112 case FieldDescriptor.CPPTYPE_INT32:
113 formatted_opmonvalue.int4_value = value
114 case FieldDescriptor.CPPTYPE_INT64:
115 formatted_opmonvalue.int8_value = value
116 case FieldDescriptor.CPPTYPE_UINT32:
117 formatted_opmonvalue.uint4_value = value
118 case FieldDescriptor.CPPTYPE_UINT64:
119 formatted_opmonvalue.uint8_value = value
120 case FieldDescriptor.CPPTYPE_DOUBLE:
121 formatted_opmonvalue.double_value = value
122 case FieldDescriptor.CPPTYPE_FLOAT:
123 formatted_opmonvalue.float_value = value
124 case FieldDescriptor.CPPTYPE_BOOL:
125 formatted_opmonvalue.boolean_value = value
126 case FieldDescriptor.CPPTYPE_STRING:
127 formatted_opmonvalue.string_value = value
128 case _:
129 pass # Ignore unknown types.
130 return formatted_opmonvalue
131

◆ validate_custom_origin()

dict[str, str] opmonlib.publisher_base.OpMonPublisherBase.validate_custom_origin ( self,
dict[str, str] | None custom_origin = None )
Validate that each custom_origin entry is a str.

Definition at line 76 of file publisher_base.py.

78 ) -> dict[str, str]:
79 """Validate that each custom_origin entry is a str."""
80 if custom_origin is None:
81 return None
82 for key, value in custom_origin.items():
83 if not isinstance(value, str):
84 try:
85 custom_origin[key] = str(value)
86 except TypeError:
87 msg = "%s is not a string and cannot be converted to one.", key
88 raise TypeError(msg) from None
89 return custom_origin
90

Member Data Documentation

◆ conf

opmonlib.publisher_base.OpMonPublisherBase.conf

Definition at line 33 of file publisher_base.py.

◆ default_topic

opmonlib.publisher_base.OpMonPublisherBase.default_topic

Definition at line 42 of file publisher_base.py.

◆ log

opmonlib.publisher_base.OpMonPublisherBase.log

Definition at line 30 of file publisher_base.py.

◆ publisher [1/2]

opmonlib.publisher_base.OpMonPublisherBase.publisher = "OpMon configuration must be of type OpMonConf."

Definition at line 39 of file publisher_base.py.

◆ publisher [2/2]

tuple opmonlib.publisher_base.OpMonPublisherBase.publisher
Initial value:
= (
"Improperly initialized OpMonProducer used, nothing will be published."
)

Definition at line 59 of file publisher_base.py.

◆ substructure

opmonlib.publisher_base.OpMonPublisherBase.substructure = None

Definition at line 25 of file publisher_base.py.

◆ ts

opmonlib.publisher_base.OpMonPublisherBase.ts = Timestamp()

Definition at line 24 of file publisher_base.py.


The documentation for this class was generated from the following file: