DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
OpMonSubscriber.OpMonSubscriber Class Reference

Public Member Functions

None __init__ (self, str bootstrap, str|None group_id=None, int timeout_ms=500, dict[str, str]|None topics=None)
 
str default_id (self)
 
bool add_callback (self, str name, Callable function, str opmon_id=".*", str measurement=".*")
 
None clear_callbacks (self)
 
bool remove_callback (self, str name)
 
None start (self)
 
None stop (self)
 
None message_loop (self)
 

Public Attributes

 bootstrap = bootstrap
 Options taken from the configuration passed to the constructor.
 
 group_id = group_id
 
 timeout = timeout_ms
 
 topics = topics
 
bool running = False
 Runtime options.
 
dict functions = {}
 
 thread = threading.Thread(target=self.message_loop)
 
bool functions = self.running
 
str group_id = self.default_id()
 

Detailed Description

Subscribe to Kafka topics and read the OpMon metrics they contain.

Definition at line 43 of file OpMonSubscriber.py.

Constructor & Destructor Documentation

◆ __init__()

None OpMonSubscriber.OpMonSubscriber.__init__ ( self,
str bootstrap,
str | None group_id = None,
int timeout_ms = 500,
dict[str, str] | None topics = None )
Construct the OpMonSubscriber.

Definition at line 46 of file OpMonSubscriber.py.

52 ) -> None:
53 """Construct the OpMonSubscriber."""
54
55 self.bootstrap = bootstrap
56 self.group_id = group_id
57 self.timeout = timeout_ms
58 if len(topics) == 0:
59 msg = "Topic list is empty"
60 raise ValueError(msg)
61 self.topics = topics
62
63 self.running = False
64 self.functions = {}
65 self.thread = threading.Thread(target=self.message_loop)
66 return
67

Member Function Documentation

◆ add_callback()

bool OpMonSubscriber.OpMonSubscriber.add_callback ( self,
str name,
Callable function,
str opmon_id = ".*",
str measurement = ".*" )
Register a callback function to the OpMonSubscriber.

Definition at line 76 of file OpMonSubscriber.py.

82 ) -> bool:
83 """Register a callback function to the OpMonSubscriber."""
84 if name in self.functions:
85 return False
86
87 was_running = self.running
88 if was_running:
89 self.stop()
90
91 f = OpMonFunction(
92 function=function,
93 opmon_id=re.compile(opmon_id),
94 measurement=re.compile(measurement),
95 )
96
97 self.functions[name] = f
98
99 if was_running:
100 self.start()
101 return True
102

◆ clear_callbacks()

None OpMonSubscriber.OpMonSubscriber.clear_callbacks ( self)
Remove all callback functions from the OpMonSubscriber.

Definition at line 103 of file OpMonSubscriber.py.

103 def clear_callbacks(self) -> None:
104 """Remove all callback functions from the OpMonSubscriber."""
105 if self.running:
106 self.stop()
107 self.functions.clear()
108 return
109

◆ default_id()

str OpMonSubscriber.OpMonSubscriber.default_id ( self)
Construct the default Kafka consumer ID from the host name, user name, process ID and thread ID.

Definition at line 68 of file OpMonSubscriber.py.

68 def default_id(self) -> str:
69 """Construct the default kafka consumer ID."""
70 node = socket.gethostname()
71 user = getpass.getuser()
72 process = os.getpid()
73 thread = threading.get_ident()
74 return f"{node}-{user}-{process}-{thread}"
75

◆ message_loop()

None OpMonSubscriber.OpMonSubscriber.message_loop ( self)
Process entries read in with the KafkaConsumer.

Definition at line 138 of file OpMonSubscriber.py.

138 def message_loop(self) -> None:
139 """Process entries read in with the KafkaConsumer."""
140 if not self.group_id:
141 group_id = self.default_id()
142 else:
143 group_id = self.group_id
144
145 consumer = KafkaConsumer(
146 bootstrap_servers=self.bootstrap,
147 group_id=group_id,
148 client_id=self.default_id(),
149 consumer_timeout_ms=self.timeout,
150 )
151
152 topics = self.topics
153 consumer.subscribe(["monitoring." + s for s in topics])
154
155 keys_str = ", ".join(self.functions.keys())
156 logging.info("ID: %s running with functions %s", group_id, keys_str)
157
158 while self.running:
159 try:
160 message_it = iter(consumer)
161 message = next(message_it)
162 key = message.key.decode("ascii")
163
165
166 for function in self.functions.values():
167 if function.match(key):
168 e = entry.OpMonEntry()
169 e.ParseFromString(message.value)
170 function.execute(e)
171
172 except msg.DecodeError:
173 logging.exception("Could not parse message")
174 except StopIteration:
175 pass
176 except Exception:
177 logging.exception("Unhandled exception thrown")
178
179 logging.info("Stop run")

◆ remove_callback()

bool OpMonSubscriber.OpMonSubscriber.remove_callback ( self,
str name )
Remove the named callback function from the OpMonSubscriber.

Definition at line 110 of file OpMonSubscriber.py.

110 def remove_callback(self, name: str) -> bool:
111 """Remove the named callback functions from the OpMonSubscriber."""
112 if name not in self.functions.keys():
113 return False
114
115 was_running = self.running
116 if was_running:
117 self.stop()
118
119 self.functions.pop(name)
120
121 if was_running and len(self.functions) > 0:
122 self.start()
123 return True
124

◆ start()

None OpMonSubscriber.OpMonSubscriber.start ( self)
Start listening to the Kafka topics.

Definition at line 125 of file OpMonSubscriber.py.

125 def start(self) -> None:
126 """Start listening to the kafka topic."""
127 logging.info("Starting run")
128 self.running = True
129 self.thread.start()
130 return
131

◆ stop()

None OpMonSubscriber.OpMonSubscriber.stop ( self)
Stop listening to the Kafka topics.

Definition at line 132 of file OpMonSubscriber.py.

132 def stop(self) -> None:
133 """Stop listening to the kafka topic."""
134 self.running = False
135 self.thread.join()
136 return
137

Member Data Documentation

◆ bootstrap

OpMonSubscriber.OpMonSubscriber.bootstrap = bootstrap

Options taken from the configuration passed to the constructor.

Definition at line 55 of file OpMonSubscriber.py.

◆ functions [1/2]

OpMonSubscriber.OpMonSubscriber.functions = {}

Definition at line 64 of file OpMonSubscriber.py.

◆ functions [2/2]

bool OpMonSubscriber.OpMonSubscriber.functions = self.running

Definition at line 84 of file OpMonSubscriber.py.

◆ group_id [1/2]

OpMonSubscriber.OpMonSubscriber.group_id = group_id

Definition at line 56 of file OpMonSubscriber.py.

◆ group_id [2/2]

str OpMonSubscriber.OpMonSubscriber.group_id = self.default_id()

Definition at line 140 of file OpMonSubscriber.py.

◆ running

OpMonSubscriber.OpMonSubscriber.running = False

Runtime options.

Definition at line 63 of file OpMonSubscriber.py.

◆ thread

OpMonSubscriber.OpMonSubscriber.thread = threading.Thread(target=self.message_loop)

Definition at line 65 of file OpMonSubscriber.py.

◆ timeout

OpMonSubscriber.OpMonSubscriber.timeout = timeout_ms

Definition at line 57 of file OpMonSubscriber.py.

◆ topics

OpMonSubscriber.OpMonSubscriber.topics = topics

Definition at line 61 of file OpMonSubscriber.py.


The documentation for this class was generated from the following file: