76 else: group_id = self.
group
78 consumer = KafkaConsumer(bootstrap_servers=self.
bootstrap,
81 consumer_timeout_ms=self.
timeout)
83 topics = [
"ers_stream"]
84 consumer.subscribe([
"monitoring." + s
for s
in topics])
86 print(
"ID:", group_id,
"running with functions:", *self.
functions.keys())
90 message_it = iter(consumer)
91 message = next(message_it)
92 timestamp = message.timestamp
93 key = message.key.decode(
'ascii')
98 if function[0].match(key) :
99 issue = ersissue.IssueChain()
100 issue.ParseFromString( message.value )
103 except msg.DecodeError :
104 print(
"Could not parse message")
105 except StopIteration :
107 except Exception
as e: