9 from collections.abc import Callable
11 import google.protobuf.message as msg
13 from kafka import KafkaConsumer
17 """Define callback function properties to store and validate function execution."""
20 self, function: Callable, opmon_id: re.Pattern, measurement: re.Pattern
22 """Construct the OpMonFunction."""
28 def match(self, key: str) -> bool:
29 """Validate the key follows the standard structure."""
30 opmon_id, measure = key.split("/", 1)
37 def execute(self, e: entry.OpMonEntry) -> None:
38 """Execute the function."""
44 """Subscribe to a kafka topic to read the contained OpMon metrics."""
49 group_id: str | None = None,
50 timeout_ms: int = 500,
51 topics: dict[str, str] | None = None,
53 """Construct the OpMonSubscriber."""
59 msg = "Topic list is empty"
69 """Construct the default kafka consumer ID."""
70 node = socket.gethostname()
71 user = getpass.getuser()
73 thread = threading.get_ident()
74 return f"{node}-{user}-{process}-{thread}"
81 measurement: str = ".*",
83 """Register a callback function to the OpMonSubscriber."""
93 opmon_id=re.compile(opmon_id),
94 measurement=re.compile(measurement),
104 """Remove all callback functions from the OpMonSubscriber."""
111 """Remove the named callback functions from the OpMonSubscriber."""
121 if was_running and len(self.functions) > 0:
126 """Start listening to the kafka topic."""
127 logging.info("Starting run")
133 """Stop listening to the kafka topic."""
139 """Process entries read in with the KafkaConsumer."""
145 consumer = KafkaConsumer(
149 consumer_timeout_ms=self.timeout,
153 consumer.subscribe(["monitoring." + s for s in topics])
155 keys_str = ", ".join(self.functions.keys())
156 logging.info("ID: %s running with functions %s", group_id, keys_str)
160 message_it = iter(consumer)
161 message = next(message_it)
162 key = message.key.decode("ascii")
167 if function.match(key):
168 e = entry.OpMonEntry()
169 e.ParseFromString(message.value)
172 except msg.DecodeError:
173 logging.exception("Could not parse message")
174 except StopIteration:
177 logging.exception("Unhandled exception thrown")
179 logging.info("Stop run")
None execute(self, entry.OpMonEntry e)
bool match(self, str key)
None __init__(self, Callable function, re.Pattern opmon_id, re.Pattern measurement)
bootstrap
Options from configurations.
bool remove_callback(self, str name)
bool running
runtime options
None clear_callbacks(self)
None __init__(self, str bootstrap, str|None group_id=None, int timeout_ms=500, dict[str, str]|None topics=None)
bool add_callback(self, str name, Callable function, str opmon_id=".*", str measurement=".*")