DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
__main__.py
Go to the documentation of this file.
1import argparse
2import copy
3import gc
4import logging
5import re
6import resource
7import sys
8from collections import defaultdict
9from dataclasses import dataclass, field
10from pathlib import Path
11from tqdm import tqdm
12from typing import Any
13
14import conffwk
15import daqdataformats
16import detchannelmaps
17import detdataformats
18import trgdataformats
19from daqconf.consolidate import copy_configuration
20from hdf5libs import HDF5RawDataFile
21
22
def setup_logging(verbose: bool) -> None:
    """
    Configure the root logger.

    With verbose=True the level is DEBUG (all messages shown);
    otherwise it is INFO so only INFO and above are emitted.
    """
    log_format = "%(asctime)s - %(levelname)s - %(message)s"
    level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(level=level, format=log_format)
    if verbose:
        logging.debug("Verbose logging configured!")
34
def set_mem_limit(mem_limit: int) -> None:
    """
    Apply an address-space limit of *mem_limit* GB to this process.

    Purely a safety measure; the HDF5 processing done here is minimal and
    should stay well below the limit.
    """
    bytes_per_gb = 1024 ** 3
    limit_bytes = mem_limit * bytes_per_gb
    # Set both the soft and hard limits to the same value.
    resource.setrlimit(resource.RLIMIT_AS, (limit_bytes, limit_bytes))
    logging.debug("Setting memory limit to %i GBs", limit_bytes / bytes_per_gb)
44
def cleanup() -> None:
    """Force a garbage-collection pass to release unreferenced objects."""
    gc.collect()
47
# custom type to hold a map of all (unique) ReadoutUnits and the corresponding planes for each
@dataclass
class ROUPlaneData:
    """Maps each readout unit (ROU) name to the set of plane numbers seen for it."""

    # ROU name -> set of plane indices; defaultdict(set) so a new ROU starts empty
    data: dict[str, set[int]] = field(default_factory=lambda: defaultdict(set))

    def add_value(self, rou: str, plane: int) -> None:
        """Record that *plane* was observed for readout unit *rou*."""
        self.data[rou].add(plane)

    def get_values(self, rou: str) -> set[int]:
        """Return the planes recorded for *rou* (empty set if unknown)."""
        return self.data.get(rou, set())

    def total_plane_count(self) -> int:
        """Total number of (rou, plane) pairs across all readout units."""
        return sum(len(planes) for planes in self.data.values())
61
# custom type to hold variables for TPStream, used later to sort & update db
@dataclass
class TPStreamFile:
    """One TPStream HDF5 file together with its first-TP time and replay index."""

    filename: str  # path of the TPStream HDF5 file
    stime: int     # time_start of the first TriggerPrimitive in the file
    index: int     # 1-based position after sorting by stime (0 until assigned)

    def __str__(self) -> str:
        return f"Name: {self.filename}, Stime: {self.stime}, Index: {self.index}"
71
def setup_configuration(path_str: str, sessions_file: str, verbose: bool) -> conffwk.Configuration:
    """
    Copy the configuration databases for *sessions_file* into a local
    directory and open the local copy.

    The default is the example-configs.data.xml, which contains the default sessions.

    Args:
        path_str: local directory to copy the configuration into (created if needed).
        sessions_file: top-level .data.xml database to copy.
        verbose: when False, silence INFO chatter from daqconf.consolidate.

    Returns:
        conffwk.Configuration opened on the local copy of *sessions_file*.
    """
    logging.info("Setting up configuration files")
    logging.debug("Local path for configurations: %s", path_str)
    path = Path(path_str).resolve()  # Convert string to Path object
    path.mkdir(parents=True, exist_ok=True)
    external_logger = logging.getLogger('daqconf.consolidate')
    if not verbose:
        external_logger.setLevel(logging.WARNING)
    copy_configuration(path, [sessions_file])
    cleanup()

    logging.debug("Copying configuration represented by databases: %s", [sessions_file])

    # Open the local copy of the *provided* database rather than a hard-coded
    # example-configs.data.xml, so a non-default --config argument is honoured.
    db_name = Path(sessions_file).name
    return conffwk.Configuration(f"oksconflibs:{path}/{db_name}")
89
def get_tpreplay_app(cfg: conffwk.Configuration) -> Any:
    """
    Fetch the TPReplayApplication instance from the configuration.

    Exits the script when none exists: building one from scratch would mean
    configuring far too many objects, so an existing instance is required.
    """
    tpreplay_apps = cfg.get_dals("TPReplayApplication")
    if not tpreplay_apps:
        logging.critical("No 'TPReplayApplication' DAL objects found.")
        sys.exit(1)
    logging.debug("Loaded tpreplay application")
    return tpreplay_apps[0]
105
def load_channel_map(channel_map_string: str) -> detchannelmaps._daq_detchannelmaps_py.TPCChannelMap:
    """
    Create a TPC channel map from its string name.

    Logs the error and exits with status 1 if the map cannot be created.
    """
    try:
        channel_map = detchannelmaps.make_tpc_map(channel_map_string)
        # Lazy %-style args, consistent with the rest of this module's logging.
        logging.debug("Channel map '%s' successfully created.", channel_map_string)
        return channel_map
    except Exception as e:
        logging.critical("Failed to create the channel map '%s'. Error: %s", channel_map_string, str(e))
        sys.exit(1)
118
def get_tpstream_files(filename: str) -> list[str]:
    """
    Read TPStream file paths (one per line) from a text file.

    Blank lines are skipped so that trailing newlines or stray spacing in the
    list file do not produce empty path entries that would later fail the
    existence check with a confusing message.
    """
    logging.info("Reading TPStream file list from %s", filename)
    with open(filename, "r") as file:
        # Iterate the file directly; no need to materialise readlines() first.
        tpstream_files = [line.strip() for line in file if line.strip()]
    cleanup()

    logging.info("Total files to process: %i", len(tpstream_files))
    logging.debug("TPStream files loaded: %s", tpstream_files)

    return tpstream_files
132
def check_files(files: list[str]) -> None:
    """
    Basic sanity checks on the provided TPStream file paths.

    Exits with status 1 if any file is missing or does not carry an
    HDF5-looking extension (.h5/.hdf5).
    """
    for a_file in files:
        path = Path(a_file)
        # check it exists
        if not path.exists():
            logging.critical("File %s does not exist!", a_file)
            sys.exit(1)
        # check it's hdf5
        if path.suffix.lower() not in ('.h5', '.hdf5'):
            logging.critical("File %s does not seem to be hdf5 file!", a_file)
            sys.exit(1)
148
def extract_rous_and_planes(files: list[str], channel_map: 'detchannelmaps._daq_detchannelmaps_py.TPCChannelMap', planes_to_filter: set[int]) -> tuple[list[TPStreamFile], ROUPlaneData]:
    """
    Scan the TPStream files and collect, per readout unit (ROU), the set of
    planes that survive filtering.

    For every file the time_start of its very first TP is also recorded in a
    TPStreamFile (index 0; a real index is assigned later by
    update_tpstream_indices).

    Exits with status 1 on malformed input (not a timeslice file, no records,
    no Trigger_Primitive source IDs, an empty fragment) or when no usable
    (ROU, plane) data was extracted at all.
    """
    logging.info("Extracting ROUs and planes from files")
    all_tpstream_files = []
    rou_plane_data = ROUPlaneData()

    # TPC-like subdetectors we accept TPs from; anything else is skipped.
    valid_subdetectors = {
        int(detdataformats.DetID.Subdetector.kHD_TPC),
        int(detdataformats.DetID.Subdetector.kVD_BottomTPC),
        int(detdataformats.DetID.Subdetector.kVD_TopTPC),
        int(detdataformats.DetID.Subdetector.kNDLAr_TPC)
    }

    for tpstream_file in files:
        logging.debug("Processing file: %s", tpstream_file)
        loaded_file = HDF5RawDataFile(tpstream_file)
        # check is tpstream
        if not loaded_file.is_timeslice_type:
            logging.critical("File %s is not a TP Stream file!", tpstream_file)
            sys.exit(1)
        # check has records
        all_record_ids = loaded_file.get_all_record_ids()
        if not all_record_ids:
            logging.critical("File %s does not have valid records!", tpstream_file)
            sys.exit(1)

        # loop over all records :/
        first_record = True
        for a_record in tqdm(all_record_ids, desc="Processing records"):

            # check has source IDs
            source_ids = loaded_file.get_source_ids_for_fragment_type(a_record, "Trigger_Primitive")
            if len(source_ids) == 0:
                logging.critical("File %s does not have valid SourceIDs!", tpstream_file)
                sys.exit(1)
            logging.debug("SIDs: %s", source_ids)

            for i, sid in enumerate(source_ids):
                frag = loaded_file.get_frag(a_record, sid)
                # check frag has data
                if frag.get_data_size() < 1:
                    logging.critical("File %s has an empty fragment!", tpstream_file)
                    sys.exit(1)
                tp = trgdataformats.TriggerPrimitive(frag.get_data(0))

                # The first TP of the file supplies its start time.
                if first_record:
                    all_tpstream_files.append(TPStreamFile(tpstream_file, tp.time_start, 0))
                    logging.debug("First time start: %s", tp.time_start)
                    first_record = False

                # check subdetector
                subdet = tp.detid
                if subdet not in valid_subdetectors:
                    logging.debug("Subdetector %s is not in the map of valid subdetectors!", detdataformats.DetID.subdetector_to_string( detdataformats.DetID.Subdetector( subdet ) ) )
                    continue

                plane = channel_map.get_plane_from_offline_channel(tp.channel)

                # Hack for APA1 :/
                if plane in (1, 2):
                    rou = channel_map.get_element_name_from_offline_channel(tp.channel)
                    if rou == "APA_P02SU":
                        # Remap the plane
                        plane = 2 if plane == 1 else 1

                if plane not in planes_to_filter:
                    rou = channel_map.get_element_name_from_offline_channel(tp.channel)
                    rou_plane_data.add_value(rou, plane)
                    logging.debug("Extracted rou: %s for plane: %s", rou, plane)
                else:
                    logging.debug("Plane %s filtered", plane)
                # NOTE(review): gc.collect() per TP is expensive — presumably a
                # deliberate memory-bounding measure; confirm before removing.
                cleanup()
                del frag, tp
            cleanup()
        del loaded_file, a_record, source_ids

    # No need for an if verbose check here, as logging level is already set
    logging.info("Extracted ROUs and planes: %s", rou_plane_data.data)

    # Case when we didn't extract any valid/useful sourceIDs
    if not rou_plane_data.data:
        logging.critical("No valid data was extracted!")
        sys.exit(1)

    return all_tpstream_files, rou_plane_data
238
def update_tpstream_indices(tpstream_files: list[TPStreamFile]) -> list[TPStreamFile]:
    """
    Sort the files by the time of their very first TP and assign 1-based
    indices in that order.

    The data will be sorted in TPRM anyway, but pre-sorting here speeds it up.
    """
    logging.info("Sorting TPStream files by time and assigning indices")
    sorted_files = sorted(tpstream_files, key=lambda x: x.stime)
    for i, tp_file in enumerate(sorted_files):
        tp_file.index = i + 1
    cleanup()
    # Fixed typo in the log message ("TPSTreamFile" -> "TPStreamFile").
    logging.debug("Sorted TPStreamFile objects: %s", sorted_files)
    return sorted_files
251
def update_tpstream_dal_objects(a_tp_stream: Any, cfg: conffwk.Configuration, sorted_tpstream_files: list[TPStreamFile]) -> list[Any]:
    """
    Build one TPStreamConf DAL object per sorted TPStream file.

    If no template instance is supplied, a fresh one is created from scratch
    as a safety fallback.
    """
    if not a_tp_stream:
        logging.warning("No template TPStream object found")
        # get template and create from scratch
        template = cfg.create_obj('TPStreamConf', "template-TPStreamConf")
        a_tp_stream = template.as_dal({"TPStreamConf": {}})
    tp_streams = []
    for a_file in sorted_tpstream_files:
        stream = copy.deepcopy(a_tp_stream)
        stream.id = f"def-tp-stream-{a_file.index}"
        stream.filename = a_file.filename
        stream.index = a_file.index
        tp_streams.append(stream)
        logging.debug("Created TPStream: %s", stream)
    return tp_streams
272
def update_sid_dal_objects(a_sid: Any, cfg: conffwk.Configuration, total_unique_planes: int) -> list[Any]:
    """
    Build one SourceIDConf DAL object for each unique plane.

    If no template instance is supplied, a fresh one is created from scratch
    as a safety fallback.
    """
    if not a_sid:
        logging.warning("No template Source ID object found")
        # get template and create from scratch
        template = cfg.create_obj('SourceIDConf', "template-SourceIDConf")
        a_sid = template.as_dal({"SourceIDConf": {}})
    base_string = "tpreplay-tp-srcid-100000"
    # Numeric suffix of the base id; new ids count up from it.
    start_number = int(re.search(r'(\d+)$', base_string).group(1))
    all_sids = []
    for plane_no in range(1, total_unique_planes + 1):
        sid_conf = copy.deepcopy(a_sid)
        sid_conf.id = re.sub(r'\d+$', f"{start_number + plane_no:06d}", base_string)
        sid_conf.sid = plane_no
        sid_conf.subsystem = "Trigger"
        all_sids.append(sid_conf)
        logging.debug("Created SID config: %s", sid_conf)
    return all_sids
