74 Copies over relevant configuration files for a provided .data.xml file.
75 The default is the example-configs.data.xml, which contains the default sessions.
77 logging.info(
"Setting up configuration files")
78 logging.debug(
"Local path for configurations: %s", path_str)
79 path = Path(path_str).resolve()
80 path.mkdir(parents=
True, exist_ok=
True)
81 external_logger = logging.getLogger(
'daqconf.consolidate')
82 if not verbose: external_logger.setLevel(logging.WARNING)
83 copy_configuration(path, [sessions_file])
86 logging.debug(
"Copying configuration represented by databases: %s", [sessions_file])
149def extract_rous_and_planes(files: list[str], channel_map:
'detchannelmaps._daq_detchannelmaps_py.TPCChannelMap', planes_to_filter: set[int]) -> (list[TPStreamFile], ROUPlaneData):
151 This function goes over the provided TPStream files.
152 It extracts the readout units used to generate the data in the files.
153 It also extracts unfiltered planes for each readout unit.
155 logging.info(
"Extracting ROUs and planes from files")
156 all_tpstream_files = []
159 valid_subdetectors = {
160 int(detdataformats.DetID.Subdetector.kHD_TPC),
161 int(detdataformats.DetID.Subdetector.kVD_BottomTPC),
162 int(detdataformats.DetID.Subdetector.kVD_TopTPC),
163 int(detdataformats.DetID.Subdetector.kNDLAr_TPC)
166 for tpstream_file
in files:
167 logging.debug(
"Processing file: %s", tpstream_file)
170 if not loaded_file.is_timeslice_type:
171 logging.critical(
"File %s is not a TP Stream file!", tpstream_file)
174 all_record_ids = loaded_file.get_all_record_ids()
175 if not all_record_ids:
176 logging.critical(
"File %s does not have valid records!", tpstream_file)
181 for a_record
in tqdm(all_record_ids, desc=
"Processing records"):
184 source_ids = loaded_file.get_source_ids_for_fragment_type(a_record,
"Trigger_Primitive")
185 if len(source_ids) == 0:
186 logging.critical(
"File %s does not have valid SourceIDs!", tpstream_file)
188 logging.debug(
"SIDs: %s", source_ids)
190 for i, sid
in enumerate(source_ids):
191 frag = loaded_file.get_frag(a_record, sid)
193 if frag.get_data_size() < 1:
194 logging.critical(
"File %s has an empty fragment!", tpstream_file)
196 tp = trgdataformats.TriggerPrimitive(frag.get_data(0))
198 if first_record ==
True:
199 all_tpstream_files.append(
TPStreamFile(tpstream_file, tp.time_start, 0))
200 logging.debug(
"First time start: %s", tp.time_start)
205 if subdet
not in valid_subdetectors:
206 logging.debug(
"Subdetector %s is not in the map of valid subdetectors!", detdataformats.DetID.subdetector_to_string( detdataformats.DetID.Subdetector( subdet ) ) )
209 plane = channel_map.get_plane_from_offline_channel(tp.channel)
213 rou = channel_map.get_element_name_from_offline_channel(tp.channel)
214 if rou ==
"APA_P02SU":
216 plane = 2
if plane == 1
else 1
218 if plane
not in planes_to_filter:
219 rou = channel_map.get_element_name_from_offline_channel(tp.channel)
220 rou_plane_data.add_value(rou, plane)
221 logging.debug(
"Extracted rou: %s for plane: %s", rou, plane)
223 logging.debug(
"Plane %s filtered", plane)
227 del loaded_file, a_record, source_ids
230 logging.info(
"Extracted ROUs and planes: %s", rou_plane_data.data)
233 if not rou_plane_data.data:
234 logging.critical(
"No valid data was extracted!")
237 return all_tpstream_files, rou_plane_data
254 Creates TPStreamConf dal objects for the provided TP Stream files.
255 Additional safety to create these from scratch if an example instance is not found.
258 logging.warning(
"No template TPStream object found")
260 a_tp_stream_template = cfg.create_obj(
'TPStreamConf',
"template-TPStreamConf")
261 cache = {
"TPStreamConf": {}}
262 a_tp_stream = a_tp_stream_template.as_dal(cache)
264 for a_file
in sorted_tpstream_files:
265 temp_tp_stream = copy.deepcopy(a_tp_stream)
266 temp_tp_stream.id = f
"def-tp-stream-{a_file.index}"
267 temp_tp_stream.filename = a_file.filename
268 temp_tp_stream.index = a_file.index
269 tp_streams.append(temp_tp_stream)
270 logging.debug(
"Created TPStream: %s", temp_tp_stream)
275 Creates SourceIDConf dal objects needed for each unique plane.
276 Additional safety to create these from scratch if an example instance is not found.
279 logging.warning(
"No template Source ID object found")
281 a_sid_template = cfg.create_obj(
'SourceIDConf',
"template-SourceIDConf")
282 cache = {
"SourceIDConf": {}}
283 a_sid = a_sid_template.as_dal(cache)
285 base_string =
"tpreplay-tp-srcid-100000"
286 start_number = int(re.search(
r'(\d+)$', base_string).group(1))
287 for i
in range(1, total_unique_planes + 1):
288 temp_sid = copy.deepcopy(a_sid)
289 temp_sid.id = re.sub(
r'\d+$', f
"{start_number + i:06d}", base_string)
291 temp_sid.subsystem =
"Trigger"
292 all_sids.append(temp_sid)
293 logging.debug(
"Created SID config: %s", temp_sid)
312def update_configuration(cfg: conffwk.Configuration, tpreplay_app: Any, tprm_conf: Any, channel_map: str, sorted_tpstream_files: list[TPStreamFile], total_unique_planes: int, planes_to_filter: set[int], path_str: str, n_loops: int) ->
None:
314 Takes all changes and updates the local database files.
315 [total_planes in TPRM
318 tp_source_ids in tpreplay
321 logging.info(
"Updating configuration with new TPStream data")
322 tprm_conf.total_planes = total_unique_planes
323 tprm_conf.number_of_loops = n_loops
324 tprm_conf.channel_map = channel_map
326 a_tp_stream = tprm_conf.tp_streams[0]
if tprm_conf.tp_streams
else None
328 logging.info(
"Total of %i TPStream configs created", len(tprm_conf.tp_streams))
331 tprm_conf.filter_out_plane = list(planes_to_filter)
332 logging.info(
"Total of %i planes to be filtered", len(tprm_conf.filter_out_plane))
334 tprm_conf.filter_out_plane = []
336 a_sid = tpreplay_app.tp_source_ids[0]
if tpreplay_app.tp_source_ids
else None
338 logging.info(
"Total of %i SID configs created", len(tpreplay_app.tp_source_ids))
343 logging.debug(
"Committing updated configuration to database")
347 for tpstream
in tprm_conf.tp_streams:
348 db_modules.update_dal(tpstream)
349 for sid
in tpreplay_app.tp_source_ids:
350 db_trigger.update_dal(sid)
351 db_trigger.update_dal(tpreplay_app)
352 db_modules.update_dal(tprm_conf)
353 if randomTCmaker
is not None:
354 db_modules.update_dal(randomTCmaker)
357 logging.info(
"Local database updated!")
361 parser = argparse.ArgumentParser(description=
"To be used with TP Replay Application. Process TPStream data and update configuration.",
362 formatter_class=argparse.RawTextHelpFormatter)
363 parser.add_argument(
"--files", type=str, required=
True, help=
"Text file with (full) paths to HDF5 TPStream file location. One per line.")
364 parser.add_argument(
"--filter-planes", type=int, nargs=
'*', choices=[0, 1, 2], default=[], help=
"list of planes to filter out. Can be empty or contain any combination of 0 (U), 1 (V), and 2 (X).")
365 parser.add_argument(
"--channel-map", type=str, default=
'PD2HDTPCChannelMap',
366 help=
"Specify the channel map to use. Available examples include: PD2HDTPCChannelMap, PD2VDBottomTPCChannelMap, VDColdboxTPCChannelMap, HDColdboxTPCChannelMap.\n"
367 "For more details, visit: https://github.com/DUNE-DAQ/detchannelmaps/blob/develop/docs/channel-maps-table.md")
368 parser.add_argument(
"--n-loops", type=int, default=-1, help=
"Number of times to loop over the provided data.")
369 parser.add_argument(
"--config", type=str, default=
"config/daqsystemtest/example-configs.data.xml", help=
"Path to OKS configuration file.")
370 parser.add_argument(
"--path", type=str, default=
"tpreplay-run", help=
"Path for local output for configuration files.")
371 parser.add_argument(
"--mem-limit", type=int, default=25, help=
"This will set a memory limit [GB] to protect the machine.")
372 parser.add_argument(
"--verbose", action=
"store_true", help=
"Enable verbose logging.")
373 args = parser.parse_args()
381 logging.info(
"Starting TPStream processing script")
384 logging.debug(
"TP Replay application configuration: %s", tpreplay_app)
385 tprm_conf = tpreplay_app.tprm_conf
386 logging.debug(
"TPRM configuration: %s", tprm_conf)
388 planes_to_filter = set(args.filter_planes)
389 logging.debug(
"Planes to filter: %s", planes_to_filter)
396 total_unique_planes = rou_plane_data.total_plane_count()
397 logging.info(
"Total plane count: %d", total_unique_planes)
398 update_configuration(cfg, tpreplay_app, tprm_conf, args.channel_map, sorted_tpstream_files, total_unique_planes, planes_to_filter, args.path, args.n_loops)
set[int] get_values(self, str rou)
None add_value(self, str rou, int plane)
int total_plane_count(self)
None check_files(list[str] files)
Any get_tpreplay_app(conffwk.Configuration cfg)
(list[TPStreamFile], ROUPlaneData) extract_rous_and_planes(list[str] files, 'detchannelmaps._daq_detchannelmaps_py.TPCChannelMap' channel_map, set[int] planes_to_filter)
None setup_logging(bool verbose)
list[str] get_tpstream_files(str filename)
list[TPStreamFile] update_tpstream_indices(list[TPStreamFile] tpstream_files)
None set_mem_limit(int mem_limit)
list[Any] update_sid_dal_objects(Any a_sid, conffwk.Configuration cfg, int total_unique_planes)
Any update_RandomTCmaker_obj(conffwk.Configuration cfg)
None update_configuration(conffwk.Configuration cfg, Any tpreplay_app, Any tprm_conf, str channel_map, list[TPStreamFile] sorted_tpstream_files, int total_unique_planes, set[int] planes_to_filter, str path_str, int n_loops)
detchannelmaps._daq_detchannelmaps_py.TPCChannelMap load_channel_map(str channel_map_string)
list[Any] update_tpstream_dal_objects(Any a_tp_stream, conffwk.Configuration cfg, list[TPStreamFile] sorted_tpstream_files)
conffwk.Configuration setup_configuration(str path_str, str sessions_file, bool verbose)