9from collections.abc
import Callable
11import google.protobuf.message
as msg
13from kafka
import KafkaConsumer
17 """Define callback function properties to store and validate function execution."""
20 self, function: Callable, opmon_id: re.Pattern, measurement: re.Pattern
22 """Construct the OpMonFunction."""
28 def match(self, key: str) -> bool:
29 """Validate the key follows the standard structure."""
30 opmon_id, measure = key.split(
"/", 1)
37 def execute(self, e: entry.OpMonEntry) ->
None:
38 """Execute the function."""
44 """Subscribe to a kafka topic to read the contained OpMon metrics."""
49 group_id: str |
None =
None,
50 timeout_ms: int = 500,
51 topics: dict[str, str] |
None =
None,
53 """Construct the OpMonSubscriber."""
59 msg =
"Topic list is empty"
69 """Construct the default kafka consumer ID."""
70 node = socket.gethostname()
71 user = getpass.getuser()
73 thread = threading.get_ident()
74 return f
"{node}-{user}-{process}-{thread}"
81 measurement: str =
".*",
83 """Register a callback function to the OpMonSubscriber."""
93 opmon_id=re.compile(opmon_id),
94 measurement=re.compile(measurement),
104 """Remove all callback functions from the OpMonSubscriber."""
111 """Remove the named callback functions from the OpMonSubscriber."""
126 """Start listening to the kafka topic."""
127 logging.info(
"Starting run")
133 """Stop listening to the kafka topic."""
139 """Process entries read in with the KafkaConsumer."""
145 consumer = KafkaConsumer(
149 consumer_timeout_ms=self.
timeout,
153 consumer.subscribe([
"monitoring." + s
for s
in topics])
156 logging.info(
"ID: %s running with functions %s", group_id, keys_str)
160 message_it = iter(consumer)
161 message = next(message_it)
162 key = message.key.decode(
"ascii")
167 if function.match(key):
168 e = entry.OpMonEntry()
169 e.ParseFromString(message.value)
172 except msg.DecodeError:
173 logging.exception(
"Could not parse message")
174 except StopIteration:
177 logging.exception(
"Unhandled exception thrown")
179 logging.info(
"Stop run")
bool match(self, str key)
None __init__(self, Callable function, re.Pattern opmon_id, re.Pattern measurement)
None execute(self, entry.OpMonEntry e)
bool add_callback(self, str name, Callable function, str opmon_id=".*", str measurement=".*")
bool running
runtime options
None __init__(self, str bootstrap, str|None group_id=None, int timeout_ms=500, dict[str, str]|None topics=None)
bool remove_callback(self, str name)
None clear_callbacks(self)
bootstrap
Options from configurations.