DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
tpreplay_application.__main__ Namespace Reference

Classes

class  ROUPlaneData
 
class  TPStreamFile
 

Functions

None setup_logging (bool verbose)
 
None set_mem_limit (int mem_limit)
 
None cleanup ()
 
conffwk.Configuration setup_configuration (str path_str, str sessions_file, bool verbose)
 
Any get_tpreplay_app (conffwk.Configuration cfg)
 
detchannelmaps._daq_detchannelmaps_py.TPCChannelMap load_channel_map (str channel_map_string)
 
list[str] get_tpstream_files (str filename)
 
None check_files (list[str] files)
 
(list[TPStreamFile], ROUPlaneDataextract_rous_and_planes (list[str] files, 'detchannelmaps._daq_detchannelmaps_py.TPCChannelMap' channel_map, set[int] planes_to_filter)
 
list[TPStreamFileupdate_tpstream_indices (list[TPStreamFile] tpstream_files)
 
list[Any] update_tpstream_dal_objects (Any a_tp_stream, conffwk.Configuration cfg, list[TPStreamFile] sorted_tpstream_files)
 
list[Any] update_sid_dal_objects (Any a_sid, conffwk.Configuration cfg, int total_unique_planes)
 
Any update_RandomTCmaker_obj (conffwk.Configuration cfg)
 
None update_configuration (conffwk.Configuration cfg, Any tpreplay_app, Any tprm_conf, str channel_map, list[TPStreamFile] sorted_tpstream_files, int total_unique_planes, set[int] planes_to_filter, str path_str, int n_loops)
 
 main ()
 

Function Documentation

◆ check_files()

None tpreplay_application.__main__.check_files ( list[str] files)
Very basic checks on the provided TPStream files.

Definition at line 133 of file __main__.py.

133def check_files(files: list[str]) -> None:
134 """
135 Very basic checks on the provided TPStream files.
136 """
137 for a_file in files:
138
140 path = Path(a_file)
141 if not path.exists():
142 logging.critical("File %s does not exist!", a_file)
143 sys.exit(1)
144 # check it's hdf5
145 if not path.suffix.lower() in ('.h5', '.hdf5'):
146 logging.critical("File %s does not seem to be hdf5 file!", a_file)
147 sys.exit(1)
148

◆ cleanup()

None tpreplay_application.__main__.cleanup ( )

Definition at line 45 of file __main__.py.

45def cleanup() -> None:
46 gc.collect()
47
48# custom type to hold a map of all (unique) ReadoutUnits and the corresponding planes for each
49@dataclass

◆ extract_rous_and_planes()

(list[TPStreamFile], ROUPlaneData) tpreplay_application.__main__.extract_rous_and_planes ( list[str] files,
'detchannelmaps._daq_detchannelmaps_py.TPCChannelMap' channel_map,
set[int] planes_to_filter )
This function goes over the provided TPStream files.
It extracts the readout units used to generate the data in the files.
It also extracts unfiltered planes for each readout unit.

Definition at line 149 of file __main__.py.

149def extract_rous_and_planes(files: list[str], channel_map: 'detchannelmaps._daq_detchannelmaps_py.TPCChannelMap', planes_to_filter: set[int]) -> (list[TPStreamFile], ROUPlaneData):
150 """
151 This function goes over the provided TPStream files.
152 It extracts the readout units used to generate the data in the files.
153 It also extracts unfiltered planes for each readout unit.
154 """
155 logging.info("Extracting ROUs and planes from files")
156 all_tpstream_files = []
157 rou_plane_data = ROUPlaneData()
158
159 valid_subdetectors = {
160 int(detdataformats.DetID.Subdetector.kHD_TPC),
161 int(detdataformats.DetID.Subdetector.kVD_BottomTPC),
162 int(detdataformats.DetID.Subdetector.kVD_TopTPC),
163 int(detdataformats.DetID.Subdetector.kNDLAr_TPC)
164 }
165
166 for tpstream_file in files:
167 logging.debug("Processing file: %s", tpstream_file)
168 loaded_file = HDF5RawDataFile(tpstream_file)
169 # check is tpstream
170 if not loaded_file.is_timeslice_type:
171 logging.critical("File %s is not a TP Stream file!", tpstream_file)
172 sys.exit(1)
173 # check has records
174 all_record_ids = loaded_file.get_all_record_ids()
175 if not all_record_ids:
176 logging.critical("File %s does not have valid records!", tpstream_file)
177 sys.exit(1)
178
179 # loop over all records :/
180 first_record = True
181 for a_record in tqdm(all_record_ids, desc="Processing records"):
182
183 # check has source IDs
184 source_ids = loaded_file.get_source_ids_for_fragment_type(a_record, "Trigger_Primitive")
185 if len(source_ids) == 0:
186 logging.critical("File %s does not have valid SourceIDs!", tpstream_file)
187 sys.exit(1)
188 logging.debug("SIDs: %s", source_ids)
189
190 for i, sid in enumerate(source_ids):
191 frag = loaded_file.get_frag(a_record, sid)
192 # check frag has data
193 if frag.get_data_size() < 1:
194 logging.critical("File %s has an empty fragment!", tpstream_file)
195 sys.exit(1)
196 tp = trgdataformats.TriggerPrimitive(frag.get_data(0))
197
198 if first_record == True:
199 all_tpstream_files.append(TPStreamFile(tpstream_file, tp.time_start, 0))
200 logging.debug("First time start: %s", tp.time_start)
201 first_record = False
202
203 # check subdetector
204 subdet = tp.detid
205 if subdet not in valid_subdetectors:
206 logging.debug("Subdetector %s is not in the map of valid subdetectors!", detdataformats.DetID.subdetector_to_string( detdataformats.DetID.Subdetector( subdet ) ) )
207 continue
208
209 plane = channel_map.get_plane_from_offline_channel(tp.channel)
210
211 # Hack for APA1 :/
212 if plane in (1, 2):
213 rou = channel_map.get_element_name_from_offline_channel(tp.channel)
214 if rou == "APA_P02SU":
215 # Remap the plane
216 plane = 2 if plane == 1 else 1
217
218 if plane not in planes_to_filter:
219 rou = channel_map.get_element_name_from_offline_channel(tp.channel)
220 rou_plane_data.add_value(rou, plane)
221 logging.debug("Extracted rou: %s for plane: %s", rou, plane)
222 else:
223 logging.debug("Plane %s filtered", plane)
224 cleanup()
225 del frag, tp
226 cleanup()
227 del loaded_file, a_record, source_ids
228
229 # No need for an if verbose check here, as logging level is already set
230 logging.info("Extracted ROUs and planes: %s", rou_plane_data.data)
231
232 # Case when we didn't extract any valid/useful sourceIDs
233 if not rou_plane_data.data:
234 logging.critical("No valid data was extracted!")
235 sys.exit(1)
236
237 return all_tpstream_files, rou_plane_data
238

◆ get_tpreplay_app()

Any tpreplay_application.__main__.get_tpreplay_app ( conffwk.Configuration cfg)
Retrieves the instance of TPReplayApplication from configuration files.
If it does not exist, stops the script.
In theory, we could make one from scratch, however, there are so many objects to configure
that doing that externally is preferred.

Definition at line 90 of file __main__.py.

90def get_tpreplay_app(cfg: conffwk.Configuration) -> Any:
91 """
92 Retrieves the instance of TPReplayApplication from configuration files.
93 If it does not exist, stops the script.
94 In theory, we could make one from scratch, however, there are so many objects to configure
95 that doing that externally is preferred.
96 """
97 tpreplay_apps = cfg.get_dals("TPReplayApplication")
98 if tpreplay_apps:
99 tpreplay_app = tpreplay_apps[0]
100 logging.debug("Loaded tpreplay application")
101 return tpreplay_app
102 else:
103 logging.critical("No 'TPReplayApplication' DAL objects found.")
104 sys.exit(1)
105

◆ get_tpstream_files()

list[str] tpreplay_application.__main__.get_tpstream_files ( str filename)
Reads in names of tpstream files from provided text file.

Definition at line 119 of file __main__.py.

119def get_tpstream_files(filename: str) -> list[str]:
120 """
121 Reads in names of tpstream files from provided text file.
122 """
123 logging.info("Reading TPStream file list from %s", filename)
124 with open(filename, "r") as file:
125 tpstream_files = [line.strip() for line in file.readlines()]
126 cleanup()
127
128 logging.info("Total files to process: %i", len(tpstream_files))
129 logging.debug("TPStream files loaded: %s", tpstream_files)
130
131 return tpstream_files
132

◆ load_channel_map()

detchannelmaps._daq_detchannelmaps_py.TPCChannelMap tpreplay_application.__main__.load_channel_map ( str channel_map_string)
Tries to create a channel map using the provided string name.
If it fails, prints the error and exits.

Definition at line 106 of file __main__.py.

106def load_channel_map(channel_map_string: str) -> detchannelmaps._daq_detchannelmaps_py.TPCChannelMap:
107 """
108 Tries to create a channel map using the provided string name.
109 If it fails, prints the error and exits.
110 """
111 try:
112 channel_map = detchannelmaps.make_tpc_map(channel_map_string)
113 logging.debug(f"Channel map '{channel_map_string}' successfully created.")
114 return channel_map
115 except Exception as e:
116 logging.critical(f"Failed to create the channel map '{channel_map_string}'. Error: {str(e)}")
117 sys.exit(1)
118

◆ main()

tpreplay_application.__main__.main ( )

Definition at line 360 of file __main__.py.

360def main():
361 parser = argparse.ArgumentParser(description="To be used with TP Replay Application. Process TPStream data and update configuration.",
362 formatter_class=argparse.RawTextHelpFormatter)
363 parser.add_argument("--files", type=str, required=True, help="Text file with (full) paths to HDF5 TPStream file location. One per line.")
364 parser.add_argument("--filter-planes", type=int, nargs='*', choices=[0, 1, 2], default=[], help="list of planes to filter out. Can be empty or contain any combination of 0 (U), 1 (V), and 2 (X).")
365 parser.add_argument("--channel-map", type=str, default='PD2HDTPCChannelMap',
366 help="Specify the channel map to use. Available examples include: PD2HDTPCChannelMap, PD2VDBottomTPCChannelMap, VDColdboxTPCChannelMap, HDColdboxTPCChannelMap.\n"
367 "For more details, visit: https://github.com/DUNE-DAQ/detchannelmaps/blob/develop/docs/channel-maps-table.md")
368 parser.add_argument("--n-loops", type=int, default=-1, help="Number of times to loop over the provided data.")
369 parser.add_argument("--config", type=str, default="config/daqsystemtest/example-configs.data.xml", help="Path to OKS configuration file.")
370 parser.add_argument("--path", type=str, default="tpreplay-run", help="Path for local output for configuration files.")
371 parser.add_argument("--mem-limit", type=int, default=25, help="This will set a memory limit [GB] to protect the machine.")
372 parser.add_argument("--verbose", action="store_true", help="Enable verbose logging.")
373 args = parser.parse_args()
374
375 # Set up logging based on the verbose flag
376 setup_logging(args.verbose)
377
378 # Set memory limit
379 set_mem_limit(args.mem_limit)
380
381 logging.info("Starting TPStream processing script")
382 cfg = setup_configuration(args.path, args.config, args.verbose)
383 tpreplay_app = get_tpreplay_app(cfg)
384 logging.debug("TP Replay application configuration: %s", tpreplay_app)
385 tprm_conf = tpreplay_app.tprm_conf
386 logging.debug("TPRM configuration: %s", tprm_conf)
387 channel_map = load_channel_map(args.channel_map)
388 planes_to_filter = set(args.filter_planes)
389 logging.debug("Planes to filter: %s", planes_to_filter)
390
391 files = get_tpstream_files(args.files)
392 check_files(files)
393 all_tpstream_files, rou_plane_data = extract_rous_and_planes(files, channel_map, planes_to_filter)
394 sorted_tpstream_files = update_tpstream_indices(all_tpstream_files)
395
396 total_unique_planes = rou_plane_data.total_plane_count()
397 logging.info("Total plane count: %d", total_unique_planes)
398 update_configuration(cfg, tpreplay_app, tprm_conf, args.channel_map, sorted_tpstream_files, total_unique_planes, planes_to_filter, args.path, args.n_loops)
399
int main(int argc, char **argv)

◆ set_mem_limit()

None tpreplay_application.__main__.set_mem_limit ( int mem_limit)
As a safety measure we set a memory limit for the process.
Should not be needed here as hdf5 processing is minimal.

Definition at line 35 of file __main__.py.

35def set_mem_limit(mem_limit: int) -> None:
36 """
37 As a safety measure we set a memory limit for the process.
38 Should not be needed here as hdf5 processing is minimal.
39 """
40 GB = 1024**3
41 memory_limit = mem_limit * GB
42 resource.setrlimit(resource.RLIMIT_AS, (memory_limit, memory_limit))
43 logging.debug("Setting memory limit to %i GBs", memory_limit/GB)
44

◆ setup_configuration()

conffwk.Configuration tpreplay_application.__main__.setup_configuration ( str path_str,
str sessions_file,
bool verbose )
Copies over relevant configuration files for a provided .data.xml file.
The default is the example-configs.data.xml, which contains the default sessions.

Definition at line 72 of file __main__.py.

72def setup_configuration(path_str: str, sessions_file: str, verbose: bool) -> conffwk.Configuration:
73 """
74 Copies over relevant configuration files for a provided .data.xml file.
75 The default is the example-configs.data.xml, which contains the default sessions.
76 """
77 logging.info("Setting up configuration files")
78 logging.debug("Local path for configurations: %s", path_str)
79 path = Path(path_str).resolve() # Convert string to Path object
80 path.mkdir(parents=True, exist_ok=True)
81 external_logger = logging.getLogger('daqconf.consolidate')
82 if not verbose: external_logger.setLevel(logging.WARNING)
83 copy_configuration(path, [sessions_file])
84 cleanup()
85
86 logging.debug("Copying configuration represented by databases: %s", [sessions_file])
87
88 return conffwk.Configuration(f"oksconflibs:{path}/example-configs.data.xml")
89

◆ setup_logging()

None tpreplay_application.__main__.setup_logging ( bool verbose)
Set up logging based on the verbose flag.
If verbose flag is provided, set the logging level to DEBUG to show all logs.
Otherwise, set it to INFO to only show INFO level logs and above.

Definition at line 23 of file __main__.py.

23def setup_logging(verbose: bool) -> None:
24 """
25 Set up logging based on the verbose flag.
26 If verbose flag is provided, set the logging level to DEBUG to show all logs.
27 Otherwise, set it to INFO to only show INFO level logs and above.
28 """
29 if verbose:
30 logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
31 logging.debug("Verbose logging configured!")
32 else:
33 logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
34

◆ update_configuration()

None tpreplay_application.__main__.update_configuration ( conffwk.Configuration cfg,
Any tpreplay_app,
Any tprm_conf,
str channel_map,
list[TPStreamFile] sorted_tpstream_files,
int total_unique_planes,
set[int] planes_to_filter,
str path_str,
int n_loops )
Takes all changes and updates the local database files.
[total_planes in TPRM
 tp_streams in TPRM
 planes in TPRM
 tp_source_ids in tpreplay
 ]

Definition at line 312 of file __main__.py.

312def update_configuration(cfg: conffwk.Configuration, tpreplay_app: Any, tprm_conf: Any, channel_map: str, sorted_tpstream_files: list[TPStreamFile], total_unique_planes: int, planes_to_filter: set[int], path_str: str, n_loops: int) -> None:
313 """
314 Takes all changes and updates the local database files.
315 [total_planes in TPRM
316 tp_streams in TPRM
317 planes in TPRM
318 tp_source_ids in tpreplay
319 ]
320 """
321 logging.info("Updating configuration with new TPStream data")
322 tprm_conf.total_planes = total_unique_planes
323 tprm_conf.number_of_loops = n_loops
324 tprm_conf.channel_map = channel_map
325
326 a_tp_stream = tprm_conf.tp_streams[0] if tprm_conf.tp_streams else None
327 tprm_conf.tp_streams = update_tpstream_dal_objects(a_tp_stream, cfg, sorted_tpstream_files)
328 logging.info("Total of %i TPStream configs created", len(tprm_conf.tp_streams))
329
330 if planes_to_filter:
331 tprm_conf.filter_out_plane = list(planes_to_filter)
332 logging.info("Total of %i planes to be filtered", len(tprm_conf.filter_out_plane))
333 else:
334 tprm_conf.filter_out_plane = []
335
336 a_sid = tpreplay_app.tp_source_ids[0] if tpreplay_app.tp_source_ids else None
337 tpreplay_app.tp_source_ids = update_sid_dal_objects(a_sid, cfg, total_unique_planes)
338 logging.info("Total of %i SID configs created", len(tpreplay_app.tp_source_ids))
339
340 randomTCmaker = update_RandomTCmaker_obj(cfg)
341
342 cleanup()
343 logging.debug("Committing updated configuration to database")
344
345 db_modules = conffwk.Configuration(f"oksconflibs:{path_str}/moduleconfs.data.xml")
346 db_trigger = conffwk.Configuration(f"oksconflibs:{path_str}/trigger-segment.data.xml")
347 for tpstream in tprm_conf.tp_streams:
348 db_modules.update_dal(tpstream)
349 for sid in tpreplay_app.tp_source_ids:
350 db_trigger.update_dal(sid)
351 db_trigger.update_dal(tpreplay_app)
352 db_modules.update_dal(tprm_conf)
353 if randomTCmaker is not None:
354 db_modules.update_dal(randomTCmaker)
355 db_modules.commit()
356 db_trigger.commit()
357 logging.info("Local database updated!")
358 cleanup()
359

◆ update_RandomTCmaker_obj()

Any tpreplay_application.__main__.update_RandomTCmaker_obj ( conffwk.Configuration cfg)
Changes the trigger_rate_hz for RandomTCMakerConf to 0 by default for replay.

Definition at line 296 of file __main__.py.

296def update_RandomTCmaker_obj(cfg: conffwk.Configuration) -> Any:
297 """
298 Changes the trigger_rate_hz for RandomTCMakerConf to 0 by default for replay.
299 """
300 randomTCmakers = cfg.get_dals("RandomTCMakerConf")
301 if randomTCmakers:
302 randomTCmaker = randomTCmakers[0]
303 logging.debug("Loaded randomTCmaker object")
304 randomTCmaker.trigger_rate_hz = 0
305 logging.debug("Updated randomTCmaker object")
306 return randomTCmaker
307
308 else:
309 logging.error("No 'RandomTCMakerConf' DAL objects found.")
310 return None
311

◆ update_sid_dal_objects()

list[Any] tpreplay_application.__main__.update_sid_dal_objects ( Any a_sid,
conffwk.Configuration cfg,
int total_unique_planes )
Creates SourceIDConf dal objects needed for each unique plane.
Additional safety to create these from scratch if an example instance is not found.

Definition at line 273 of file __main__.py.

273def update_sid_dal_objects(a_sid: Any, cfg: conffwk.Configuration, total_unique_planes: int) -> list[Any]:
274 """
275 Creates SourceIDConf dal objects needed for each unique plane.
276 Additional safety to create these from scratch if an example instance is not found.
277 """
278 if not a_sid:
279 logging.warning("No template Source ID object found")
280 # get template and create from scratch
281 a_sid_template = cfg.create_obj('SourceIDConf', "template-SourceIDConf")
282 cache = {"SourceIDConf": {}}
283 a_sid = a_sid_template.as_dal(cache)
284 all_sids = []
285 base_string = "tpreplay-tp-srcid-100000"
286 start_number = int(re.search(r'(\d+)$', base_string).group(1))
287 for i in range(1, total_unique_planes + 1):
288 temp_sid = copy.deepcopy(a_sid)
289 temp_sid.id = re.sub(r'\d+$', f"{start_number + i:06d}", base_string)
290 temp_sid.sid = i
291 temp_sid.subsystem = "Trigger"
292 all_sids.append(temp_sid)
293 logging.debug("Created SID config: %s", temp_sid)
294 return all_sids
295

◆ update_tpstream_dal_objects()

list[Any] tpreplay_application.__main__.update_tpstream_dal_objects ( Any a_tp_stream,
conffwk.Configuration cfg,
list[TPStreamFile] sorted_tpstream_files )
Creates TPStreamConf dal objects for the provided TP Stream files.
Additional safety to create these from scratch if an example instance is not found.

Definition at line 252 of file __main__.py.

252def update_tpstream_dal_objects(a_tp_stream: Any, cfg: conffwk.Configuration, sorted_tpstream_files: list[TPStreamFile]) -> list[Any]:
253 """
254 Creates TPStreamConf dal objects for the provided TP Stream files.
255 Additional safety to create these from scratch if an example instance is not found.
256 """
257 if not a_tp_stream:
258 logging.warning("No template TPStream object found")
259 # get template and create from scratch
260 a_tp_stream_template = cfg.create_obj('TPStreamConf', "template-TPStreamConf")
261 cache = {"TPStreamConf": {}}
262 a_tp_stream = a_tp_stream_template.as_dal(cache)
263 tp_streams = []
264 for a_file in sorted_tpstream_files:
265 temp_tp_stream = copy.deepcopy(a_tp_stream)
266 temp_tp_stream.id = f"def-tp-stream-{a_file.index}"
267 temp_tp_stream.filename = a_file.filename
268 temp_tp_stream.index = a_file.index
269 tp_streams.append(temp_tp_stream)
270 logging.debug("Created TPStream: %s", temp_tp_stream)
271 return tp_streams
272

◆ update_tpstream_indices()

list[TPStreamFile] tpreplay_application.__main__.update_tpstream_indices ( list[TPStreamFile] tpstream_files)
Sorts the files by the time of the very first TP.
The data will be sorted in TPRM, but this speeds it up.

Definition at line 239 of file __main__.py.

239def update_tpstream_indices(tpstream_files: list[TPStreamFile]) -> list[TPStreamFile]:
240 """
241 Sorts the files by the time of the very first TP.
242 The data will be sorted in TPRM, but this speeds it up.
243 """
244 logging.info("Sorting TPStream files by time and assigning indices")
245 sorted_files = sorted(tpstream_files, key=lambda x: x.stime)
246 for i, tp_file in enumerate(sorted_files):
247 tp_file.index = i + 1
248 cleanup()
249 logging.debug("Sorted TPSTreamFile objects: %s", sorted_files)
250 return sorted_files
251