DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
ERSSubscriber.ERSSubscriber Class Reference

Public Member Functions

 __init__ (self, config)
 
str default_id (self)
 
bool add_callback (self, function, name, selection='.*')
 
 clear_callbacks (self)
 
bool remove_callback (self, name)
 
 start (self)
 
 stop (self)
 
 message_loop (self)
 

Public Attributes

 bootstrap = config["bootstrap"]
 
str group = config["group_id"]
 
 timeout = config["timeout"]
 
bool running = False
 
 functions = dict()
 
 thread = threading.Thread(target=self.message_loop)
 

Detailed Description

Definition at line 15 of file ERSSubscriber.py.

Constructor & Destructor Documentation

◆ __init__()

ERSSubscriber.ERSSubscriber.__init__ ( self,
config )

Definition at line 16 of file ERSSubscriber.py.

16 def __init__(self, config) :
17 self.bootstrap = config["bootstrap"]
18 if ( 'group_id' in config ) :
19 self.group = config["group_id"]
20 else:
21 self.group = ""
22 self.timeout = config["timeout"]
23 self.running = False
24 self.functions = dict()
25 self.thread = threading.Thread(target=self.message_loop)
26
27

Member Function Documentation

◆ add_callback()

bool ERSSubscriber.ERSSubscriber.add_callback ( self,
function,
name,
selection = '.*' )

Definition at line 38 of file ERSSubscriber.py.

38 def add_callback(self, function, name, selection = '.*') -> bool:
39 if ( name in self.functions ) : return False
40
41 was_running = self.running
42 if (was_running) : self.stop()
43
44 prog = re.compile(selection)
45 self.functions[name] = [prog, function]
46
47 if (was_running) : self.start()
48 return True
49

◆ clear_callbacks()

ERSSubscriber.ERSSubscriber.clear_callbacks ( self)

Definition at line 50 of file ERSSubscriber.py.

50 def clear_callbacks(self):
51 if ( self.running ) :
52 self.stop()
53 self.functions.clear()
54

◆ default_id()

str ERSSubscriber.ERSSubscriber.default_id ( self)

Definition at line 30 of file ERSSubscriber.py.

30 def default_id(self) -> str:
31 node = socket.gethostname()
32 user = getpass.getuser()
33 process = os.getpid()
34 thread = threading.get_ident()
35 id = "{}-{}-{}-{}".format(node, user, process, thread)
36 return id
37

◆ message_loop()

ERSSubscriber.ERSSubscriber.message_loop ( self)

Definition at line 74 of file ERSSubscriber.py.

74 def message_loop(self) :
75 if (self.group == ""): group_id = self.default_id()
76 else: group_id = self.group
77
78 consumer = KafkaConsumer(bootstrap_servers=self.bootstrap,
79 group_id=group_id,
80 client_id=self.default_id(),
81 consumer_timeout_ms=self.timeout)
82
83 topics = ["ers_stream"]
84 consumer.subscribe(["monitoring." + s for s in topics])
85
86 print("ID:", group_id, "running with functions:", *self.functions.keys())
87
88 while ( self.running ) :
89 try:
90 message_it = iter(consumer)
91 message = next(message_it)
92 timestamp = message.timestamp
93 key = message.key.decode('ascii')
94
96
97 for function in self.functions.values() :
98 if function[0].match(key) :
99 issue = ersissue.IssueChain()
100 issue.ParseFromString( message.value )
101 function[1](issue)
102
103 except msg.DecodeError :
104 print("Could not parse message")
105 except StopIteration :
106 pass
107 except Exception as e:
108 print(e)
109
110 print ("Stop")
111
112
113

◆ remove_callback()

bool ERSSubscriber.ERSSubscriber.remove_callback ( self,
name )

Definition at line 55 of file ERSSubscriber.py.

55 def remove_callback(self, name) -> bool:
56 if ( name not in sef.functions.keys() ) : return False
57
58 was_running = self.running
59 if (was_running) : self.stop()
60
61 self.functions.pop(name)
62
63 if ( was_running and len(self.functions)>0 ) : self.start()
64 return True

◆ start()

ERSSubscriber.ERSSubscriber.start ( self)

Definition at line 65 of file ERSSubscriber.py.

65 def start(self):
66 print("Starting run")
67 self.running = True
68 self.thread.start()
69

◆ stop()

ERSSubscriber.ERSSubscriber.stop ( self)

Definition at line 70 of file ERSSubscriber.py.

70 def stop(self) :
71 self.running = False
72 self.thread.join()
73

Member Data Documentation

◆ bootstrap

ERSSubscriber.ERSSubscriber.bootstrap = config["bootstrap"]

Definition at line 17 of file ERSSubscriber.py.

◆ functions

ERSSubscriber.ERSSubscriber.functions = dict()

Definition at line 24 of file ERSSubscriber.py.

◆ group

str ERSSubscriber.ERSSubscriber.group = config["group_id"]

Definition at line 19 of file ERSSubscriber.py.

◆ running

bool ERSSubscriber.ERSSubscriber.running = False

Definition at line 23 of file ERSSubscriber.py.

◆ thread

ERSSubscriber.ERSSubscriber.thread = threading.Thread(target=self.message_loop)

Definition at line 25 of file ERSSubscriber.py.

◆ timeout

ERSSubscriber.ERSSubscriber.timeout = config["timeout"]

Definition at line 22 of file ERSSubscriber.py.


The documentation for this class was generated from the following file: