DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
ERSPublisher.py
Go to the documentation of this file.
1import os
2import socket
3import inspect
4import ers.issue_pb2 as ersissue
5from datetime import datetime
6from kafka import KafkaProducer
7import time
8from enum import IntEnum, auto
9from typing import Union, Optional
10
11class SeverityLevel(IntEnum):
12 DEBUG = auto()
13 INFO = auto()
14 WARNING = auto()
15 ERROR = auto()
16 FATAL = auto()
17
18
21 self,
22 bootstrap:str = "monkafka.cern.ch:30092",
23 topic:str = "ers_stream",
24 application_name:str = "python",
25 package_name:str = "unknown"
26 ):
27 self.application_name = application_name
28 self.package_name = package_name
29 # Proceed with the rest of the setup
30 self.bootstrap = bootstrap
31
32 # The following code ensures that 'monitoring.' is prefixed if it's missing
33 if not topic.startswith('monitoring.'):
34 topic = 'monitoring.' + topic
35
36 self.topic = topic # Set the topic attribute correctly
37
38 # The rest of the KafkaProducer initialization...
39 self.producer = KafkaProducer(
40 bootstrap_servers=self.bootstrap,
41 value_serializer=lambda v: v.SerializeToString(),
42 key_serializer=lambda k: str(k).encode('utf-8')
43 )
44
46 self,
47 message_or_exception:Union[str,Exception],
48 severity:SeverityLevel = SeverityLevel.INFO.name,
49 name:Optional[str] = None,
50 cause:Union[None, Exception, ersissue.IssueChain, ersissue.SimpleIssue] = None,
51 context_kwargs:Optional[dict] = None,
52 ):
53 """Create and issue from text or exception and send to to the Kafka."""
54
55 # If the name is not provided, use the name of the exception
56 if name is None and isinstance(message_or_exception, Exception):
57 name = type(message_or_exception).__name__
58
59 # If the name is still not provided, use a generic name
60 if name is None:
61 name = "GenericPythonIssue"
62
63 issue_chain = self._create_issue_chain(
64 message_or_exception,
65 name = name,
66 severity = severity,
67 cause = cause,
68 context_kwargs = context_kwargs,
69 )
70 return self._publish_issue_chain(issue_chain)
71
72 def _publish_issue_chain(self, issue:ersissue.IssueChain):
73 """Publish an ERS issue_chain to the Kafka topic."""
74 return self.producer.send(self.topic, key=issue.session, value=issue)
75
76
78 self,
79 context_kwargs:Optional[dict] = None,
80 ) -> ersissue.Context:
81
82 """Generate the context for an issue."""
83 # Walk back up the stack and find the frame for the original caller
84 frame = inspect.currentframe()
85 while hasattr(frame, "f_code"):
86 co = frame.f_code
87 filename = os.path.normcase(co.co_filename)
88 if 'ERSPublisher.py' not in filename:
89 # Found the frame of the original caller
90 break
91 frame = frame.f_back
92
93 # If no such frame is found, default to the current frame
94 if frame is None:
95 frame = inspect.currentframe()
96
97 context = dict( # A guess for the context
98 cwd = os.getcwd(),
99 file_name = frame.f_code.co_filename,
100 function_name = frame.f_code.co_name,
101 host_name = socket.gethostname(),
102 line_number = frame.f_lineno,
103 user_name = os.getlogin(),
104 package_name = self.package_name,
105 application_name = self.application_name,
106 )
107
108 if context_kwargs:
109 context.update(context_kwargs)
110
111 return ersissue.Context(**context)
112
114 self,
115 exc:Exception,
116 severity:SeverityLevel = SeverityLevel.WARNING.name,
117 context_kwargs:Optional[dict] = None,
118 ) -> ersissue.SimpleIssue:
119
120 """Converts an exception to a SimpleIssue."""
121
122 current_time = time.time_ns() # Get current time in nanoseconds
123 import inspect
124 mro = [the_class.__name__ for the_class in inspect.getmro(type(exc)) if the_class.__name__ != "object"][::-1]
125 return ersissue.SimpleIssue(
126 context=self._generate_context(context_kwargs),
127 name=type(exc).__name__,
128 message=str(exc),
129 time=current_time,
130 severity=severity,
131 inheritance=["PythonIssue"] + mro,
132 )
133
135 self,
136 message:Union[Exception,str],
137 name:str = "GenericPythonIssue",
138 severity:SeverityLevel = SeverityLevel.INFO.name,
139 cause:Union[Exception,ersissue.SimpleIssue,ersissue.IssueChain] = None,
140 context_kwargs:Optional[dict] = None,
141 ):
142 """Create an ERS IssueChain with minimal user input."""
143 # This creates an issue chain with a given name, message, and severity
144 # The message can be a Python exception; in that case the name is also overwritten, with the name of the exception
145 # The cause can be another issue (chain or simple) or an exception
146
147 current_time = time.time_ns()
148
149 if isinstance(message,Exception):
150 # If the issue is created from an exception, set the name and inheritance
151 issue = self._exception_to_issue(message,severity, context_kwargs) # Use the existing function to create an issue from the exception
152 issue.time = current_time # The time is overwritten as this is the time of the call of the function
153 else:
154 # For non-exception issues, continue as normal
155 inheritance_list = ["PythonIssue", name]
156 issue = ersissue.SimpleIssue(
157 context = self._generate_context(context_kwargs),
158 name=name,
159 message=message,
160 time=current_time,
161 severity=str(severity),
162 inheritance=inheritance_list
163 )
164
165 issue_chain = ersissue.IssueChain(
166 final=issue,
167 session=os.getenv('DUNEDAQ_PARTITION', 'Unknown'),
168 application="python"
169 )
170
171 # Process the cause and add to issue_chain.causes, but not to issue.inheritance
172 if cause:
173 if isinstance(cause, Exception):
174 # Convert exception to a SimpleIssue and append to causes of issue_chain
175 cause_issue = self._exception_to_issue(cause,severity)
176 issue_chain.causes.extend([cause_issue])
177 elif isinstance(cause, ersissue.SimpleIssue):
178 # Add the cause directly to the causes of issue_chain
179 issue_chain.causes.extend([cause])
180 elif isinstance(cause, ersissue.IssueChain):
181 # Set the final cause of the existing chain as the first cause of the new chain
182 issue_chain.causes.append(cause.final)
183 issue_chain.causes.extend(cause.causes)
184
185 return issue_chain
186
187 def __del__(self):
188 """Destructor-like method to clean up resources."""
189 if self.producer:
190 self.producer.close()
191
192
193class ERSException(Exception):
194 """Custom exception which can also be treated as an ERS issue."""
195
196 def __init__(self, message):
197 super().__init__(message)
198 self.message = message
199
ersissue.SimpleIssue _exception_to_issue(self, Exception exc, SeverityLevel severity=SeverityLevel.WARNING.name, Optional[dict] context_kwargs=None)
_publish_issue_chain(self, ersissue.IssueChain issue)
publish(self, Union[str, Exception] message_or_exception, SeverityLevel severity=SeverityLevel.INFO.name, Optional[str] name=None, Union[None, Exception, ersissue.IssueChain, ersissue.SimpleIssue] cause=None, Optional[dict] context_kwargs=None)
_create_issue_chain(self, Union[Exception, str] message, str name="GenericPythonIssue", SeverityLevel severity=SeverityLevel.INFO.name, Union[Exception, ersissue.SimpleIssue, ersissue.IssueChain] cause=None, Optional[dict] context_kwargs=None)
__init__(self, str bootstrap="monkafka.cern.ch:30092", str topic="ers_stream", str application_name="python", str package_name="unknown")
ersissue.Context _generate_context(self, Optional[dict] context_kwargs=None)