DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
ERSSubscriber.py
Go to the documentation of this file.
1#!/usr/bin/env python3
2
3from kafka import KafkaConsumer
4import json
5import threading
6import socket
7import os
8import re
9import logging
10import getpass
11
12import ers.issue_pb2 as ersissue
13import google.protobuf.message as msg
14
16 def __init__(self, config) :
17 self.bootstrap = config["bootstrap"]
18 if ( 'group_id' in config ) :
19 self.group = config["group_id"]
20 else:
21 self.group = ""
22 self.timeout = config["timeout"]
23 self.running = False
24 self.functions = dict()
25 self.thread = threading.Thread(target=self.message_loop)
26
27
28 # print("From Kafka server:",bootstrap)
29
30 def default_id(self) -> str:
31 node = socket.gethostname()
32 user = getpass.getuser()
33 process = os.getpid()
34 thread = threading.get_ident()
35 id = "{}-{}-{}-{}".format(node, user, process, thread)
36 return id
37
38 def add_callback(self, function, name, selection = '.*') -> bool:
39 if ( name in self.functions ) : return False
40
41 was_running = self.running
42 if (was_running) : self.stop()
43
44 prog = re.compile(selection)
45 self.functions[name] = [prog, function]
46
47 if (was_running) : self.start()
48 return True
49
50 def clear_callbacks(self):
51 if ( self.running ) :
52 self.stop()
53 self.functions.clear()
54
55 def remove_callback(self, name) -> bool:
56 if ( name not in sef.functions.keys() ) : return False
57
58 was_running = self.running
59 if (was_running) : self.stop()
60
61 self.functions.pop(name)
62
63 if ( was_running and len(self.functions)>0 ) : self.start()
64 return True
65 def start(self):
66 print("Starting run")
67 self.running = True
68 self.thread.start()
69
70 def stop(self) :
71 self.running = False
72 self.thread.join()
73
74 def message_loop(self) :
75 if (self.group == ""): group_id = self.default_id()
76 else: group_id = self.group
77
78 consumer = KafkaConsumer(bootstrap_servers=self.bootstrap,
79 group_id=group_id,
80 client_id=self.default_id(),
81 consumer_timeout_ms=self.timeout)
82
83 topics = ["ers_stream"]
84 consumer.subscribe(["monitoring." + s for s in topics])
85
86 print("ID:", group_id, "running with functions:", *self.functions.keys())
87
88 while ( self.running ) :
89 try:
90 message_it = iter(consumer)
91 message = next(message_it)
92 timestamp = message.timestamp
93 key = message.key.decode('ascii')
94
96
97 for function in self.functions.values() :
98 if function[0].match(key) :
99 issue = ersissue.IssueChain()
100 issue.ParseFromString( message.value )
101 function[1](issue)
102
103 except msg.DecodeError :
104 print("Could not parse message")
105 except StopIteration :
106 pass
107 except Exception as e:
108 print(e)
109
110 print ("Stop")
111
112
113
bool add_callback(self, function, name, selection='.*')
bool remove_callback(self, name)