DUNE-DAQ
DUNE Trigger and Data Acquisition software
tpreplay_application.__main__ Namespace Reference

Classes

class  ROUPlaneData
 
class  TPStreamFile
 

Functions

None setup_logging (bool verbose)
 
None set_mem_limit (int mem_limit)
 
None cleanup ()
 
conffwk.Configuration setup_configuration (str path_str, str sessions_file, bool verbose)
 
Any get_tpreplay_app (conffwk.Configuration cfg)
 
detchannelmaps._daq_detchannelmaps_py.TPCChannelMap load_channel_map (str channel_map_string)
 
list[str] get_tpstream_files (str filename)
 
None check_files (list[str] files)
 
(list[TPStreamFile], ROUPlaneData) extract_rous_and_planes (list[str] files, 'detchannelmaps._daq_detchannelmaps_py.TPCChannelMap' channel_map, set[int] planes_to_filter)
 
list[TPStreamFile] update_tpstream_indices (list[TPStreamFile] tpstream_files)
 
list[Any] update_tpstream_dal_objects (Any a_tp_stream, conffwk.Configuration cfg, list[TPStreamFile] sorted_tpstream_files)
 
list[Any] update_sid_dal_objects (Any a_sid, conffwk.Configuration cfg, int total_unique_planes)
 
Any update_RandomTCmaker_obj (conffwk.Configuration cfg)
 
None update_configuration (conffwk.Configuration cfg, Any tpreplay_app, Any tprm_conf, str channel_map, list[TPStreamFile] sorted_tpstream_files, int total_unique_planes, set[int] planes_to_filter, str path_str, int n_loops)
 
 main ()
 

Function Documentation

◆ check_files()

None tpreplay_application.__main__.check_files ( list[str] files)
Very basic checks on the provided TPStream files.

Definition at line 133 of file __main__.py.

133def check_files(files: list[str]) -> None:
134 """
135 Very basic checks on the provided TPStream files.
136 """
137 for a_file in files:
138
140 path = Path(a_file)
141 if not path.exists():
142 logging.critical("File %s does not exist!", a_file)
143 sys.exit(1)
144 # check it's hdf5
145 if not path.suffix.lower() in ('.h5', '.hdf5'):
146 logging.critical("File %s does not seem to be hdf5 file!", a_file)
147 sys.exit(1)
148

◆ cleanup()

None tpreplay_application.__main__.cleanup ( )

Definition at line 45 of file __main__.py.

45def cleanup() -> None:
46 gc.collect()
47
48# custom type to hold a map of all (unique) ReadoutUnits and the corresponding planes for each
49@dataclass
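The ROUPlaneData dataclass itself is listed under Classes above, but its body is not shown on this page. A minimal sketch of an equivalent type, assuming only the data / add_value() / total_plane_count() interface used by extract_rous_and_planes() and main() below (not the actual definition):

from dataclasses import dataclass, field

@dataclass
class ROUPlaneData:
    # maps each (unique) ReadoutUnit name to the set of unfiltered planes seen for it
    data: dict[str, set[int]] = field(default_factory=dict)

    def add_value(self, rou: str, plane: int) -> None:
        # create the entry on first use, then record this plane for the ROU
        self.data.setdefault(rou, set()).add(plane)

    def total_plane_count(self) -> int:
        # total number of (ROU, plane) pairs across all readout units (assumed semantics)
        return sum(len(planes) for planes in self.data.values())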

◆ extract_rous_and_planes()

(list[TPStreamFile], ROUPlaneData) tpreplay_application.__main__.extract_rous_and_planes ( list[str] files,
'detchannelmaps._daq_detchannelmaps_py.TPCChannelMap' channel_map,
set[int] planes_to_filter )
This function goes over the provided TPStream files.
It extracts the readout units used to generate the data in the files.
It also extracts unfiltered planes for each readout unit.

Definition at line 149 of file __main__.py.

149def extract_rous_and_planes(files: list[str], channel_map: 'detchannelmaps._daq_detchannelmaps_py.TPCChannelMap', planes_to_filter: set[int]) -> (list[TPStreamFile], ROUPlaneData):
150 """
151 This function goes over the provided TPStream files.
152 It extracts the readout units used to generate the data in the files.
153 It also extracts unfiltered planes for each readout unit.
154 """
155 logging.info("Extracting ROUs and planes from files")
156 all_tpstream_files = []
157 rou_plane_data = ROUPlaneData()
158
159 valid_subdetectors = {
160 int(detdataformats.DetID.Subdetector.kHD_TPC),
161 int(detdataformats.DetID.Subdetector.kVD_BottomTPC),
162 int(detdataformats.DetID.Subdetector.kVD_TopTPC),
163 int(detdataformats.DetID.Subdetector.kNDLAr_TPC)
164 }
165
166 for tpstream_file in files:
167 logging.debug("Processing file: %s", tpstream_file)
168 loaded_file = HDF5RawDataFile(tpstream_file)
169 # check is tpstream
170 if not loaded_file.is_timeslice_type:
171 logging.critical("File %s is not a TP Stream file!", tpstream_file)
172 sys.exit(1)
173 # check has records
174 all_record_ids = loaded_file.get_all_record_ids()
175 if not all_record_ids:
176 logging.critical("File %s does not have valid records!", tpstream_file)
177 sys.exit(1)
178
179 # loop over all records :/
180 first_record = True
181 for a_record in tqdm(all_record_ids, desc="Processing records"):
182
183 # check has source IDs
184 source_ids = loaded_file.get_source_ids_for_fragment_type(a_record, "Trigger_Primitive")
185 if len(source_ids) == 0:
186 logging.critical("File %s does not have valid SourceIDs!", tpstream_file)
187 sys.exit(1)
188 logging.debug("SIDs: %s", source_ids)
189
190 for i, sid in enumerate(source_ids):
191 frag = loaded_file.get_frag(a_record, sid)
192 # check frag has data
193 if frag.get_data_size() < 1:
194 logging.critical("File %s has an empty fragment!", tpstream_file)
195 sys.exit(1)
196 tp = trgdataformats.TriggerPrimitive(frag.get_data(0))
197
198 if first_record == True:
199 all_tpstream_files.append(TPStreamFile(tpstream_file, tp.time_start, 0))
200 logging.debug("First time start: %s", tp.time_start)
201 first_record = False
202
203 # check subdetector
204 subdet = tp.detid
205 if subdet not in valid_subdetectors:
206 logging.debug("Subdetector %s is not in the map of valid subdetectors!", detdataformats.DetID.subdetector_to_string( detdataformats.DetID.Subdetector( subdet ) ) )
207 continue
208
209 plane = channel_map.get_plane_from_offline_channel(tp.channel)
210 if plane not in planes_to_filter:
211 rou = channel_map.get_element_name_from_offline_channel(tp.channel)
212 rou_plane_data.add_value(rou, plane)
213 logging.debug("Extracted rou: %s for plane: %s", rou, plane)
214 else:
215 logging.debug("Plane %s filtered", plane)
216 cleanup()
217 del frag, tp
218 cleanup()
219 del loaded_file, a_record, source_ids
220
221 # No need for an if verbose check here, as logging level is already set
222 logging.info("Extracted ROUs and planes: %s", rou_plane_data.data)
223
224 # Case when we didn't extract any valid/useful sourceIDs
225 if not rou_plane_data.data:
226 logging.critical("No valid data was extracted!")
227 sys.exit(1)
228
229 return all_tpstream_files, rou_plane_data
230

◆ get_tpreplay_app()

Any tpreplay_application.__main__.get_tpreplay_app ( conffwk.Configuration cfg)
Retrieves the instance of TPReplayApplication from configuration files.
If it does not exist, stops the script.
In theory, we could create one from scratch; however, there are so many objects to configure
that doing so externally is preferred.

Definition at line 90 of file __main__.py.

90def get_tpreplay_app(cfg: conffwk.Configuration) -> Any:
91 """
92 Retrieves the instance of TPReplayApplication from configuration files.
93 If it does not exist, stops the script.
94 In theory, we could make one from scratch, however, there are so many objects to configure
95 that doing that externally is preferred.
96 """
97 tpreplay_apps = cfg.get_dals("TPReplayApplication")
98 if tpreplay_apps:
99 tpreplay_app = tpreplay_apps[0]
100 logging.debug("Loaded tpreplay application")
101 return tpreplay_app
102 else:
103 logging.critical("No 'TPReplayApplication' DAL objects found.")
104 sys.exit(1)
105

◆ get_tpstream_files()

list[str] tpreplay_application.__main__.get_tpstream_files ( str filename)
Reads in the names of TPStream files from the provided text file.

Definition at line 119 of file __main__.py.

119def get_tpstream_files(filename: str) -> list[str]:
120 """
121 Reads in names of tpstream files from provided text file.
122 """
123 logging.info("Reading TPStream file list from %s", filename)
124 with open(filename, "r") as file:
125 tpstream_files = [line.strip() for line in file.readlines()]
126 cleanup()
127
128 logging.info("Total files to process: %i", len(tpstream_files))
129 logging.debug("TPStream files loaded: %s", tpstream_files)
130
131 return tpstream_files
132
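The expected input is a plain text file with one path per line; the paths and file name below are hypothetical:

    /data/tpstreams/run028000_tpstream_0000.hdf5
    /data/tpstreams/run028000_tpstream_0001.hdf5

files = get_tpstream_files("tpstream_list.txt")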

◆ load_channel_map()

detchannelmaps._daq_detchannelmaps_py.TPCChannelMap tpreplay_application.__main__.load_channel_map ( str channel_map_string)
Tries to create a channel map using the provided string name.
If it fails, prints the error and exits.

Definition at line 106 of file __main__.py.

106def load_channel_map(channel_map_string: str) -> detchannelmaps._daq_detchannelmaps_py.TPCChannelMap:
107 """
108 Tries to create a channel map using the provided string name.
109 If it fails, prints the error and exits.
110 """
111 try:
112 channel_map = detchannelmaps.make_tpc_map(channel_map_string)
113 logging.debug(f"Channel map '{channel_map_string}' successfully created.")
114 return channel_map
115 except Exception as e:
116 logging.critical(f"Failed to create the channel map '{channel_map_string}'. Error: {str(e)}")
117 sys.exit(1)
118
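A minimal usage sketch; the map name is one of the --channel-map choices documented under main() below, and the channel number is hypothetical:

channel_map = load_channel_map("PD2HDTPCChannelMap")
plane = channel_map.get_plane_from_offline_channel(2345)        # hypothetical offline channel
rou = channel_map.get_element_name_from_offline_channel(2345)   # readout unit name for that channel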

◆ main()

tpreplay_application.__main__.main ( )

Definition at line 352 of file __main__.py.

352def main():
353 parser = argparse.ArgumentParser(description="To be used with TP Replay Application. Process TPStream data and update configuration.",
354 formatter_class=argparse.RawTextHelpFormatter)
355 parser.add_argument("--files", type=str, required=True, help="Text file with (full) paths to HDF5 TPStream file location. One per line.")
356 parser.add_argument("--filter-planes", type=int, nargs='*', choices=[0, 1, 2], default=[], help="list of planes to filter out. Can be empty or contain any combination of 0 (U), 1 (V), and 2 (X).")
357 parser.add_argument("--channel-map", type=str, default='PD2HDTPCChannelMap',
358 help="Specify the channel map to use. Available examples include: PD2HDTPCChannelMap, PD2VDBottomTPCChannelMap, VDColdboxTPCChannelMap, HDColdboxTPCChannelMap.\n"
359 "For more details, visit: https://github.com/DUNE-DAQ/detchannelmaps/blob/develop/docs/channel-maps-table.md")
360 parser.add_argument("--n-loops", type=int, default=-1, help="Number of times to loop over the provided data.")
361 parser.add_argument("--config", type=str, default="config/daqsystemtest/example-configs.data.xml", help="Path to OKS configuration file.")
362 parser.add_argument("--path", type=str, default="tpreplay-run", help="Path for local output for configuration files.")
363 parser.add_argument("--mem-limit", type=int, default=25, help="This will set a memory limit [GB] to protect the machine.")
364 parser.add_argument("--verbose", action="store_true", help="Enable verbose logging.")
365 args = parser.parse_args()
366
367 # Set up logging based on the verbose flag
368 setup_logging(args.verbose)
369
370 # Set memory limit
371 set_mem_limit(args.mem_limit)
372
373 logging.info("Starting TPStream processing script")
374 cfg = setup_configuration(args.path, args.config, args.verbose)
375 tpreplay_app = get_tpreplay_app(cfg)
376 logging.debug("TP Replay application configuration: %s", tpreplay_app)
377 tprm_conf = tpreplay_app.tprm_conf
378 logging.debug("TPRM configuration: %s", tprm_conf)
379 channel_map = load_channel_map(args.channel_map)
380 planes_to_filter = set(args.filter_planes)
381 logging.debug("Planes to filter: %s", planes_to_filter)
382
383 files = get_tpstream_files(args.files)
384 check_files(files)
385 all_tpstream_files, rou_plane_data = extract_rous_and_planes(files, channel_map, planes_to_filter)
386 sorted_tpstream_files = update_tpstream_indices(all_tpstream_files)
387
388 total_unique_planes = rou_plane_data.total_plane_count()
389 logging.info("Total plane count: %d", total_unique_planes)
390 update_configuration(cfg, tpreplay_app, tprm_conf, args.channel_map, sorted_tpstream_files, total_unique_planes, planes_to_filter, args.path, args.n_loops)
391
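A typical invocation, assuming the tpreplay_application package is importable so the module can be run directly; the file list name is hypothetical:

python -m tpreplay_application --files tpstream_list.txt \
    --channel-map PD2HDTPCChannelMap --filter-planes 0 1 \
    --n-loops 5 --path tpreplay-run --verbose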

◆ set_mem_limit()

None tpreplay_application.__main__.set_mem_limit ( int mem_limit)
As a safety measure, we set a memory limit for the process.
It should not be needed here, as HDF5 processing is minimal.

Definition at line 35 of file __main__.py.

35def set_mem_limit(mem_limit: int) -> None:
36 """
37 As a safety measure we set a memory limit for the process.
38 Should not be needed here as hdf5 processing is minimal.
39 """
40 GB = 1024**3
41 memory_limit = mem_limit * GB
42 resource.setrlimit(resource.RLIMIT_AS, (memory_limit, memory_limit))
43 logging.debug("Setting memory limit to %i GBs", memory_limit/GB)
44
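Note that resource.setrlimit raises ValueError if the requested soft limit exceeds the current hard limit. A defensive sketch (not part of this script) that clamps to the existing hard limit first:

import resource

soft, hard = resource.getrlimit(resource.RLIMIT_AS)
requested = 25 * 1024**3                       # the script's default of 25 GB
if hard != resource.RLIM_INFINITY and requested > hard:
    requested = hard                           # clamp to the existing hard limit
resource.setrlimit(resource.RLIMIT_AS, (requested, hard))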

◆ setup_configuration()

conffwk.Configuration tpreplay_application.__main__.setup_configuration ( str path_str,
str sessions_file,
bool verbose )
Copies over relevant configuration files for a provided .data.xml file.
The default is the example-configs.data.xml, which contains the default sessions.

Definition at line 72 of file __main__.py.

72def setup_configuration(path_str: str, sessions_file: str, verbose: bool) -> conffwk.Configuration:
73 """
74 Copies over relevant configuration files for a provided .data.xml file.
75 The default is the example-configs.data.xml, which contains the default sessions.
76 """
77 logging.info("Setting up configuration files")
78 logging.debug("Local path for configurations: %s", path_str)
79 path = Path(path_str).resolve() # Convert string to Path object
80 path.mkdir(parents=True, exist_ok=True)
81 external_logger = logging.getLogger('daqconf.consolidate')
82 if not verbose: external_logger.setLevel(logging.WARNING)
83 copy_configuration(path, [sessions_file])
84 cleanup()
85
86 logging.debug("Copying configuration represented by databases: %s", [sessions_file])
87
88 return conffwk.Configuration(f"oksconflibs:{path}/example-configs.data.xml")
89

◆ setup_logging()

None tpreplay_application.__main__.setup_logging ( bool verbose)
Set up logging based on the verbose flag.
If the verbose flag is provided, set the logging level to DEBUG to show all logs.
Otherwise, set it to INFO to show only INFO-level logs and above.

Definition at line 23 of file __main__.py.

23def setup_logging(verbose: bool) -> None:
24 """
25 Set up logging based on the verbose flag.
26 If verbose flag is provided, set the logging level to DEBUG to show all logs.
27 Otherwise, set it to INFO to only show INFO level logs and above.
28 """
29 if verbose:
30 logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
31 logging.debug("Verbose logging configured!")
32 else:
33 logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
34

◆ update_configuration()

None tpreplay_application.__main__.update_configuration ( conffwk.Configuration cfg,
Any tpreplay_app,
Any tprm_conf,
str channel_map,
list[TPStreamFile] sorted_tpstream_files,
int total_unique_planes,
set[int] planes_to_filter,
str path_str,
int n_loops )
Takes all changes and updates the local database files: total_planes, tp_streams, and planes in TPRM, and tp_source_ids in tpreplay.

Definition at line 304 of file __main__.py.

304def update_configuration(cfg: conffwk.Configuration, tpreplay_app: Any, tprm_conf: Any, channel_map: str, sorted_tpstream_files: list[TPStreamFile], total_unique_planes: int, planes_to_filter: set[int], path_str: str, n_loops: int) -> None:
305 """
306 Takes all changes and updates the local database files.
307 [total_planes in TPRM
308 tp_streams in TPRM
309 planes in TPRM
310 tp_source_ids in tpreplay
311 ]
312 """
313 logging.info("Updating configuration with new TPStream data")
314 tprm_conf.total_planes = total_unique_planes
315 tprm_conf.number_of_loops = n_loops
316 tprm_conf.channel_map = channel_map
317
318 a_tp_stream = tprm_conf.tp_streams[0] if tprm_conf.tp_streams else None
319 tprm_conf.tp_streams = update_tpstream_dal_objects(a_tp_stream, cfg, sorted_tpstream_files)
320 logging.info("Total of %i TPStream configs created", len(tprm_conf.tp_streams))
321
322 if planes_to_filter:
323 tprm_conf.filter_out_plane = list(planes_to_filter)
324 logging.info("Total of %i planes to be filtered", len(tprm_conf.filter_out_plane))
325 else:
326 tprm_conf.filter_out_plane = []
327
328 a_sid = tpreplay_app.tp_source_ids[0] if tpreplay_app.tp_source_ids else None
329 tpreplay_app.tp_source_ids = update_sid_dal_objects(a_sid, cfg, total_unique_planes)
330 logging.info("Total of %i SID configs created", len(tpreplay_app.tp_source_ids))
331
332 randomTCmaker = update_RandomTCmaker_obj(cfg)
333
334 cleanup()
335 logging.debug("Committing updated configuration to database")
336
337 db_modules = conffwk.Configuration(f"oksconflibs:{path_str}/moduleconfs.data.xml")
338 db_trigger = conffwk.Configuration(f"oksconflibs:{path_str}/trigger-segment.data.xml")
339 for tpstream in tprm_conf.tp_streams:
340 db_modules.update_dal(tpstream)
341 for sid in tpreplay_app.tp_source_ids:
342 db_trigger.update_dal(sid)
343 db_trigger.update_dal(tpreplay_app)
344 db_modules.update_dal(tprm_conf)
345 if randomTCmaker is not None:
346 db_modules.update_dal(randomTCmaker)
347 db_modules.commit()
348 db_trigger.commit()
349 logging.info("Local database updated!")
350 cleanup()
351
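As a quick sanity check, the committed databases can be reopened with the same oksconflibs prefix used above; the path below is the default --path and is purely illustrative:

db_modules = conffwk.Configuration("oksconflibs:tpreplay-run/moduleconfs.data.xml")
tp_streams = db_modules.get_dals("TPStreamConf")
logging.info("Found %i committed TPStreamConf objects", len(tp_streams))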

◆ update_RandomTCmaker_obj()

Any tpreplay_application.__main__.update_RandomTCmaker_obj ( conffwk.Configuration cfg)
Changes the trigger_rate_hz for RandomTCMakerConf to 0 by default for replay.

Definition at line 288 of file __main__.py.

288def update_RandomTCmaker_obj(cfg: conffwk.Configuration) -> Any:
289 """
290 Changes the trigger_rate_hz for RandomTCMakerConf to 0 by default for replay.
291 """
292 randomTCmakers = cfg.get_dals("RandomTCMakerConf")
293 if randomTCmakers:
294 randomTCmaker = randomTCmakers[0]
295 logging.debug("Loaded randomTCmaker object")
296 randomTCmaker.trigger_rate_hz = 0
297 logging.debug("Updated randomTCmaker object")
298 return randomTCmaker
299
300 else:
301 logging.error("No 'RandomTCMakerConf' DAL objects found.")
302 return None
303

◆ update_sid_dal_objects()

list[Any] tpreplay_application.__main__.update_sid_dal_objects ( Any a_sid,
conffwk.Configuration cfg,
int total_unique_planes )
Creates the SourceIDConf dal objects needed for each unique plane.
As an additional safety measure, these are created from scratch if an example instance is not found.

Definition at line 265 of file __main__.py.

265def update_sid_dal_objects(a_sid: Any, cfg: conffwk.Configuration, total_unique_planes: int) -> list[Any]:
266 """
267 Creates SourceIDConf dal objects needed for each unique plane.
268 Additional safety to create these from scratch if an example instance is not found.
269 """
270 if not a_sid:
271 logging.warning("No template Source ID object found")
272 # get template and create from scratch
273 a_sid_template = cfg.create_obj('SourceIDConf', "template-SourceIDConf")
274 cache = {"SourceIDConf": {}}
275 a_sid = a_sid_template.as_dal(cache)
276 all_sids = []
277 base_string = "tpreplay-tp-srcid-100000"
278 start_number = int(re.search(r'(\d+)$', base_string).group(1))
279 for i in range(1, total_unique_planes + 1):
280 temp_sid = copy.deepcopy(a_sid)
281 temp_sid.id = re.sub(r'\d+$', f"{start_number + i:06d}", base_string)
282 temp_sid.sid = i
283 temp_sid.subsystem = "Trigger"
284 all_sids.append(temp_sid)
285 logging.debug("Created SID config: %s", temp_sid)
286 return all_sids
287
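The generated ids simply replace the trailing digits of the base string; a worked example of the regex logic above:

import re

base_string = "tpreplay-tp-srcid-100000"
start_number = int(re.search(r'(\d+)$', base_string).group(1))    # 100000
print(re.sub(r'\d+$', f"{start_number + 1:06d}", base_string))    # tpreplay-tp-srcid-100001
print(re.sub(r'\d+$', f"{start_number + 3:06d}", base_string))    # tpreplay-tp-srcid-100003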

◆ update_tpstream_dal_objects()

list[Any] tpreplay_application.__main__.update_tpstream_dal_objects ( Any a_tp_stream,
conffwk.Configuration cfg,
list[TPStreamFile] sorted_tpstream_files )
Creates TPStreamConf dal objects for the provided TP Stream files.
As an additional safety measure, these are created from scratch if an example instance is not found.

Definition at line 244 of file __main__.py.

244def update_tpstream_dal_objects(a_tp_stream: Any, cfg: conffwk.Configuration, sorted_tpstream_files: list[TPStreamFile]) -> list[Any]:
245 """
246 Creates TPStreamConf dal objects for the provided TP Stream files.
247 Additional safety to create these from scratch if an example instance is not found.
248 """
249 if not a_tp_stream:
250 logging.warning("No template TPStream object found")
251 # get template and create from scratch
252 a_tp_stream_template = cfg.create_obj('TPStreamConf', "template-TPStreamConf")
253 cache = {"TPStreamConf": {}}
254 a_tp_stream = a_tp_stream_template.as_dal(cache)
255 tp_streams = []
256 for a_file in sorted_tpstream_files:
257 temp_tp_stream = copy.deepcopy(a_tp_stream)
258 temp_tp_stream.id = f"def-tp-stream-{a_file.index}"
259 temp_tp_stream.filename = a_file.filename
260 temp_tp_stream.index = a_file.index
261 tp_streams.append(temp_tp_stream)
262 logging.debug("Created TPStream: %s", temp_tp_stream)
263 return tp_streams
264

◆ update_tpstream_indices()

list[TPStreamFile] tpreplay_application.__main__.update_tpstream_indices ( list[TPStreamFile] tpstream_files)
Sorts the files by the time of the very first TP.
The data will be sorted in TPRM anyway, but pre-sorting here speeds that up.

Definition at line 231 of file __main__.py.

231def update_tpstream_indices(tpstream_files: list[TPStreamFile]) -> list[TPStreamFile]:
232 """
233 Sorts the files by the time of the very first TP.
234 The data will be sorted in TPRM, but this speeds it up.
235 """
236 logging.info("Sorting TPStream files by time and assigning indices")
237 sorted_files = sorted(tpstream_files, key=lambda x: x.stime)
238 for i, tp_file in enumerate(sorted_files):
239 tp_file.index = i + 1
240 cleanup()
241 logging.debug("Sorted TPSTreamFile objects: %s", sorted_files)
242 return sorted_files
243
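The TPStreamFile dataclass is also listed under Classes without its body. A minimal sketch consistent with its use above (constructed as TPStreamFile(filename, time_start, 0), sorted by stime, then given a 1-based index):

from dataclasses import dataclass

@dataclass
class TPStreamFile:
    filename: str   # path to the HDF5 TPStream file
    stime: int      # time_start of the first TriggerPrimitive found in the file
    index: int      # 1-based position after sorting by stime (0 until assigned)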