DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
utils.py
Go to the documentation of this file.
1from __future__ import annotations
2
3import logging
4import os
5import sys
6from datetime import datetime, tzinfo
7from pathlib import Path
8
9import conffwk
10import pytz
11from rich.console import Console
12from rich.logging import RichHandler
13from rich.theme import Theme
14
15from opmonlib.conf import OpMonConf
16from opmonlib.opmon_entry_pb2 import OpMonId
17
18logging_log_levels = {
19 "CRITICAL": logging.CRITICAL,
20 "ERROR": logging.ERROR,
21 "WARNING": logging.WARNING,
22 "INFO": logging.INFO,
23 "DEBUG": logging.DEBUG,
24 "NOTSET": logging.NOTSET,
25}
26
27logging_log_level_keys = list(logging_log_levels.keys())
28logging_log_level_values = list(logging_log_levels.values())
29
30oks_log_levels = {
31 "kTopPriority": 0,
32 "kEventDriven": 1073741824,
33 "kDefault": 2147483648,
34 "kLowestPriority": 4294967295,
35}
36
37oks_log_level_keys = list(oks_log_levels.keys())
38oks_log_level_values = list(oks_log_levels.values())
39
40oks_to_logging_map = {
41 "kTopPriority": "ERROR",
42 "kEventDriven": "WARNING",
43 "kDefault": "INFO",
44 "kLowestPriority": "DEBUG",
45}
46
47log_level_keys = logging_log_level_keys + oks_log_level_keys
48log_level_values = logging_log_level_values + logging_log_level_values
49
50
51class LogLevelError(Exception):
52 """Custom error for unrecognised log level."""
53
54 def __init__(self, level: str | int) -> None:
55 """Constructor."""
56 if isinstance(level, str):
57 err_msg = (
58 f"Level '{level}' is not one of the recognised level names "
59 f"({log_level_keys})."
60 )
61 elif isinstance(level, int):
62 err_msg = (
63 f"Level '{level}' is not one of the recognised level values "
64 f"({log_level_values})."
65 )
66 else:
67 err_msg = f"Level '{level}' is not of a supported type."
68 super().__init__(err_msg)
69
70
71e_log_levels = (
72 f"{logging_log_level_keys} python logging or "
73 f"{oks_log_level_keys} for oks log levels."
74)
75
76CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"]}
77CONSOLE_THEMES = Theme({"info": "dim cyan", "warning": "magenta", "danger": "bold red"})
78
79# TODO: for production, remove the filename
80full_log_format = "%(asctime)s %(levelname)s %(filename)s %(name)s %(message)s"
81# TODO: for production, remove the filename
82rich_log_format = "%(filename)s %(name)s %(message)s"
83# TODO: include timezone as %Z when the RichHandler starts supporting it in the tty.
84date_time_format = "[%Y/%m/%d %H:%M:%S]"
85time_zone = pytz.utc
86
87
88class LoggingFormatter(logging.Formatter):
89 """Custom logging formatter for DUNE DAQ applications."""
90
92 self,
93 fmt: str = full_log_format,
94 datefmt: str = date_time_format,
95 tz: tzinfo = time_zone,
96 ) -> None:
97 """Construct the logging formatter."""
98 super().__init__(fmt, datefmt)
99 self.tz = tz
100 self.datefmt = datefmt
101
102 def formatTime(self, record: logging.LogRecord, datefmt: str) -> str: # noqa: N802
103 """Apply the correct formatting to the log record date and time."""
104 date_time = datetime.fromtimestamp(record.created, self.tz)
105 return date_time.strftime(self.datefmt)
106
107 def format(self, record: logging.LogRecord) -> logging.LogRecord:
108 """Apply the correct formatting to the log record."""
109 record.asctime = self.formatTime(record, self.datefmt)
110 # TODO: for production, remove filename and lineno entries
111 component_width = 30
112 file_lineno = f"{record.filename}:{record.lineno}"
113 record.filename = file_lineno.ljust(component_width)[:component_width]
114 component_width = 45
115 name_colon = f"{record.name}:"
116 if name_colon.startswith("drunc."):
117 name_colon = name_colon.replace("drunc.", "")
118 record.name = name_colon.ljust(component_width)[:component_width]
119 component_width = 10
120 level_name = record.levelname
121 record.levelname = level_name.ljust(component_width)[:component_width]
122 return super().format(record)
123
124
125def setup_rich_handler() -> RichHandler:
126 """Initialize a Rich handler for terminal logging."""
127 try:
128 width = os.get_terminal_size()[0]
129 except OSError:
130 width = 150
131 handler = RichHandler(
132 console=Console(width=width),
133 omit_repeated_times=False,
134 markup=True,
135 rich_tracebacks=True,
136 show_path=False,
137 tracebacks_width=width,
138 )
139 handler.setFormatter(LoggingFormatter(fmt=rich_log_format))
140 return handler
141
142
143def logging_log_level_from_int(level: int) -> str:
144 """Get the level name from its int value."""
145 if not isinstance(level, int):
146 return level
147 for k, v in logging_log_levels.items():
148 if v == level:
149 return k
150 for k, v in oks_log_levels.items():
151 if v == level:
152 return oks_to_logging_map[k]
153 err_str = f"Requested log level with value {level} is not standard ({e_log_levels})"
154 raise ValueError(err_str) from None
155
156
157def logging_log_level_from_str(level: str) -> int:
158 """Get the level int from its str value."""
159 if not isinstance(level, str):
160 return level
161 for k, v in logging_log_levels.items():
162 if k == level.upper():
163 return v
164 for k in oks_log_levels.keys():
165 if k == level:
166 return logging_log_levels[oks_to_logging_map[k]]
167 err_str = f"Requested log level with value {level} is not standard ({e_log_levels})"
168 raise ValueError(err_str) from None
169
170
172 log: logging.Logger,
173 conf: dict[str:str] | "conffwk.dal.OpMonConf", # noqa: UP037
174 uri: dict[str:str] | conffwk.dal.OpMonURI, # noqa: UP037.
175 session: str,
176 application: str,
177) -> dict[str:str]:
178 """Parse the OpMonConf and OpMonURI."""
179 if not conf:
180 log.error("Missing opmon configuration, exiting.")
181 sys.exit(1)
182 if not uri:
183 log.error("Missing opmon URI, exiting.")
184 sys.exit(1)
185
186 opmon_type = (
187 uri.get("type") if isinstance(uri, dict) else getattr(uri, "type", None)
188 )
189 if opmon_type:
190 log.debug("Found OpMon type: %s", opmon_type)
191 else:
192 log.debug(
193 "Missing 'type' in the opmon configuration, [yellow]using default value "
194 "'stdout'[/yellow]."
195 )
196 opmon_type = "stdout"
197
198 path = uri.get("path") if isinstance(uri, dict) else getattr(uri, "path", None)
199 if path:
200 log.debug("Found OpMon path: %s", path)
201 elif opmon_type != "stdout":
202 log.error("Missing 'path' in the opmon configuration, exiting.")
203 sys.exit(1)
204 else:
205 if path == []:
206 path = ""
207 log.debug("No OpMon path required for type 'stdout'.")
208
209 bootstrap = None
210 topic = None
211 if opmon_type == "file" and not Path(path).parent.is_dir():
212 err_str = "Requested directory to put file in does not exist."
213 raise ValueError(err_str) from None
214 if "monkafka" in path:
215 bootstrap, topic = path.split("/", 1)
216 if not topic:
217 topic = "opmon_stream"
218 log.debug("Using OpMon topic: [green]'%s'[/green]", topic)
219 log.debug("Using OpMon bootstrap: [green]'%s'[/green]", bootstrap)
220
221 level = (
222 conf.get("level") if isinstance(conf, dict) else getattr(conf, "level", None)
223 )
224 if level:
225 log.debug("Found OpMon level: [green]%s[/green]", level)
226 else:
227 log.debug(
228 "Missing 'level' in the OpMon configuration, [yellow]using default "
229 "'DEBUG'[/yellow]."
230 )
231 level = logging.DEBUG
232
233 interval_s = (
234 conf.get("interval_s")
235 if isinstance(conf, dict)
236 else getattr(conf, "interval_s", None)
237 )
238 if interval_s:
239 log.debug("Found OpMon interval_s: %s", interval_s)
240 else:
241 log.debug(
242 "Missing 'interval_s' in the opmon configuration, [yellow]using default "
243 "10s[/yellow]."
244 )
245 interval_s = 10.0
246
247 return OpMonConf(
248 opmon_type, bootstrap, topic, level, interval_s, path, session, application
249 )
250
251
252def to_string(opmon_id: OpMonId) -> str:
253 """Map the OpMonId to a string."""
254 ret = opmon_id.get("session")
255 if not ret:
256 err_msg = "Missing session in OpMonId."
257 raise ValueError(err_msg) from None
258
259 application = opmon_id.get("application")
260 if application:
261 ret += "." + application
262
263 substructures = opmon_id.get("substructure")
264 for substructure in substructures:
265 ret += "." + substructure
266
267 return ret
268
269
270def extract_opmon_file_path(file_path: str, origin: OpMonId | None = None) -> str:
271 """Verify the file path can be opened."""
272 hook = "://"
273 hook_position = file_path.find(hook)
274 fname = None
275 if hook_position == -1:
276 fname = file_path
277 else:
278 fname = file_path[hook_position + len(hook) :]
279
280 if origin:
281 slash_pos = fname.rfind("/")
282 if slash_pos == -1:
283 dot_pos = fname.find(".")
284 else:
285 dot_pos = fname.find(".", slash_pos)
286 origin = to_string(origin)
287 if dot_pos == -1:
288 fname += "." + origin + ".json"
289 else:
290 fname = fname[:dot_pos] + "." + origin + fname[dot_pos:]
291
292 try:
293 with open(fname, "a"):
294 pass
295 except OSError:
296 err_str = f"Can not open file {fname}"
297 raise OSError(err_str) from None
298
299 return fname
None __init__(self, str|int level)
Definition utils.py:54
None __init__(self, str fmt=full_log_format, str datefmt=date_time_format, tzinfo tz=time_zone)
Definition utils.py:96
logging.LogRecord format(self, logging.LogRecord record)
Definition utils.py:107
str formatTime(self, logging.LogRecord record, str datefmt)
Definition utils.py:102
str extract_opmon_file_path(str file_path, OpMonId|None origin=None)
Definition utils.py:270
str logging_log_level_from_int(int level)
Definition utils.py:143
dict[str:str] parse_opmon_conf(logging.Logger log, dict[str:str]|"conffwk.dal.OpMonConf" conf, dict[str:str]|conffwk.dal.OpMonURI uri, str session, str application)
Definition utils.py:177
RichHandler setup_rich_handler()
Definition utils.py:125
int logging_log_level_from_str(str level)
Definition utils.py:157
str to_string(OpMonId opmon_id)
Definition utils.py:252