DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
ERSPublisher.ERSPublisher Class Reference

Public Member Functions

 __init__ (self, str bootstrap="monkafka.cern.ch:30092", str topic="ers_stream", str application_name="python", str package_name="unknown")
 
 publish (self, Union[str, Exception] message_or_exception, SeverityLevel severity=SeverityLevel.INFO.name, Optional[str] name=None, Union[None, Exception, ersissue.IssueChain, ersissue.SimpleIssue] cause=None, Optional[dict] context_kwargs=None)
 
 __del__ (self)
 

Public Attributes

 application_name = application_name
 
 package_name = package_name
 
 bootstrap = bootstrap
 
 topic = topic
 
 producer
 

Protected Member Functions

 _publish_issue_chain (self, ersissue.IssueChain issue)
 
ersissue.Context _generate_context (self, Optional[dict] context_kwargs=None)
 
ersissue.SimpleIssue _exception_to_issue (self, Exception exc, SeverityLevel severity=SeverityLevel.WARNING.name, Optional[dict] context_kwargs=None)
 
 _create_issue_chain (self, Union[Exception, str] message, str name="GenericPythonIssue", SeverityLevel severity=SeverityLevel.INFO.name, Union[Exception, ersissue.SimpleIssue, ersissue.IssueChain] cause=None, Optional[dict] context_kwargs=None)
 

Detailed Description

Definition at line 19 of file ERSPublisher.py.

Constructor & Destructor Documentation

◆ __init__()

ERSPublisher.ERSPublisher.__init__ ( self,
str bootstrap = "monkafka.cern.ch:30092",
str topic = "ers_stream",
str application_name = "python",
str package_name = "unknown" )

Definition at line 20 of file ERSPublisher.py.

26 ):
27 self.application_name = application_name
28 self.package_name = package_name
29 # Proceed with the rest of the setup
30 self.bootstrap = bootstrap
31
32 # The following code ensures that 'monitoring.' is prefixed if it's missing
33 if not topic.startswith('monitoring.'):
34 topic = 'monitoring.' + topic
35
36 self.topic = topic # Set the topic attribute correctly
37
38 # The rest of the KafkaProducer initialization...
39 self.producer = KafkaProducer(
40 bootstrap_servers=self.bootstrap,
41 value_serializer=lambda v: v.SerializeToString(),
42 key_serializer=lambda k: str(k).encode('utf-8')
43 )
44

◆ __del__()

ERSPublisher.ERSPublisher.__del__ ( self)
Destructor-like method to clean up resources.

Definition at line 187 of file ERSPublisher.py.

187 def __del__(self):
188 """Destructor-like method to clean up resources."""
189 if self.producer:
190 self.producer.close()
191
192

Member Function Documentation

◆ _create_issue_chain()

ERSPublisher.ERSPublisher._create_issue_chain ( self,
Union[Exception,str] message,
str name = "GenericPythonIssue",
SeverityLevel severity = SeverityLevel.INFO.name,
Union[Exception,ersissue.SimpleIssue,ersissue.IssueChain] cause = None,
Optional[dict] context_kwargs = None )
protected
Create an ERS IssueChain with minimal user input.

Definition at line 134 of file ERSPublisher.py.

141 ):
142 """Create an ERS IssueChain with minimal user input."""
143 # This creates an issue chain with a given name, message, and severity
144 # The message can be a Python exception; in that case the name is also overwritten, with the name of the exception
145 # The cause can be another issue (chain or simple) or an exception
146
147 current_time = time.time_ns()
148
149 if isinstance(message,Exception):
150 # If the issue is created from an exception, set the name and inheritance
151 issue = self._exception_to_issue(message,severity, context_kwargs) # Use the existing function to create an issue from the exception
152 issue.time = current_time # The time is overwritten as this is the time of the call of the function
153 else:
154 # For non-exception issues, continue as normal
155 inheritance_list = ["PythonIssue", name]
156 issue = ersissue.SimpleIssue(
157 context = self._generate_context(context_kwargs),
158 name=name,
159 message=message,
160 time=current_time,
161 severity=str(severity),
162 inheritance=inheritance_list
163 )
164
165 issue_chain = ersissue.IssueChain(
166 final=issue,
167 session=os.getenv('DUNEDAQ_PARTITION', 'Unknown'),
168 application="python"
169 )
170
171 # Process the cause and add to issue_chain.causes, but not to issue.inheritance
172 if cause:
173 if isinstance(cause, Exception):
174 # Convert exception to a SimpleIssue and append to causes of issue_chain
175 cause_issue = self._exception_to_issue(cause,severity)
176 issue_chain.causes.extend([cause_issue])
177 elif isinstance(cause, ersissue.SimpleIssue):
178 # Add the cause directly to the causes of issue_chain
179 issue_chain.causes.extend([cause])
180 elif isinstance(cause, ersissue.IssueChain):
181 # Set the final cause of the existing chain as the first cause of the new chain
182 issue_chain.causes.append(cause.final)
183 issue_chain.causes.extend(cause.causes)
184
185 return issue_chain
186

