DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
__main__.py
Go to the documentation of this file.
1import argparse
2import copy
3import gc
4import logging
5import re
6import resource
7import sys
8from collections import defaultdict
9from dataclasses import dataclass, field
10from pathlib import Path
11from tqdm import tqdm
12from typing import Any
13
14import conffwk
15import daqdataformats
16import detchannelmaps
17import detdataformats
18import trgdataformats
19from daqconf.consolidate import copy_configuration
20from hdf5libs import HDF5RawDataFile
21
22
def setup_logging(verbose: bool) -> None:
    """
    Configure the root logger for the script.

    DEBUG level (all messages) when *verbose* is true, INFO otherwise;
    both use the same timestamped message format.
    """
    level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(level=level, format="%(asctime)s - %(levelname)s - %(message)s")
    if verbose:
        logging.debug("Verbose logging configured!")
34
def set_mem_limit(mem_limit: int) -> None:
    """
    Cap this process's address space at *mem_limit* gigabytes via RLIMIT_AS.

    Purely a safety net; the hdf5 processing done here should stay well below it.
    """
    gigabyte = 1024**3
    limit_bytes = mem_limit * gigabyte
    resource.setrlimit(resource.RLIMIT_AS, (limit_bytes, limit_bytes))
    logging.debug("Setting memory limit to %i GBs", limit_bytes / gigabyte)
44
def cleanup() -> None:
    """Force a garbage-collection pass to promptly reclaim dropped objects."""
    gc.collect()
47
# custom type to hold a map of all (unique) ReadoutUnits and the corresponding planes for each
@dataclass
class ROUPlaneData:
    """Maps each readout unit (ROU) name to the set of plane numbers seen for it."""

    # defaultdict(set) lets add_value insert without checking key existence
    data: dict[str, set[int]] = field(default_factory=lambda: defaultdict(set))

    def add_value(self, rou: str, plane: int) -> None:
        """Record that *plane* was observed for readout unit *rou*."""
        self.data[rou].add(plane)

    def get_values(self, rou: str) -> set[int]:
        """Return the planes recorded for *rou* (empty set for an unknown ROU)."""
        return self.data.get(rou, set())

    def total_plane_count(self) -> int:
        """Return the number of unique (rou, plane) pairs summed over all ROUs."""
        return sum(len(planes) for planes in self.data.values())
61
# custom type to hold variables for TPStream, used later to sort & update db
@dataclass
class TPStreamFile:
    """Bookkeeping record for one TPStream file used for sorting and db updates."""

    filename: str  # path to the TPStream HDF5 file
    stime: int     # time_start of the first TriggerPrimitive seen in the file
    index: int     # 1-based position after sorting by stime (0 until assigned)

    def __str__(self) -> str:
        return f"Name: {self.filename}, Stime: {self.stime}, Index: {self.index}"
71
def setup_configuration(path_str: str, sessions_file: str, verbose: bool) -> conffwk.Configuration:
    """
    Copy the configuration files for the provided .data.xml into a local directory.

    The default sessions_file is example-configs.data.xml, which contains the
    default sessions.  Returns a conffwk.Configuration opened on the local copy.
    """
    logging.info("Setting up configuration files")
    logging.debug("Local path for configurations: %s", path_str)

    target_dir = Path(path_str).resolve()  # work with a real Path object
    target_dir.mkdir(parents=True, exist_ok=True)

    # quieten the consolidate module's own logger unless verbose was requested
    consolidate_logger = logging.getLogger('daqconf.consolidate')
    if not verbose:
        consolidate_logger.setLevel(logging.WARNING)

    copy_configuration(target_dir, [sessions_file])
    cleanup()

    logging.debug("Copying configuration represented by databases: %s", [sessions_file])

    # NOTE(review): always opens example-configs.data.xml regardless of the
    # sessions_file name — confirm this is intended for non-default inputs
    return conffwk.Configuration(f"oksconflibs:{target_dir}/example-configs.data.xml")
89
def get_tpreplay_app(cfg: conffwk.Configuration) -> Any:
    """
    Fetch the TPReplayApplication DAL instance from the configuration.

    Exits the script if none exists: in theory one could be built from
    scratch, but there are too many objects to configure for that to be
    practical here.
    """
    candidates = cfg.get_dals("TPReplayApplication")
    if not candidates:
        logging.critical("No 'TPReplayApplication' DAL objects found.")
        sys.exit(1)
    logging.debug("Loaded tpreplay application")
    return candidates[0]
105
def load_channel_map(channel_map_string: str) -> detchannelmaps._daq_detchannelmaps_py.TPCChannelMap:
    """
    Build a TPC channel map from its string name.

    Logs the failure reason and exits if the map cannot be created.
    """
    try:
        tpc_map = detchannelmaps.make_tpc_map(channel_map_string)
    except Exception as e:
        logging.critical(f"Failed to create the channel map '{channel_map_string}'. Error: {str(e)}")
        sys.exit(1)
    logging.debug(f"Channel map '{channel_map_string}' successfully created.")
    return tpc_map
118
def get_tpstream_files(filename: str) -> list[str]:
    """
    Read TPStream file paths (one per line) from the provided text file.

    Blank lines are skipped so that trailing newlines or accidental empty
    lines do not produce "" entries that would later fail the existence check.
    """
    logging.info("Reading TPStream file list from %s", filename)
    with open(filename, "r") as file:
        # iterate the file lazily; keep only non-empty stripped lines
        tpstream_files = [stripped for line in file if (stripped := line.strip())]
    cleanup()

    logging.info("Total files to process: %i", len(tpstream_files))
    logging.debug("TPStream files loaded: %s", tpstream_files)

    return tpstream_files
132
def check_files(files: list[str]) -> None:
    """
    Run basic sanity checks on the provided TPStream file paths.

    Each file must exist on disk and carry an hdf5 extension;
    the script exits on the first violation.
    """
    for a_file in files:
        path = Path(a_file)
        # must exist on disk
        if not path.exists():
            logging.critical("File %s does not exist!", a_file)
            sys.exit(1)
        # must look like an hdf5 file by extension
        if path.suffix.lower() not in ('.h5', '.hdf5'):
            logging.critical("File %s does not seem to be hdf5 file!", a_file)
            sys.exit(1)
148
def extract_rous_and_planes(files: list[str], channel_map: 'detchannelmaps._daq_detchannelmaps_py.TPCChannelMap', planes_to_filter: set[int]) -> tuple[list[TPStreamFile], ROUPlaneData]:
    """
    Scan the provided TPStream files and collect readout-unit/plane data.

    For every file this walks all timeslice records and their Trigger_Primitive
    fragments, mapping each TP's channel to a readout unit (ROU) and a plane via
    the channel map.  Planes listed in planes_to_filter are skipped.

    Returns a tuple of:
    - one TPStreamFile per input file, stamped with the time_start of the first
      TP seen in that file (index stays 0; it is assigned later when sorting),
    - a ROUPlaneData mapping each ROU to the set of unfiltered planes seen.

    Exits the script on a malformed file (not a timeslice file, no records, no
    Trigger_Primitive source IDs, empty fragment) or if nothing was extracted.
    """
    logging.info("Extracting ROUs and planes from files")
    all_tpstream_files = []
    rou_plane_data = ROUPlaneData()

    # TPC-type subdetectors we accept; TPs from anything else are ignored
    valid_subdetectors = {
        int(detdataformats.DetID.Subdetector.kHD_TPC),
        int(detdataformats.DetID.Subdetector.kVD_BottomTPC),
        int(detdataformats.DetID.Subdetector.kVD_TopTPC),
        int(detdataformats.DetID.Subdetector.kNDLAr_TPC)
    }

    for tpstream_file in files:
        logging.debug("Processing file: %s", tpstream_file)
        loaded_file = HDF5RawDataFile(tpstream_file)
        # check is tpstream
        if not loaded_file.is_timeslice_type:
            logging.critical("File %s is not a TP Stream file!", tpstream_file)
            sys.exit(1)
        # check has records
        all_record_ids = loaded_file.get_all_record_ids()
        if not all_record_ids:
            logging.critical("File %s does not have valid records!", tpstream_file)
            sys.exit(1)

        # loop over all records :/
        first_record = True
        for a_record in tqdm(all_record_ids, desc="Processing records"):

            # check has source IDs
            source_ids = loaded_file.get_source_ids_for_fragment_type(a_record, "Trigger_Primitive")
            if len(source_ids) == 0:
                logging.critical("File %s does not have valid SourceIDs!", tpstream_file)
                sys.exit(1)
            logging.debug("SIDs: %s", source_ids)

            for i, sid in enumerate(source_ids):
                frag = loaded_file.get_frag(a_record, sid)
                # check frag has data
                if frag.get_data_size() < 1:
                    logging.critical("File %s has an empty fragment!", tpstream_file)
                    sys.exit(1)
                # NOTE: only the first TP (offset 0) of each fragment is decoded here
                tp = trgdataformats.TriggerPrimitive(frag.get_data(0))

                # remember this file's first TP time; used later for sorting files
                if first_record == True:
                    all_tpstream_files.append(TPStreamFile(tpstream_file, tp.time_start, 0))
                    logging.debug("First time start: %s", tp.time_start)
                    first_record = False

                # check subdetector
                subdet = tp.detid
                if subdet not in valid_subdetectors:
                    logging.debug("Subdetector %s is not in the map of valid subdetectors!", detdataformats.DetID.subdetector_to_string( detdataformats.DetID.Subdetector( subdet ) ) )
                    continue

                # map the TP channel to plane/ROU, honouring the plane filter
                plane = channel_map.get_plane_from_offline_channel(tp.channel)
                if plane not in planes_to_filter:
                    rou = channel_map.get_element_name_from_offline_channel(tp.channel)
                    rou_plane_data.add_value(rou, plane)
                    logging.debug("Extracted rou: %s for plane: %s", rou, plane)
                else:
                    logging.debug("Plane %s filtered", plane)
                cleanup()
                del frag, tp
        cleanup()
        del loaded_file, a_record, source_ids

    # No need for an if verbose check here, as logging level is already set
    logging.info("Extracted ROUs and planes: %s", rou_plane_data.data)

    # Case when we didn't extract any valid/useful sourceIDs
    if not rou_plane_data.data:
        logging.critical("No valid data was extracted!")
        sys.exit(1)

    return all_tpstream_files, rou_plane_data