295
def update_RandomTCmaker_obj(cfg: conffwk.Configuration) -> Any:
    """
    Disable random TC generation for replay by setting trigger_rate_hz to 0
    on the first RandomTCMakerConf instance.

    Returns the updated object, or None when no RandomTCMakerConf exists.
    """
    randomTCmakers = cfg.get_dals("RandomTCMakerConf")
    if not randomTCmakers:
        logging.error("No 'RandomTCMakerConf' DAL objects found.")
        return None
    randomTCmaker = randomTCmakers[0]
    logging.debug("Loaded randomTCmaker object")
    randomTCmaker.trigger_rate_hz = 0
    logging.debug("Updated randomTCmaker object")
    return randomTCmaker
311
def update_configuration(cfg: conffwk.Configuration, tpreplay_app: Any, tprm_conf: Any, channel_map: str, sorted_tpstream_files: list[TPStreamFile], total_unique_planes: int, planes_to_filter: set[int], path_str: str, n_loops: int) -> None:
    """
    Apply all computed changes and commit them to the local database files.

    Updates, in order:
      - total_planes, number_of_loops, channel_map, tp_streams and
        filter_out_plane on the TPRM configuration,
      - tp_source_ids on the tpreplay application,
      - trigger_rate_hz (set to 0) on the RandomTCMakerConf, when present,
    then writes TPStream/TPRM/RandomTCMaker objects to moduleconfs.data.xml
    and SourceID/tpreplay objects to trigger-segment.data.xml and commits both.
    """
    logging.info("Updating configuration with new TPStream data")
    tprm_conf.total_planes = total_unique_planes
    tprm_conf.number_of_loops = n_loops
    tprm_conf.channel_map = channel_map

    # Use an existing TPStream config (if any) as the template for new ones.
    a_tp_stream = tprm_conf.tp_streams[0] if tprm_conf.tp_streams else None
    tprm_conf.tp_streams = update_tpstream_dal_objects(a_tp_stream, cfg, sorted_tpstream_files)
    logging.info("Total of %i TPStream configs created", len(tprm_conf.tp_streams))

    if planes_to_filter:
        tprm_conf.filter_out_plane = list(planes_to_filter)
        logging.info("Total of %i planes to be filtered", len(tprm_conf.filter_out_plane))
    else:
        tprm_conf.filter_out_plane = []

    # Likewise, use an existing SourceID config as the template for new ones.
    a_sid = tpreplay_app.tp_source_ids[0] if tpreplay_app.tp_source_ids else None
    tpreplay_app.tp_source_ids = update_sid_dal_objects(a_sid, cfg, total_unique_planes)
    logging.info("Total of %i SID configs created", len(tpreplay_app.tp_source_ids))

    randomTCmaker = update_RandomTCmaker_obj(cfg)

    cleanup()
    logging.debug("Committing updated configuration to database")

    # Open the two local database files copied earlier by setup_configuration.
    db_modules = conffwk.Configuration(f"oksconflibs:{path_str}/moduleconfs.data.xml")
    db_trigger = conffwk.Configuration(f"oksconflibs:{path_str}/trigger-segment.data.xml")
    # TPStream objects live in the modules DB; SourceIDs in the trigger DB.
    for tpstream in tprm_conf.tp_streams:
        db_modules.update_dal(tpstream)
    for sid in tpreplay_app.tp_source_ids:
        db_trigger.update_dal(sid)
    db_trigger.update_dal(tpreplay_app)
    db_modules.update_dal(tprm_conf)
    if randomTCmaker is not None:
        db_modules.update_dal(randomTCmaker)
    db_modules.commit()
    db_trigger.commit()
    logging.info("Local database updated!")
    cleanup()