◆ _exception_to_issue()

ersissue.SimpleIssue ERSPublisher.ERSPublisher._exception_to_issue ( self,
Exception exc,
SeverityLevel severity = SeverityLevel.WARNING.name,
Optional[dict] context_kwargs = None )
protected
Converts an exception to a SimpleIssue.

Definition at line 113 of file ERSPublisher.py.

118 ) -> ersissue.SimpleIssue:
119
120 """Converts an exception to a SimpleIssue."""
121
122 current_time = time.time_ns() # Get current time in nanoseconds
123 import inspect
124 mro = [the_class.__name__ for the_class in inspect.getmro(type(exc)) if the_class.__name__ != "object"][::-1]
125 return ersissue.SimpleIssue(
126 context=self._generate_context(context_kwargs),
127 name=type(exc).__name__,
128 message=str(exc),
129 time=current_time,
130 severity=severity,
131 inheritance=["PythonIssue"] + mro,
132 )
133

◆ _generate_context()

ersissue.Context ERSPublisher.ERSPublisher._generate_context ( self,
Optional[dict] context_kwargs = None )
protected
Generate the context for an issue.

Definition at line 77 of file ERSPublisher.py.

80 ) -> ersissue.Context:
81
82 """Generate the context for an issue."""
83 # Walk back up the stack and find the frame for the original caller
84 frame = inspect.currentframe()
85 while hasattr(frame, "f_code"):
86 co = frame.f_code
87 filename = os.path.normcase(co.co_filename)
88 if 'ERSPublisher.py' not in filename:
89 # Found the frame of the original caller
90 break
91 frame = frame.f_back
92
93 # If no such frame is found, default to the current frame
94 if frame is None:
95 frame = inspect.currentframe()
96
97 context = dict( # A guess for the context
98 cwd = os.getcwd(),
99 file_name = frame.f_code.co_filename,
100 function_name = frame.f_code.co_name,
101 host_name = socket.gethostname(),
102 line_number = frame.f_lineno,
103 user_name = os.getlogin(),
104 package_name = self.package_name,
105 application_name = self.application_name,
106 )
107
108 if context_kwargs:
109 context.update(context_kwargs)
110
111 return ersissue.Context(**context)
112

◆ _publish_issue_chain()

ERSPublisher.ERSPublisher._publish_issue_chain ( self,
ersissue.IssueChain issue )
protected
Publish an ERS issue_chain to the Kafka topic.

Definition at line 72 of file ERSPublisher.py.

72 def _publish_issue_chain(self, issue:ersissue.IssueChain):
73 """Publish an ERS issue_chain to the Kafka topic."""
74 return self.producer.send(self.topic, key=issue.session, value=issue)
75
76

◆ publish()

ERSPublisher.ERSPublisher.publish ( self,
Union[str,Exception] message_or_exception,
SeverityLevel severity = SeverityLevel.INFO.name,
Optional[str] name = None,
Union[None, Exception, ersissue.IssueChain, ersissue.SimpleIssue] cause = None,
Optional[dict] context_kwargs = None )
Create an issue from text or an exception and send it to Kafka.

Definition at line 45 of file ERSPublisher.py.

52 ):
53 """Create and issue from text or exception and send to to the Kafka."""
54
55 # If the name is not provided, use the name of the exception
56 if name is None and isinstance(message_or_exception, Exception):
57 name = type(message_or_exception).__name__
58
59 # If the name is still not provided, use a generic name
60 if name is None:
61 name = "GenericPythonIssue"
62
63 issue_chain = self._create_issue_chain(
64 message_or_exception,
65 name = name,
66 severity = severity,
67 cause = cause,
68 context_kwargs = context_kwargs,
69 )
70 return self._publish_issue_chain(issue_chain)
71

Member Data Documentation

◆ application_name

ERSPublisher.ERSPublisher.application_name = application_name

Definition at line 27 of file ERSPublisher.py.

◆ bootstrap

ERSPublisher.ERSPublisher.bootstrap = bootstrap

Definition at line 30 of file ERSPublisher.py.

◆ package_name

ERSPublisher.ERSPublisher.package_name = package_name

Definition at line 28 of file ERSPublisher.py.

◆ producer

ERSPublisher.ERSPublisher.producer
Initial value:
= KafkaProducer(
bootstrap_servers=self.bootstrap,
value_serializer=lambda v: v.SerializeToString(),
key_serializer=lambda k: str(k).encode('utf-8')
)

Definition at line 39 of file ERSPublisher.py.

◆ topic

ERSPublisher.ERSPublisher.topic = topic

Definition at line 36 of file ERSPublisher.py.


The documentation for this class was generated from the following file: