def setup_configuration(path_str: str, sessions_file: str, verbose: bool) -> conffwk.Configuration:
    """
    Copies over the relevant configuration files for the provided .data.xml file.
    The default is example-configs.data.xml, which contains the default sessions.
    """
    logging.info("Setting up configuration files")
    logging.debug("Local path for configurations: %s", path_str)
    path = Path(path_str).resolve()
    path.mkdir(parents=True, exist_ok=True)
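
    # Quieten the external 'daqconf.consolidate' logger unless verbose output was requested.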
    external_logger = logging.getLogger('daqconf.consolidate')
    if not verbose:
        external_logger.setLevel(logging.WARNING)
    copy_configuration(path, [sessions_file])
    logging.debug("Copying configuration represented by databases: %s", [sessions_file])


def extract_rous_and_planes(files: list[str],
                            channel_map: 'detchannelmaps._daq_detchannelmaps_py.TPCChannelMap',
                            planes_to_filter: set[int]) -> tuple[list[TPStreamFile], ROUPlaneData]:
    """
    Goes over the provided TPStream files and extracts the readout units (ROUs) used to
    generate the data, together with the unfiltered planes for each readout unit.
    """
    logging.info("Extracting ROUs and planes from files")
    all_tpstream_files = []
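    # rou_plane_data is assumed to start out as an empty ROUPlaneData container here; its exact
    # construction is not shown in this excerpt (a sketch of the class follows this function).
    rou_plane_data = ROUPlaneData()

    # Subdetectors considered valid TPC sources of trigger primitives.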
    valid_subdetectors = {
        int(detdataformats.DetID.Subdetector.kHD_TPC),
        int(detdataformats.DetID.Subdetector.kVD_BottomTPC),
        int(detdataformats.DetID.Subdetector.kVD_TopTPC),
        int(detdataformats.DetID.Subdetector.kNDLAr_TPC),
    }

    for tpstream_file in files:
        logging.debug("Processing file: %s", tpstream_file)
        if not loaded_file.is_timeslice_type:
            logging.critical("File %s is not a TP Stream file!", tpstream_file)
        all_record_ids = loaded_file.get_all_record_ids()
        if not all_record_ids:
            logging.critical("File %s does not have valid records!", tpstream_file)
        for a_record in tqdm(all_record_ids, desc="Processing records"):
            source_ids = loaded_file.get_source_ids_for_fragment_type(a_record, "Trigger_Primitive")
            if len(source_ids) == 0:
                logging.critical("File %s does not have valid SourceIDs!", tpstream_file)
            logging.debug("SIDs: %s", source_ids)

            for i, sid in enumerate(source_ids):
                frag = loaded_file.get_frag(a_record, sid)
                if frag.get_data_size() < 1:
                    logging.critical("File %s has an empty fragment!", tpstream_file)

                tp = trgdataformats.TriggerPrimitive(frag.get_data(0))
                if first_record:
                    all_tpstream_files.append(TPStreamFile(tpstream_file, tp.time_start, 0))
                    logging.debug("First time start: %s", tp.time_start)
                if subdet not in valid_subdetectors:
                    logging.debug("Subdetector %s is not in the map of valid subdetectors!",
                                  detdataformats.DetID.subdetector_to_string(detdataformats.DetID.Subdetector(subdet)))
                plane = channel_map.get_plane_from_offline_channel(tp.channel)
                if plane not in planes_to_filter:
                    rou = channel_map.get_element_name_from_offline_channel(tp.channel)
                    rou_plane_data.add_value(rou, plane)
                    logging.debug("Extracted rou: %s for plane: %s", rou, plane)
                else:
                    logging.debug("Plane %s filtered", plane)
        del loaded_file, a_record, source_ids

    logging.info("Extracted ROUs and planes: %s", rou_plane_data.data)

    if not rou_plane_data.data:
        logging.critical("No valid data was extracted!")

    return all_tpstream_files, rou_plane_data
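

# A minimal sketch of the ROUPlaneData helper assumed by this script, based on the calls made
# above (add_value, total_plane_count, .data) and on the method list at the end of this file
# (get_values); the real implementation is not part of this excerpt and may differ.
class ROUPlaneData:
    def __init__(self) -> None:
        # Maps a readout unit (ROU) name to the set of planes seen for it.
        self.data: dict[str, set[int]] = {}

    def add_value(self, rou: str, plane: int) -> None:
        self.data.setdefault(rou, set()).add(plane)

    def get_values(self, rou: str) -> set[int]:
        return self.data.get(rou, set())

    def total_plane_count(self) -> int:
        # Total number of (ROU, plane) combinations recorded.
        return sum(len(planes) for planes in self.data.values())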


def update_tpstream_dal_objects(a_tp_stream: Any, cfg: conffwk.Configuration,
                                sorted_tpstream_files: list[TPStreamFile]) -> list[Any]:
    """
    Creates TPStreamConf DAL objects for the provided TPStream files.
    As an additional safety measure, these are created from scratch if an example instance is not found.
    """
    if a_tp_stream is None:
        logging.warning("No template TPStream object found")
        a_tp_stream_template = cfg.create_obj('TPStreamConf', "template-TPStreamConf")
        cache = {"TPStreamConf": {}}
        a_tp_stream = a_tp_stream_template.as_dal(cache)

    tp_streams = []
    for a_file in sorted_tpstream_files:
        temp_tp_stream = copy.deepcopy(a_tp_stream)
        temp_tp_stream.id = f"def-tp-stream-{a_file.index}"
        temp_tp_stream.filename = a_file.filename
        temp_tp_stream.index = a_file.index
        tp_streams.append(temp_tp_stream)
        logging.debug("Created TPStream: %s", temp_tp_stream)
    return tp_streams


def update_sid_dal_objects(a_sid: Any, cfg: conffwk.Configuration, total_unique_planes: int) -> list[Any]:
    """
    Creates the SourceIDConf DAL objects needed for each unique plane.
    As an additional safety measure, these are created from scratch if an example instance is not found.
    """
    if a_sid is None:
        logging.warning("No template Source ID object found")
        a_sid_template = cfg.create_obj('SourceIDConf', "template-SourceIDConf")
        cache = {"SourceIDConf": {}}
        a_sid = a_sid_template.as_dal(cache)

    all_sids = []
    base_string = "tpreplay-tp-srcid-100000"
    start_number = int(re.search(r'(\d+)$', base_string).group(1))
    for i in range(1, total_unique_planes + 1):
        temp_sid = copy.deepcopy(a_sid)
        temp_sid.id = re.sub(r'\d+$', f"{start_number + i:06d}", base_string)
        temp_sid.subsystem = "Trigger"
        all_sids.append(temp_sid)
        logging.debug("Created SID config: %s", temp_sid)
    return all_sids


def update_configuration(cfg: conffwk.Configuration, tpreplay_app: Any, tprm_conf: Any, channel_map: str,
                         sorted_tpstream_files: list[TPStreamFile], total_unique_planes: int,
                         planes_to_filter: set[int], path_str: str, n_loops: int) -> None:
    """
    Takes all changes and updates the local database files:
      - total_planes in TPRM
      - tp_source_ids in tpreplay
    """
    logging.info("Updating configuration with new TPStream data")
    tprm_conf.total_planes = total_unique_planes
    tprm_conf.number_of_loops = n_loops
    tprm_conf.channel_map = channel_map

    a_tp_stream = tprm_conf.tp_streams[0] if tprm_conf.tp_streams else None
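    # The step that actually builds and attaches the TPStreamConf objects to tprm_conf.tp_streams
    # (presumably via update_tpstream_dal_objects) is not part of this excerpt.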
    logging.info("Total of %i TPStream configs created", len(tprm_conf.tp_streams))

    if planes_to_filter:
        tprm_conf.filter_out_plane = list(planes_to_filter)
        logging.info("Total of %i planes to be filtered", len(tprm_conf.filter_out_plane))
    else:
        tprm_conf.filter_out_plane = []

    a_sid = tpreplay_app.tp_source_ids[0] if tpreplay_app.tp_source_ids else None
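    # Likewise, the step that builds tpreplay_app.tp_source_ids (presumably via
    # update_sid_dal_objects) is not part of this excerpt.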
    logging.info("Total of %i SID configs created", len(tpreplay_app.tp_source_ids))

    logging.debug("Committing updated configuration to database")
    for tpstream in tprm_conf.tp_streams:
        db_modules.update_dal(tpstream)
    for sid in tpreplay_app.tp_source_ids:
        db_trigger.update_dal(sid)
    db_trigger.update_dal(tpreplay_app)
    db_modules.update_dal(tprm_conf)
    if randomTCmaker is not None:
        db_modules.update_dal(randomTCmaker)

    logging.info("Local database updated!")


    parser = argparse.ArgumentParser(
        description="To be used with the TP Replay application. Processes TPStream data and updates the configuration.",
        formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument("--files", type=str, required=True,
                        help="Text file with (full) paths to the HDF5 TPStream files, one per line.")
    parser.add_argument("--filter-planes", type=int, nargs='*', choices=[0, 1, 2], default=[],
                        help="List of planes to filter out; can be empty or contain any combination of 0 (U), 1 (V) and 2 (X).")
    parser.add_argument("--channel-map", type=str, default='PD2HDTPCChannelMap',
                        help="Channel map to use. Available examples include: PD2HDTPCChannelMap, PD2VDBottomTPCChannelMap, VDColdboxTPCChannelMap, HDColdboxTPCChannelMap.\n"
                             "For more details, visit: https://github.com/DUNE-DAQ/detchannelmaps/blob/develop/docs/channel-maps-table.md")
    parser.add_argument("--n-loops", type=int, default=-1,
                        help="Number of times to loop over the provided data.")
    parser.add_argument("--config", type=str, default="config/daqsystemtest/example-configs.data.xml",
                        help="Path to the OKS configuration file.")
    parser.add_argument("--path", type=str, default="tpreplay-run",
                        help="Path for the local output of configuration files.")
    parser.add_argument("--mem-limit", type=int, default=25,
                        help="Memory limit [GB] applied to protect the machine.")
    parser.add_argument("--verbose", action="store_true",
                        help="Enable verbose logging.")
    args = parser.parse_args()
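
    # Example invocation (hypothetical script name; substitute the actual file name of this script):
    #   python tp_replay_config.py --files tpstream_files.txt \
    #       --channel-map PD2HDTPCChannelMap --filter-planes 0 1 \
    #       --path tpreplay-run --n-loops 5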

    logging.info("Starting TPStream processing script")

    logging.debug("TP Replay application configuration: %s", tpreplay_app)
    tprm_conf = tpreplay_app.tprm_conf
    logging.debug("TPRM configuration: %s", tprm_conf)

    planes_to_filter = set(args.filter_planes)
    logging.debug("Planes to filter: %s", planes_to_filter)

    total_unique_planes = rou_plane_data.total_plane_count()
    logging.info("Total plane count: %d", total_unique_planes)
    update_configuration(cfg, tpreplay_app, tprm_conf, args.channel_map, sorted_tpstream_files,
                         total_unique_planes, planes_to_filter, args.path, args.n_loops)
set[int] get_values(self, str rou)
None add_value(self, str rou, int plane)
int total_plane_count(self)
None check_files(list[str] files)
Any get_tpreplay_app(conffwk.Configuration cfg)
(list[TPStreamFile], ROUPlaneData) extract_rous_and_planes(list[str] files, 'detchannelmaps._daq_detchannelmaps_py.TPCChannelMap' channel_map, set[int] planes_to_filter)
None setup_logging(bool verbose)
list[str] get_tpstream_files(str filename)
list[TPStreamFile] update_tpstream_indices(list[TPStreamFile] tpstream_files)
None set_mem_limit(int mem_limit)
list[Any] update_sid_dal_objects(Any a_sid, conffwk.Configuration cfg, int total_unique_planes)
Any update_RandomTCmaker_obj(conffwk.Configuration cfg)
None update_configuration(conffwk.Configuration cfg, Any tpreplay_app, Any tprm_conf, str channel_map, list[TPStreamFile] sorted_tpstream_files, int total_unique_planes, set[int] planes_to_filter, str path_str, int n_loops)
detchannelmaps._daq_detchannelmaps_py.TPCChannelMap load_channel_map(str channel_map_string)
list[Any] update_tpstream_dal_objects(Any a_tp_stream, conffwk.Configuration cfg, list[TPStreamFile] sorted_tpstream_files)
conffwk.Configuration setup_configuration(str path_str, str sessions_file, bool verbose)