230
def update_tpstream_indices(tpstream_files: list[TPStreamFile]) -> list[TPStreamFile]:
    """
    Sort the files by the time of their very first TP and assign 1-based indices.

    TPRM will sort the data anyway, but pre-sorting here speeds that up.
    """
    logging.info("Sorting TPStream files by time and assigning indices")
    ordered = sorted(tpstream_files, key=lambda f: f.stime)
    for position, tp_file in enumerate(ordered, start=1):
        tp_file.index = position
    cleanup()
    logging.debug("Sorted TPSTreamFile objects: %s", ordered)
    return ordered
243
def update_tpstream_dal_objects(a_tp_stream: Any, cfg: conffwk.Configuration, sorted_tpstream_files: list[TPStreamFile]) -> list[Any]:
    """
    Build one TPStreamConf dal object per provided TP Stream file.

    If no example instance was supplied, a template is created from
    scratch as an additional safety net.
    """
    if not a_tp_stream:
        logging.warning("No template TPStream object found")
        # no example instance available: make a template from scratch
        template_obj = cfg.create_obj('TPStreamConf', "template-TPStreamConf")
        a_tp_stream = template_obj.as_dal({"TPStreamConf": {}})

    tp_streams = []
    for stream_file in sorted_tpstream_files:
        new_stream = copy.deepcopy(a_tp_stream)
        new_stream.id = f"def-tp-stream-{stream_file.index}"
        new_stream.filename = stream_file.filename
        new_stream.index = stream_file.index
        logging.debug("Created TPStream: %s", new_stream)
        tp_streams.append(new_stream)
    return tp_streams
264
def update_sid_dal_objects(a_sid: Any, cfg: conffwk.Configuration, total_unique_planes: int) -> list[Any]:
    """
    Build one SourceIDConf dal object for each unique plane.

    If no example instance was supplied, a template is created from
    scratch as an additional safety net.
    """
    if not a_sid:
        logging.warning("No template Source ID object found")
        # no example instance available: make a template from scratch
        template_obj = cfg.create_obj('SourceIDConf', "template-SourceIDConf")
        a_sid = template_obj.as_dal({"SourceIDConf": {}})

    base_string = "tpreplay-tp-srcid-100000"
    # numeric suffix of the base id; new ids count up from it
    start_number = int(re.search(r'(\d+)$', base_string).group(1))

    all_sids = []
    for plane_no in range(1, total_unique_planes + 1):
        new_sid = copy.deepcopy(a_sid)
        new_sid.id = re.sub(r'\d+$', f"{start_number + plane_no:06d}", base_string)
        new_sid.sid = plane_no
        new_sid.subsystem = "Trigger"
        logging.debug("Created SID config: %s", new_sid)
        all_sids.append(new_sid)
    return all_sids
