6 from google.protobuf.message import Message as Msg
7 from kafka import KafkaProducer
11 from opmonlib.utils import logging_log_level_from_str, setup_rich_handler
15 """Tool for publishing operational monitoring metrics to kafka."""
18 """Construct the object to publish OpMon metrics to kafka."""
20 self.log = logging.getLogger("OpMonPublisher")
22 if isinstance(self.conf.level, str):
25 self.log.addHandler(setup_rich_handler())
27 if self.conf.opmon_type != "stream":
28 self.log.error("Type must be stream to publish to kafka.")
33 "There is no boostrap provided, not initializing publisher to topic %s",
41 bootstrap_servers=conf.bootstrap,
42 value_serializer=lambda v: v.SerializeToString(),
43 key_serializer=lambda k: str(k).encode("utf-8"),
50 """Extract the key from the OpMonEntry."""
52 key = str(opmon_entry.origin.session)
53 if opmon_entry.origin.application != "":
54 key += "." + opmon_entry.origin.application
55 if opmon_entry.origin.substructure:
56 for substructure_id in opmon_entry.origin.substructure:
57 key += "." + substructure_id
58 key += "/" + str(opmon_entry.measurement)
64 custom_origin: dict[str, str] | None = None,
65 level: int | str | None = None,
67 """Send an OpMonEntry to Kafka."""
68 if not isinstance(message, Msg):
69 self.log.error("Passed message needs to be of type google.protobuf.message")
73 if isinstance(level, str):
74 level = logging_log_level_from_str(level)
78 metric = self.to_entry(message=message, custom_origin=custom_origin)
80 if len(metric.data) == 0:
81 self.log.warning("OpMonEntry of type %s has no data", message.__name__)
None publish(self, Msg message, dict[str, str]|None custom_origin=None, int|str|None level=None)
str extract_key(self, OpMonEntry opmon_entry)
None __init__(self, OpMonConf conf)
OpMonEntry to_entry(self, Msg message, dict[str, str]|None custom_origin)
str extract_topic(self, Msg message)
None check_publisher(self)
Factory couldn t std::string alg_name Invalid configuration error