DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
ERSPublisher.py
Go to the documentation of this file.
import getpass
import inspect
import os
import socket
import time
from datetime import datetime
from enum import IntEnum, auto
from typing import Union, Optional

from kafka import KafkaProducer

import ers.issue_pb2 as ersissue
10
class SeverityLevel(IntEnum):
    """Severity levels for ERS issues, ordered from least to most severe.

    Values are assigned by ``auto()`` in declaration order, so
    ``DEBUG == 1`` and ``FATAL == 5``; members compare as integers.
    """

    DEBUG = auto()
    INFO = auto()
    WARNING = auto()
    ERROR = auto()
    FATAL = auto()
17
18
# NOTE(review): the ``class`` header and most ``def`` lines were missing from
# the listing this block was recovered from. Method signatures were rebuilt
# from the signature summary at the end of the listing; the class name is
# taken from the file name ``ERSPublisher.py`` -- confirm against the repo.
class ERSPublisher:
    """Build ERS issues (protobuf ``IssueChain`` messages) and send them to Kafka."""

    def __init__(
        self,
        bootstrap: str = "monkafka.cern.ch:30092",
        topic: str = "ers_stream",
        application_name: str = "python",
        package_name: str = "unknown",
    ):
        """Store the publisher settings and create the Kafka producer.

        Args:
            bootstrap: Passed to ``KafkaProducer`` as ``bootstrap_servers``.
            topic: Kafka topic; ``'monitoring.'`` is prepended when missing.
            application_name: Reported as ``application_name`` in each context.
            package_name: Reported as ``package_name`` in each context.
        """
        self.application_name = application_name
        self.package_name = package_name
        self.bootstrap = bootstrap

        # Defined before the producer is created so that __del__ still finds
        # the attribute if KafkaProducer(...) raises.
        self.producer = None

        # Ensure the topic carries the 'monitoring.' prefix.
        if not topic.startswith('monitoring.'):
            topic = 'monitoring.' + topic
        self.topic = topic

        self.producer = KafkaProducer(
            bootstrap_servers=self.bootstrap,
            value_serializer=lambda v: v.SerializeToString(),
            key_serializer=lambda k: str(k).encode('utf-8'),
        )

    def publish(
        self,
        message_or_exception: Union[str, Exception],
        severity: Union[SeverityLevel, str] = SeverityLevel.INFO.name,
        name: Optional[str] = None,
        cause: Union[None, Exception, ersissue.IssueChain, ersissue.SimpleIssue] = None,
        context_kwargs: Optional[dict] = None,
    ):
        """Create an issue from text or an exception and send it to Kafka.

        Args:
            message_or_exception: Issue text, or the exception to report.
            severity: A ``SeverityLevel`` member or its name as a string.
            name: Issue name. Defaults to the exception's class name, or to
                ``"GenericPythonIssue"`` for plain text.
            cause: Optional exception, ``SimpleIssue`` or ``IssueChain`` that
                is recorded in the chain's ``causes``.
            context_kwargs: Overrides merged into the generated context.

        Returns:
            Whatever ``self.producer.send(...)`` returns.
        """
        if name is None:
            if isinstance(message_or_exception, Exception):
                name = type(message_or_exception).__name__
            else:
                name = "GenericPythonIssue"

        issue_chain = self._create_issue_chain(
            message_or_exception,
            name=name,
            severity=severity,
            cause=cause,
            context_kwargs=context_kwargs,
        )
        return self._publish_issue_chain(issue_chain)

    def _publish_issue_chain(self, issue: ersissue.IssueChain):
        """Send an ``IssueChain`` to ``self.topic``, keyed by ``issue.session``."""
        return self.producer.send(self.topic, key=issue.session, value=issue)

    @staticmethod
    def _severity_name(severity) -> str:
        """Return the severity as the string stored in the issue.

        ``SeverityLevel`` members map to their name (``"ERROR"``); anything
        else goes through ``str()``. ``str()`` on an ``IntEnum`` member is
        avoided because its result differs between Python versions.
        """
        if isinstance(severity, SeverityLevel):
            return severity.name
        return str(severity)

    @staticmethod
    def _current_user_name() -> str:
        """Return the login name, tolerating a missing controlling terminal."""
        try:
            return os.getlogin()
        except OSError:
            # No controlling terminal (e.g. daemon, container, cron job).
            pass
        try:
            return getpass.getuser()
        except (KeyError, OSError, ImportError):
            return "unknown"

    def _generate_context(
        self,
        context_kwargs: Optional[dict] = None,
    ) -> ersissue.Context:
        """Generate the context for an issue.

        Walks up the call stack to the first frame whose file name does not
        contain ``ERSPublisher.py`` and reports that frame's file, function
        and line. ``context_kwargs`` entries override the generated values.
        """
        own_file = os.path.normcase('ERSPublisher.py')

        frame = inspect.currentframe()
        caller = frame
        while caller is not None and own_file in os.path.normcase(caller.f_code.co_filename):
            caller = caller.f_back

        # If every frame belongs to this file, fall back to the current frame.
        if caller is None:
            caller = frame

        if caller is None:
            # inspect.currentframe() may return None on some interpreters.
            file_name, function_name, line_number = "", "", 0
        else:
            file_name = caller.f_code.co_filename
            function_name = caller.f_code.co_name
            line_number = caller.f_lineno

        # Drop frame references promptly to avoid reference cycles.
        del frame, caller

        context = dict(  # A guess for the context
            cwd=os.getcwd(),
            file_name=file_name,
            function_name=function_name,
            host_name=socket.gethostname(),
            line_number=line_number,
            user_name=self._current_user_name(),
            user_id=os.geteuid(),
            package_name=self.package_name,
            application_name=self.application_name,
        )

        if context_kwargs:
            context.update(context_kwargs)

        return ersissue.Context(**context)

    def _exception_to_issue(
        self,
        exc: Exception,
        severity: Union[SeverityLevel, str] = SeverityLevel.WARNING.name,
        context_kwargs: Optional[dict] = None,
    ) -> ersissue.SimpleIssue:
        """Convert an exception to a ``SimpleIssue``.

        The issue name is the exception's class name and the message is
        ``str(exc)``. ``inheritance`` is ``"PythonIssue"`` followed by the
        exception's class names from base-most to most derived, without
        ``object``.
        """
        current_time = time.time_ns()  # nanoseconds
        mro = [
            the_class.__name__
            for the_class in inspect.getmro(type(exc))
            if the_class.__name__ != "object"
        ][::-1]
        return ersissue.SimpleIssue(
            context=self._generate_context(context_kwargs),
            name=type(exc).__name__,
            message=str(exc),
            time=current_time,
            severity=self._severity_name(severity),
            inheritance=["PythonIssue"] + mro,
        )

    def _create_issue_chain(
        self,
        message: Union[Exception, str],
        name: str = "GenericPythonIssue",
        severity: Union[SeverityLevel, str] = SeverityLevel.INFO.name,
        cause: Union[None, Exception, ersissue.SimpleIssue, ersissue.IssueChain] = None,
        context_kwargs: Optional[dict] = None,
    ):
        """Create an ERS ``IssueChain`` with minimal user input.

        Args:
            message: Issue text, or an exception. For an exception the issue
                name and inheritance come from the exception and ``name`` is
                not used.
            name: Issue name for a text message.
            severity: A ``SeverityLevel`` member or its name as a string.
            cause: Another issue (chain or simple) or an exception; added to
                ``causes`` of the chain, not to the issue's inheritance.
            context_kwargs: Overrides merged into the generated context.

        Returns:
            The assembled ``ersissue.IssueChain``.
        """
        current_time = time.time_ns()

        if isinstance(message, Exception):
            issue = self._exception_to_issue(message, severity, context_kwargs)
            # Overwritten so the timestamp is the time of this call.
            issue.time = current_time
        else:
            issue = ersissue.SimpleIssue(
                context=self._generate_context(context_kwargs),
                name=name,
                message=message,
                time=current_time,
                severity=self._severity_name(severity),
                inheritance=["PythonIssue", name],
            )

        # NOTE(review): ``application`` is the literal "python", not
        # ``self.application_name`` -- confirm this is intended.
        issue_chain = ersissue.IssueChain(
            final=issue,
            session=os.getenv('DUNEDAQ_PARTITION', 'Unknown'),
            application="python",
        )

        if cause is not None:
            if isinstance(cause, Exception):
                # Convert the exception to a SimpleIssue first.
                issue_chain.causes.append(self._exception_to_issue(cause, severity))
            elif isinstance(cause, ersissue.SimpleIssue):
                issue_chain.causes.append(cause)
            elif isinstance(cause, ersissue.IssueChain):
                # The final issue of the causing chain comes first, followed
                # by that chain's own causes.
                issue_chain.causes.append(cause.final)
                issue_chain.causes.extend(cause.causes)
            # Any other type is ignored, as before.

        return issue_chain

    def __del__(self):
        """Close the Kafka producer, if one was created."""
        # getattr: __init__ may have failed before ``producer`` was assigned.
        producer = getattr(self, "producer", None)
        if producer is not None:
            producer.close()
192
193
class ERSException(Exception):
    """Custom exception which can also be treated as an ERS issue."""

    def __init__(self, message):
        """Pass ``message`` to ``Exception`` and keep it on ``self.message``."""
        super().__init__(message)
        self.message = message
200
ersissue.SimpleIssue _exception_to_issue(self, Exception exc, SeverityLevel severity=SeverityLevel.WARNING.name, Optional[dict] context_kwargs=None)
_publish_issue_chain(self, ersissue.IssueChain issue)
publish(self, Union[str, Exception] message_or_exception, SeverityLevel severity=SeverityLevel.INFO.name, Optional[str] name=None, Union[None, Exception, ersissue.IssueChain, ersissue.SimpleIssue] cause=None, Optional[dict] context_kwargs=None)
_create_issue_chain(self, Union[Exception, str] message, str name="GenericPythonIssue", SeverityLevel severity=SeverityLevel.INFO.name, Union[Exception, ersissue.SimpleIssue, ersissue.IssueChain] cause=None, Optional[dict] context_kwargs=None)
__init__(self, str bootstrap="monkafka.cern.ch:30092", str topic="ers_stream", str application_name="python", str package_name="unknown")
ersissue.Context _generate_context(self, Optional[dict] context_kwargs=None)