1from __future__
import annotations
6from datetime
import datetime, tzinfo
7from pathlib
import Path
11from rich.console
import Console
12from rich.logging
import RichHandler
13from rich.theme
import Theme
19 "CRITICAL": logging.CRITICAL,
20 "ERROR": logging.ERROR,
21 "WARNING": logging.WARNING,
23 "DEBUG": logging.DEBUG,
24 "NOTSET": logging.NOTSET,
27logging_log_level_keys = list(logging_log_levels.keys())
28logging_log_level_values = list(logging_log_levels.values())
32 "kEventDriven": 1073741824,
33 "kDefault": 2147483648,
34 "kLowestPriority": 4294967295,
37oks_log_level_keys = list(oks_log_levels.keys())
38oks_log_level_values = list(oks_log_levels.values())
41 "kTopPriority":
"ERROR",
42 "kEventDriven":
"WARNING",
44 "kLowestPriority":
"DEBUG",
47log_level_keys = logging_log_level_keys + oks_log_level_keys
48log_level_values = logging_log_level_values + logging_log_level_values
52 """Custom error for unrecognised log level."""
56 if isinstance(level, str):
58 f
"Level '{level}' is not one of the recognised level names "
59 f
"({log_level_keys})."
61 elif isinstance(level, int):
63 f
"Level '{level}' is not one of the recognised level values "
64 f
"({log_level_values})."
67 err_msg = f
"Level '{level}' is not of a supported type."
72 f
"{logging_log_level_keys} python logging or "
73 f
"{oks_log_level_keys} for oks log levels."
76CONTEXT_SETTINGS = {
"help_option_names": [
"-h",
"--help"]}
77CONSOLE_THEMES = Theme({
"info":
"dim cyan",
"warning":
"magenta",
"danger":
"bold red"})
80full_log_format =
"%(asctime)s %(levelname)s %(filename)s %(name)s %(message)s"
82rich_log_format =
"%(filename)s %(name)s %(message)s"
84date_time_format =
"[%Y/%m/%d %H:%M:%S]"
89 """Custom logging formatter for DUNE DAQ applications."""
93 fmt: str = full_log_format,
94 datefmt: str = date_time_format,
95 tz: tzinfo = time_zone,
97 """Construct the logging formatter."""
102 def formatTime(self, record: logging.LogRecord, datefmt: str) -> str:
103 """Apply the correct formatting to the log record date and time."""
104 date_time = datetime.fromtimestamp(record.created, self.
tz)
105 return date_time.strftime(self.
datefmt)
107 def format(self, record: logging.LogRecord) -> logging.LogRecord:
108 """Apply the correct formatting to the log record."""
112 file_lineno = f
"{record.filename}:{record.lineno}"
113 record.filename = file_lineno.ljust(component_width)[:component_width]
115 name_colon = f
"{record.name}:"
116 if name_colon.startswith(
"drunc."):
117 name_colon = name_colon.replace(
"drunc.",
"")
118 record.name = name_colon.ljust(component_width)[:component_width]
120 level_name = record.levelname
121 record.levelname = level_name.ljust(component_width)[:component_width]
122 return super().
format(record)
126 """Initialize a Rich handler for terminal logging."""
128 width = os.get_terminal_size()[0]
131 handler = RichHandler(
132 console=Console(width=width),
133 omit_repeated_times=
False,
135 rich_tracebacks=
True,
137 tracebacks_width=width,
144 """Get the level name from its int value."""
145 if not isinstance(level, int):
147 for k, v
in logging_log_levels.items():
150 for k, v
in oks_log_levels.items():
152 return oks_to_logging_map[k]
153 err_str = f
"Requested log level with value {level} is not standard ({e_log_levels})"
154 raise ValueError(err_str)
from None
158 """Get the level int from its str value."""
159 if not isinstance(level, str):
161 for k, v
in logging_log_levels.items():
162 if k == level.upper():
164 for k
in oks_log_levels.keys():
166 return logging_log_levels[oks_to_logging_map[k]]
167 err_str = f
"Requested log level with value {level} is not standard ({e_log_levels})"
168 raise ValueError(err_str)
from None
173 conf: dict[str:str] |
"conffwk.dal.OpMonConf",
174 uri: dict[str:str] | conffwk.dal.OpMonURI,
178 """Parse the OpMonConf and OpMonURI."""
180 log.error(
"Missing opmon configuration, exiting.")
183 log.error(
"Missing opmon URI, exiting.")
187 uri.get(
"type")
if isinstance(uri, dict)
else getattr(uri,
"type",
None)
190 log.debug(
"Found OpMon type: %s", opmon_type)
193 "Missing 'type' in the opmon configuration, [yellow]using default value "
196 opmon_type =
"stdout"
198 path = uri.get(
"path")
if isinstance(uri, dict)
else getattr(uri,
"path",
None)
200 log.debug(
"Found OpMon path: %s", path)
201 elif opmon_type !=
"stdout":
202 log.error(
"Missing 'path' in the opmon configuration, exiting.")
207 log.debug(
"No OpMon path required for type 'stdout'.")
209 if opmon_type ==
"stream" and "monkafka" not in path:
210 msg =
"OpMon 'stream' configuration must publish to kafka, exiting."
211 raise ValueError(msg)
from None
212 if opmon_type !=
"stream" and "monkafka" in path:
213 msg =
"To use kafka, the type must be set to stream."
214 raise ValueError(msg)
from None
218 if opmon_type ==
"file" and not Path(path).parent.is_dir():
219 err_str =
"Requested directory to put file in does not exist."
220 raise ValueError(err_str)
from None
221 if "monkafka" in path:
222 bootstrap, topic = path.split(
"/", 1)
224 topic =
"opmon_stream"
225 log.debug(
"Using OpMon topic: [green]'%s'[/green]", topic)
226 log.debug(
"Using OpMon bootstrap: [green]'%s'[/green]", bootstrap)
229 conf.get(
"level")
if isinstance(conf, dict)
else getattr(conf,
"level",
None)
232 log.debug(
"Found OpMon level: [green]%s[/green]", level)
235 "Missing 'level' in the OpMon configuration, [yellow]using default "
238 level = logging.DEBUG
241 conf.get(
"interval_s")
242 if isinstance(conf, dict)
243 else getattr(conf,
"interval_s",
None)
246 log.debug(
"Found OpMon interval_s: %s", interval_s)
249 "Missing 'interval_s' in the opmon configuration, [yellow]using default "
255 opmon_type, bootstrap, topic, level, interval_s, path, session, application
260 """Map the OpMonId to a string."""
261 ret = opmon_id.get(
"session")
263 err_msg =
"Missing session in OpMonId."
264 raise ValueError(err_msg)
from None
266 application = opmon_id.get(
"application")
268 ret +=
"." + application
270 substructures = opmon_id.get(
"substructure")
271 for substructure
in substructures:
272 ret +=
"." + substructure
278 """Verify the file path can be opened."""
280 hook_position = file_path.find(hook)
282 if hook_position == -1:
285 fname = file_path[hook_position + len(hook) :]
288 slash_pos = fname.rfind(
"/")
290 dot_pos = fname.find(
".")
292 dot_pos = fname.find(
".", slash_pos)
295 fname +=
"." + origin +
".json"
297 fname = fname[:dot_pos] +
"." + origin + fname[dot_pos:]
300 with open(fname,
"a"):
303 err_str = f
"Can not open file {fname}"
304 raise OSError(err_str)
from None
None __init__(self, str|int level)
str extract_opmon_file_path(str file_path, OpMonId|None origin=None)
str logging_log_level_from_int(int level)
dict[str:str] parse_opmon_conf(logging.Logger log, dict[str:str]|"conffwk.dal.OpMonConf" conf, dict[str:str]|conffwk.dal.OpMonURI uri, str session, str application)
RichHandler setup_rich_handler()
int logging_log_level_from_str(str level)
str to_string(OpMonId opmon_id)