287
def update_RandomTCmaker_obj(cfg: conffwk.Configuration) -> Any:
    """
    Set trigger_rate_hz to 0 on the first RandomTCMakerConf (replay default).

    Returns the updated object, or None when no RandomTCMakerConf exists.
    """
    candidates = cfg.get_dals("RandomTCMakerConf")
    if not candidates:
        logging.error("No 'RandomTCMakerConf' DAL objects found.")
        return None

    tc_maker = candidates[0]
    logging.debug("Loaded randomTCmaker object")
    tc_maker.trigger_rate_hz = 0
    logging.debug("Updated randomTCmaker object")
    return tc_maker
303
def update_configuration(cfg: conffwk.Configuration, tpreplay_app: Any, tprm_conf: Any, channel_map: str, sorted_tpstream_files: list[TPStreamFile], total_unique_planes: int, planes_to_filter: set[int], path_str: str, n_loops: int) -> None:
    """
    Apply all extracted TPStream information and commit it to the local databases.

    Updates, in order:
    - total_planes, number_of_loops, channel_map, tp_streams and
      filter_out_plane on the TPRM config (written to moduleconfs.data.xml),
    - tp_source_ids on the tpreplay application (written to
      trigger-segment.data.xml),
    - trigger_rate_hz on the RandomTCMakerConf, when one exists.
    """
    logging.info("Updating configuration with new TPStream data")
    tprm_conf.total_planes = total_unique_planes
    tprm_conf.number_of_loops = n_loops
    tprm_conf.channel_map = channel_map

    # reuse the first existing TPStream config as a template (None -> built from scratch)
    a_tp_stream = tprm_conf.tp_streams[0] if tprm_conf.tp_streams else None
    tprm_conf.tp_streams = update_tpstream_dal_objects(a_tp_stream, cfg, sorted_tpstream_files)
    logging.info("Total of %i TPStream configs created", len(tprm_conf.tp_streams))

    if planes_to_filter:
        tprm_conf.filter_out_plane = list(planes_to_filter)
        logging.info("Total of %i planes to be filtered", len(tprm_conf.filter_out_plane))
    else:
        tprm_conf.filter_out_plane = []

    # same template trick for the source-ID configs
    a_sid = tpreplay_app.tp_source_ids[0] if tpreplay_app.tp_source_ids else None
    tpreplay_app.tp_source_ids = update_sid_dal_objects(a_sid, cfg, total_unique_planes)
    logging.info("Total of %i SID configs created", len(tpreplay_app.tp_source_ids))

    # may be None when no RandomTCMakerConf exists; handled below
    randomTCmaker = update_RandomTCmaker_obj(cfg)

    cleanup()
    logging.debug("Committing updated configuration to database")

    # open the two local database files and push every updated DAL before committing
    db_modules = conffwk.Configuration(f"oksconflibs:{path_str}/moduleconfs.data.xml")
    db_trigger = conffwk.Configuration(f"oksconflibs:{path_str}/trigger-segment.data.xml")
    for tpstream in tprm_conf.tp_streams:
        db_modules.update_dal(tpstream)
    for sid in tpreplay_app.tp_source_ids:
        db_trigger.update_dal(sid)
    db_trigger.update_dal(tpreplay_app)
    db_modules.update_dal(tprm_conf)
    if randomTCmaker is not None:
        db_modules.update_dal(randomTCmaker)
    db_modules.commit()
    db_trigger.commit()
    logging.info("Local database updated!")
    cleanup()
