DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
OpMonSubscriber.py
Go to the documentation of this file.
1#!/usr/bin/env python3
2
3import getpass
4import logging
5import os
6import re
7import socket
8import threading
9from collections.abc import Callable
10
11import google.protobuf.message as msg
12import opmonlib.opmon_entry_pb2 as entry
13from kafka import KafkaConsumer
14
15
class OpMonFunction:
    """Define callback function properties to store and validate function execution.

    An instance bundles a callable with two compiled patterns. The patterns
    decide, from a message key of the form ``<opmon_id>/<measurement>``,
    whether the callable should be run for that message.
    """

    def __init__(
        self, function: Callable, opmon_id: re.Pattern, measurement: re.Pattern
    ) -> None:
        """Construct the OpMonFunction.

        Args:
            function: Callable invoked by :meth:`execute` with the entry.
            opmon_id: Compiled pattern applied to the key text before the
                first ``/``.
            measurement: Compiled pattern applied to the key text after the
                first ``/``.
        """
        self.function = function
        self.opmon_id = opmon_id
        self.measurement = measurement

    def match(self, key: str) -> bool:
        """Validate the key follows the standard structure.

        Args:
            key: Message key, expected as ``<opmon_id>/<measurement>``. Only
                the first ``/`` separates the two parts; later ones belong
                to the measurement.

        Returns:
            True when both parts match their patterns (``re.Pattern.match``,
            i.e. anchored at the start of each part). False otherwise,
            including when the key contains no ``/`` at all.
        """
        opmon_id, separator, measure = key.partition("/")
        # A key without the separator cannot be split into the two expected
        # parts; report a non-match instead of raising on unpacking.
        if not separator:
            return False
        if self.opmon_id.match(opmon_id) is None:
            return False
        return self.measurement.match(measure) is not None

    def execute(self, e: "entry.OpMonEntry") -> None:
        """Execute the function.

        Args:
            e: Entry handed unchanged to the stored callable. Whatever the
                callable returns is discarded.
        """
        self.function(e)
41
42
44 """Subscribe to a kafka topic to read the contained OpMon metrics."""
45
47 self,
48 bootstrap: str,
49 group_id: str | None = None,
50 timeout_ms: int = 500,
51 topics: dict[str, str] | None = None,
52 ) -> None:
53 """Construct the OpMonSubscriber."""
54
55 self.bootstrap = bootstrap
56 self.group_idgroup_id = group_id
57 self.timeout = timeout_ms
58 if len(topics) == 0:
59 msg = "Topic list is empty"
60 raise ValueError(msg)
61 self.topics = topics
62
63 self.running = False
65 self.thread = threading.Thread(target=self.message_loop)
66 return
67
68 def default_id(self) -> str:
69 """Construct the default kafka consumer ID."""
70 node = socket.gethostname()
71 user = getpass.getuser()
72 process = os.getpid()
73 thread = threading.get_ident()
74 return f"{node}-{user}-{process}-{thread}"
75
77 self,
78 name: str,
79 function: Callable,
80 opmon_id: str = ".*",
81 measurement: str = ".*",
82 ) -> bool:
83 """Register a callback function to the OpMonSubscriber."""
84 if name in self.functionsfunctions:
85 return False
86
87 was_running = self.running
88 if was_running:
89 self.stop()
90
91 f = OpMonFunction(
92 function=function,
93 opmon_id=re.compile(opmon_id),
94 measurement=re.compile(measurement),
95 )
96
97 self.functionsfunctions[name] = f
98
99 if was_running:
100 self.start()
101 return True
102
103 def clear_callbacks(self) -> None:
104 """Remove all callback functions from the OpMonSubscriber."""
105 if self.running:
106 self.stop()
107 self.functionsfunctions.clear()
108 return
109
110 def remove_callback(self, name: str) -> bool:
111 """Remove the named callback functions from the OpMonSubscriber."""
112 if name not in self.functionsfunctions.keys():
113 return False
114
115 was_running = self.running
116 if was_running:
117 self.stop()
118
119 self.functionsfunctions.pop(name)
120
121 if was_running and len(self.functionsfunctions) > 0:
122 self.start()
123 return True
124
125 def start(self) -> None:
126 """Start listening to the kafka topic."""
127 logging.info("Starting run")
128 self.running = True
129 self.thread.start()
130 return
131
132 def stop(self) -> None:
133 """Stop listening to the kafka topic."""
134 self.running = False
135 self.thread.join()
136 return
137
138 def message_loop(self) -> None:
139 """Process entries read in with the KafkaConsumer."""
140 if not self.group_idgroup_id:
141 group_id = self.default_id()
142 else:
143 group_id = self.group_idgroup_id
144
145 consumer = KafkaConsumer(
146 bootstrap_servers=self.bootstrap,
147 group_id=group_id,
148 client_id=self.default_id(),
149 consumer_timeout_ms=self.timeout,
150 )
151
152 topics = self.topics
153 consumer.subscribe(["monitoring." + s for s in topics])
154
155 keys_str = ", ".join(self.functionsfunctions.keys())
156 logging.info("ID: %s running with functions %s", group_id, keys_str)
157
158 while self.running:
159 try:
160 message_it = iter(consumer)
161 message = next(message_it)
162 key = message.key.decode("ascii")
163
165
166 for function in self.functionsfunctions.values():
167 if function.match(key):
168 e = entry.OpMonEntry()
169 e.ParseFromString(message.value)
170 function.execute(e)
171
172 except msg.DecodeError:
173 logging.exception("Could not parse message")
174 except StopIteration:
175 pass
176 except Exception:
177 logging.exception("Unhandled exception thrown")
178
179 logging.info("Stop run")
None __init__(self, Callable function, re.Pattern opmon_id, re.Pattern measurement)
None execute(self, entry.OpMonEntry e)
bool add_callback(self, str name, Callable function, str opmon_id=".*", str measurement=".*")
None __init__(self, str bootstrap, str|None group_id=None, int timeout_ms=500, dict[str, str]|None topics=None)
bool remove_callback(self, str name)
bootstrap
Options from configurations.