359
def main():
    """
    CLI entry point.

    Parses arguments, scans the provided TPStream files for readout units and
    planes, then writes the derived settings into a local OKS database copy.
    """
    parser = argparse.ArgumentParser(description="To be used with TP Replay Application. Process TPStream data and update configuration.",
                                     formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument("--files", type=str, required=True, help="Text file with (full) paths to HDF5 TPStream file location. One per line.")
    parser.add_argument("--filter-planes", type=int, nargs='*', choices=[0, 1, 2], default=[], help="list of planes to filter out. Can be empty or contain any combination of 0 (U), 1 (V), and 2 (X).")
    parser.add_argument("--channel-map", type=str, default='PD2HDTPCChannelMap',
                        help="Specify the channel map to use. Available examples include: PD2HDTPCChannelMap, PD2VDBottomTPCChannelMap, VDColdboxTPCChannelMap, HDColdboxTPCChannelMap.\n"
                             "For more details, visit: https://github.com/DUNE-DAQ/detchannelmaps/blob/develop/docs/channel-maps-table.md")
    parser.add_argument("--n-loops", type=int, default=-1, help="Number of times to loop over the provided data.")
    parser.add_argument("--config", type=str, default="config/daqsystemtest/example-configs.data.xml", help="Path to OKS configuration file.")
    parser.add_argument("--path", type=str, default="tpreplay-run", help="Path for local output for configuration files.")
    parser.add_argument("--mem-limit", type=int, default=25, help="This will set a memory limit [GB] to protect the machine.")
    parser.add_argument("--verbose", action="store_true", help="Enable verbose logging.")
    args = parser.parse_args()

    # Set up logging based on the verbose flag
    setup_logging(args.verbose)

    # Set memory limit
    set_mem_limit(args.mem_limit)

    logging.info("Starting TPStream processing script")
    cfg = setup_configuration(args.path, args.config, args.verbose)
    tpreplay_app = get_tpreplay_app(cfg)
    logging.debug("TP Replay application configuration: %s", tpreplay_app)
    tprm_conf = tpreplay_app.tprm_conf
    logging.debug("TPRM configuration: %s", tprm_conf)
    channel_map = load_channel_map(args.channel_map)
    planes_to_filter = set(args.filter_planes)
    logging.debug("Planes to filter: %s", planes_to_filter)

    files = get_tpstream_files(args.files)
    check_files(files)
    all_tpstream_files, rou_plane_data = extract_rous_and_planes(files, channel_map, planes_to_filter)
    sorted_tpstream_files = update_tpstream_indices(all_tpstream_files)

    # One SourceID is created downstream for each unique (ROU, plane) pair.
    total_unique_planes = rou_plane_data.total_plane_count()
    logging.info("Total plane count: %d", total_unique_planes)
    update_configuration(cfg, tpreplay_app, tprm_conf, args.channel_map, sorted_tpstream_files, total_unique_planes, planes_to_filter, args.path, args.n_loops)

if __name__ == "__main__":
    main()
402
set[int] get_values(self, str rou)
Definition __main__.py:56
None add_value(self, str rou, int plane)
Definition __main__.py:53
None check_files(list[str] files)
Definition __main__.py:133
Any get_tpreplay_app(conffwk.Configuration cfg)
Definition __main__.py:90
(list[TPStreamFile], ROUPlaneData) extract_rous_and_planes(list[str] files, 'detchannelmaps._daq_detchannelmaps_py.TPCChannelMap' channel_map, set[int] planes_to_filter)
Definition __main__.py:149
None setup_logging(bool verbose)
Definition __main__.py:23
list[str] get_tpstream_files(str filename)
Definition __main__.py:119
list[TPStreamFile] update_tpstream_indices(list[TPStreamFile] tpstream_files)
Definition __main__.py:239
None set_mem_limit(int mem_limit)
Definition __main__.py:35
list[Any] update_sid_dal_objects(Any a_sid, conffwk.Configuration cfg, int total_unique_planes)
Definition __main__.py:273
Any update_RandomTCmaker_obj(conffwk.Configuration cfg)
Definition __main__.py:296
None update_configuration(conffwk.Configuration cfg, Any tpreplay_app, Any tprm_conf, str channel_map, list[TPStreamFile] sorted_tpstream_files, int total_unique_planes, set[int] planes_to_filter, str path_str, int n_loops)
Definition __main__.py:312
detchannelmaps._daq_detchannelmaps_py.TPCChannelMap load_channel_map(str channel_map_string)
Definition __main__.py:106
list[Any] update_tpstream_dal_objects(Any a_tp_stream, conffwk.Configuration cfg, list[TPStreamFile] sorted_tpstream_files)
Definition __main__.py:252
conffwk.Configuration setup_configuration(str path_str, str sessions_file, bool verbose)
Definition __main__.py:72