351
def main():
    """
    Entry point: parse CLI options, scan the provided TPStream files, and
    update the local OKS configuration for the TP Replay Application.
    """
    parser = argparse.ArgumentParser(description="To be used with TP Replay Application. Process TPStream data and update configuration.",
                                     formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument("--files", type=str, required=True, help="Text file with (full) paths to HDF5 TPStream file location. One per line.")
    parser.add_argument("--filter-planes", type=int, nargs='*', choices=[0, 1, 2], default=[], help="list of planes to filter out. Can be empty or contain any combination of 0 (U), 1 (V), and 2 (X).")
    parser.add_argument("--channel-map", type=str, default='PD2HDTPCChannelMap',
                        help="Specify the channel map to use. Available examples include: PD2HDTPCChannelMap, PD2VDBottomTPCChannelMap, VDColdboxTPCChannelMap, HDColdboxTPCChannelMap.\n"
                             "For more details, visit: https://github.com/DUNE-DAQ/detchannelmaps/blob/develop/docs/channel-maps-table.md")
    parser.add_argument("--n-loops", type=int, default=-1, help="Number of times to loop over the provided data.")
    parser.add_argument("--config", type=str, default="config/daqsystemtest/example-configs.data.xml", help="Path to OKS configuration file.")
    parser.add_argument("--path", type=str, default="tpreplay-run", help="Path for local output for configuration files.")
    parser.add_argument("--mem-limit", type=int, default=25, help="This will set a memory limit [GB] to protect the machine.")
    parser.add_argument("--verbose", action="store_true", help="Enable verbose logging.")
    args = parser.parse_args()

    # Set up logging based on the verbose flag (must happen before any logging calls)
    setup_logging(args.verbose)

    # Set memory limit
    set_mem_limit(args.mem_limit)

    logging.info("Starting TPStream processing script")
    # copy configs locally and load the tpreplay application + its TPRM config
    cfg = setup_configuration(args.path, args.config, args.verbose)
    tpreplay_app = get_tpreplay_app(cfg)
    logging.debug("TP Replay application configuration: %s", tpreplay_app)
    tprm_conf = tpreplay_app.tprm_conf
    logging.debug("TPRM configuration: %s", tprm_conf)
    channel_map = load_channel_map(args.channel_map)
    planes_to_filter = set(args.filter_planes)
    logging.debug("Planes to filter: %s", planes_to_filter)

    # read, validate and scan the TPStream files, then sort by first-TP time
    files = get_tpstream_files(args.files)
    check_files(files)
    all_tpstream_files, rou_plane_data = extract_rous_and_planes(files, channel_map, planes_to_filter)
    sorted_tpstream_files = update_tpstream_indices(all_tpstream_files)

    # one source ID is needed per unique (ROU, plane) pair
    total_unique_planes = rou_plane_data.total_plane_count()
    logging.info("Total plane count: %d", total_unique_planes)
    update_configuration(cfg, tpreplay_app, tprm_conf, args.channel_map, sorted_tpstream_files, total_unique_planes, planes_to_filter, args.path, args.n_loops)
# run only when executed as a script, not when imported
if __name__ == "__main__":
    main()
394
set[int] get_values(self, str rou)
Definition __main__.py:56
None add_value(self, str rou, int plane)
Definition __main__.py:53
None check_files(list[str] files)
Definition __main__.py:133
Any get_tpreplay_app(conffwk.Configuration cfg)
Definition __main__.py:90
(list[TPStreamFile], ROUPlaneData) extract_rous_and_planes(list[str] files, 'detchannelmaps._daq_detchannelmaps_py.TPCChannelMap' channel_map, set[int] planes_to_filter)
Definition __main__.py:149
None setup_logging(bool verbose)
Definition __main__.py:23
list[str] get_tpstream_files(str filename)
Definition __main__.py:119
list[TPStreamFile] update_tpstream_indices(list[TPStreamFile] tpstream_files)
Definition __main__.py:231
None set_mem_limit(int mem_limit)
Definition __main__.py:35
list[Any] update_sid_dal_objects(Any a_sid, conffwk.Configuration cfg, int total_unique_planes)
Definition __main__.py:265
Any update_RandomTCmaker_obj(conffwk.Configuration cfg)
Definition __main__.py:288
None update_configuration(conffwk.Configuration cfg, Any tpreplay_app, Any tprm_conf, str channel_map, list[TPStreamFile] sorted_tpstream_files, int total_unique_planes, set[int] planes_to_filter, str path_str, int n_loops)
Definition __main__.py:304
detchannelmaps._daq_detchannelmaps_py.TPCChannelMap load_channel_map(str channel_map_string)
Definition __main__.py:106
list[Any] update_tpstream_dal_objects(Any a_tp_stream, conffwk.Configuration cfg, list[TPStreamFile] sorted_tpstream_files)
Definition __main__.py:244
conffwk.Configuration setup_configuration(str path_str, str sessions_file, bool verbose)
Definition __main__.py:72