DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
emulate_from_tpstream.cxx File Reference
#include "trgtools/TCEmulationUnit.hpp"
#include "trgtools/TAEmulationWorker.hpp"
#include "CLI/App.hpp"
#include "CLI/Config.hpp"
#include "CLI/Formatter.hpp"
#include <fmt/core.h>
#include <fmt/format.h>
#include <fmt/chrono.h>
#include <filesystem>
#include <optional>
#include "hdf5libs/HDF5RawDataFile.hpp"
#include "hdf5libs/HDF5SourceIDHandler.hpp"
#include "triggeralgs/TriggerCandidateFactory.hpp"
Include dependency graph for emulate_from_tpstream.cxx:

Go to the source code of this file.

Classes

struct  Options
 Struct with available cli application options. More...
 

Functions

void save_fragments (const std::string &_outputfilename, const hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t &_sourceid_geoid_map, std::map< uint64_t, std::vector< std::unique_ptr< daqdataformats::Fragment > > > _frags, bool _quiet)
 Saves fragments in timeslice format into output HDF5 file.
 
std::map< std::string, std::vector< std::shared_ptr< hdf5libs::HDF5RawDataFile > > > sort_files_per_writer (const std::vector< std::string > &_files)
 Group and order input TimeSlice files per TPStream writer application.
 
std::pair< uint64_t, uint64_t > get_available_slice_id_range (const std::map< std::string, std::vector< std::shared_ptr< hdf5libs::HDF5RawDataFile > > > &_files, bool _quiet)
 Compute the common SliceID interval shared by all writer groups.
 
void parse_app (CLI::App &_app, Options &_opts)
 Adds options to our CLI application.
 
int main (int argc, char const *argv[])
 

Function Documentation

◆ get_available_slice_id_range()

std::pair< uint64_t, uint64_t > get_available_slice_id_range ( const std::map< std::string, std::vector< std::shared_ptr< hdf5libs::HDF5RawDataFile > > > & _files,
bool _quiet )

Compute the common SliceID interval shared by all writer groups.

For each writer application, this function scans all of its files and finds the minimum and maximum available record IDs. It then computes the global intersection across applications:

  • global_start = max(all per-application starts)
  • global_end = min(all per-application ends)

The returned range is therefore the SliceID window for which data is expected to be available from every application.

Todo
: Rather than returning the SliceID range, should try to return a time range and have processors use that.
Parameters
_filesMap of writer application names to vectors of input HDF5 files.
_quietIf false, print per-application and global range diagnostics.
Returns
Inclusive pair {global_start, global_end} of overlapping SliceIDs.
Exceptions
std::runtime_errorIf _files is empty, if any file has no records, or if no overlapping SliceID interval exists.

Definition at line 168 of file emulate_from_tpstream.cxx.

170{
171 if (_files.empty()) {
172 throw std::runtime_error("No files provided");
173 }
174
175 uint64_t global_start = std::numeric_limits<uint64_t>::min();
176 uint64_t global_end = std::numeric_limits<uint64_t>::max();
177
178 // Get the min & max record id per application, and the global
179 for (auto& [appid, vec_files]: _files) {
180 uint64_t app_start = std::numeric_limits<uint64_t>::max();
181 uint64_t app_end = std::numeric_limits<uint64_t>::min();
182
183 // Find min / max record id for this application
184 for (auto& file : vec_files) {
185 auto record_ids = file->get_all_record_ids();
186 if (record_ids.empty()) {
187 throw std::runtime_error(fmt::format("File from application {} contains no records.", appid));
188 }
189 app_start = std::min(app_start, record_ids.begin()->first);
190 app_end = std::max(app_end, record_ids.rbegin()->first);
191 }
192
193 // Update the global min / max record id
194 global_start = std::max(global_start, app_start);
195 global_end = std::min(global_end, app_end);
196
197 if (!_quiet) {
198 std::cout << "Application: " << appid << " " << " TimeSliceID start: " << app_start << " end: " << app_end << std::endl;
199 }
200 }
201
202 if (!_quiet) {
203 std::cout << "Global start: " << global_start << " Global end: " << global_end << std::endl;
204 }
205 if (global_start > global_end) {
206 throw std::runtime_error("One of the provided files' id range did not overlap with the rest. Please select files with overlapping TimeSlice IDs");
207 }
208
209 // Extra validation / error handling
210 for (auto& [appid, vec_files]: _files) {
211 for (auto& file : vec_files) {
212 auto record_ids = file->get_all_record_ids();
213
214 uint64_t file_start = record_ids.begin()->first;
215 uint64_t file_end = record_ids.rbegin()->first;
216 if ((file_start > global_end || file_end < global_start)) {
217 uint64_t file_index = file->get_attribute<size_t>("file_index");
218 throw std::runtime_error(fmt::format(
219 "File from TPStreamWrite application '{}' (index '{}') has record id range [{}, {}], which does not overlap with global range [{}, {}].",
220 appid, file_index, file_start, file_end, global_start, global_end
221 ));
222 }
223 }
224 }
225
226 return {global_start, global_end};
227}

◆ main()

int main ( int argc,
char const * argv[] )

Definition at line 282 of file emulate_from_tpstream.cxx.

// Entry point of the offline TA/TC emulation tool: parses CLI options, groups
// input TPStream files per writer application, runs one TAEmulationWorker per
// application over the selected TimeSlice range, merges their TAs/fragments,
// emulates TriggerCandidates per slice, and writes everything to HDF5.
// NOTE(review): this listing comes from a generated rendering; original source
// lines 384, 402 and 418 are missing below — flagged inline where they fall.
283{
284 // Do all the CLI processing first
285 CLI::App app{"Offline trigger TriggerActivity & TriggerCandidate emulatior"};
// NOTE(review): "emulatior" in the description string above is a typo for
// "emulator" — fix it in the real source (user-visible help text).
286 Options opts{};
287
288 parse_app(app, opts);
289
290 try {
291 app.parse(argc, argv);
292 }
293 catch (const CLI::ParseError &e) {
294 return app.exit(e);
295 }
296
297 // Get the configuration file
// NOTE(review): no explicit check that config_stream opened successfully;
// nlohmann::json::parse will throw a parse_error on an empty stream, but an
// explicit "could not open" message would be clearer for users.
298 std::ifstream config_stream(opts.config_name);
299 nlohmann::json config = nlohmann::json::parse(config_stream);
300
301 if (!opts.quiet) {
302 std::cout << "Files to process:\n";
303 for (const std::string& file : opts.input_files) {
304 std::cout << "- " << file << "\n";
305 }
306 }
307
308 // Sort the files into a map writer_id::vector<HDF5>
309 std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>> sorted_files =
310 sort_files_per_writer(opts.input_files);
311
312 // Get the available record_id range, then narrow it to the user's request
313 std::pair<uint64_t, uint64_t> recordid_range = get_available_slice_id_range(sorted_files, opts.quiet);
314 std::pair<uint64_t, uint64_t> processing_range = recordid_range;
315
316 if (opts.slice_start.has_value()) {
317 processing_range.first = std::max(processing_range.first, opts.slice_start.value());
318 }
319
320 if (opts.num_slices.has_value()) {
321 if (opts.num_slices.value() == 0) {
322 throw std::runtime_error("Invalid --num-slices value: must be greater than 0");
323 }
324
// Unsigned wrap-around is well-defined, so a wrapped sum detects overflow of
// start + (num_slices - 1) and clamps the requested end to uint64 max.
325 uint64_t requested_end = processing_range.first + (opts.num_slices.value() - 1);
326 if (requested_end < processing_range.first) {
327 requested_end = std::numeric_limits<uint64_t>::max();
328 }
329
330 processing_range.second = std::min(processing_range.second, requested_end);
331 }
332
333 if (processing_range.first > processing_range.second) {
334 throw std::runtime_error(fmt::format(
335 "Requested timeslice subset (slice-start={}, num-slices={}) does not overlap available range [{}, {}]",
336 opts.slice_start.value_or(0),
337 opts.num_slices.value_or(0),
338 recordid_range.first,
339 recordid_range.second));
340 }
341
342 if (!opts.quiet) {
343 std::cout << "Processing TimeSliceID range: [" << processing_range.first << ", " << processing_range.second << "]" << std::endl;
344 }
345
346 // Create the file handlers (one worker per writer application)
// NOTE(review): `auto [name, files]` copies each vector of shared_ptrs per
// iteration (refcount churn); `const auto& [name, files]` would avoid it.
347 std::vector<std::unique_ptr<TAEmulationWorker>> ta_emu_workers;
348 for (auto [name, files] : sorted_files) {
349 ta_emu_workers.push_back(std::make_unique<TAEmulationWorker>(files, config, processing_range, opts.run_parallel, opts.quiet));
350 }
351
352 // Start each file handler
353 for (const auto& handler : ta_emu_workers) {
354 handler->start_processing();
355 }
356
357 // Output map of TA vectors & function that appends TAs to that vector
358 std::map<uint64_t, std::vector<triggeralgs::TriggerActivity>> tas;
359 auto append_tas = [&tas](std::map<uint64_t, std::vector<triggeralgs::TriggerActivity>>&& _tas) {
360 for (auto& [sliceid, src_vec] : _tas) {
361 auto& dest_vec = tas[sliceid];
362 dest_vec.reserve(dest_vec.size() + src_vec.size());
363
364 dest_vec.insert(dest_vec.end(), std::make_move_iterator(src_vec.begin()), std::make_move_iterator(src_vec.end()));
365 src_vec.clear();
366 }
367 };
368
369 // Output map of Fragment vectors & function that appends TAs to that vector
370 std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>> frags;
371 auto append_frags = [&frags](std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>>&& _frags) {
372 for (auto& [sliceid, src_vec] : _frags) {
373 auto& dest_vec = frags[sliceid];
374
375 dest_vec.reserve(dest_vec.size() + src_vec.size());
376
377 dest_vec.insert(dest_vec.end(), std::make_move_iterator(src_vec.begin()), std::make_move_iterator(src_vec.end()));
378 src_vec.clear();
379 }
380 };
381
382 // Iterate over the handlers, wait for them to complete their job & append
383 // their TAs to our vector when ready.
// NOTE(review): original line 384 is missing from this rendering — presumably
// it declares `hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t
// sourceid_geoid_map;` used below. Confirm against the real source.
385 for (const auto& handler : ta_emu_workers) {
386 // Wait for all TAs to be made
387 handler->wait_to_complete_work();
388
389 // Append output TA/fragments
// NOTE(review): if get_tas()/get_frags() return by value, the std::move is
// redundant (already an rvalue); if they return references, moving from a
// worker that is reused afterwards would be a bug — verify the getters.
390 append_tas(std::move(handler->get_tas()));
391 append_frags(std::move(handler->get_frags()));
392
393 // Get and merge the source ID map
394 hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t map = handler->get_sourceid_geoid_map();
395 sourceid_geoid_map.insert(map.begin(), map.end());
396 }
397
398 // Sort the TAs in each slice before pushing them into the TCMaker
399 size_t n_tas = 0;
400 for (auto& [sliceid, vec_tas]: tas) {
// NOTE(review): original line 402 (the comparator lambda header) is missing
// from this rendering; the visible body orders TAs lexicographically by
// (time_start, channel_start, time_end).
401 std::sort(vec_tas.begin(), vec_tas.end(),
403 return std::tie(a.time_start, a.channel_start, a.time_end) <
404 std::tie(b.time_start, b.channel_start, b.time_end);
405 });
406 n_tas += vec_tas.size();
407 }
408
409 if (!opts.quiet) {
410 std::cout << "Total number of TAs made: " << n_tas << std::endl;
411 std::cout << "Creating a TCMaker..." << std::endl;
412 }
413 // Create the TC emulator from the first configured TC plugin/config pair
414 std::string algo_name = config["trigger_candidate_plugin"][0];
415 nlohmann::json algo_config = config["trigger_candidate_config"][0];
416
// NOTE(review): original line 418 is missing from this rendering — presumably
// the maker is built via triggeralgs::TriggerCandidateFactory::get_instance()
// (see the factory tooltip in this page). Confirm against the real source.
417 std::unique_ptr<triggeralgs::TriggerCandidateMaker> tc_maker =
419 tc_maker->configure(algo_config);
420
421 trgtools::TCEmulationUnit tc_emulator;
422 tc_emulator.set_maker(tc_maker);
423
424 // Emulate the TriggerCandidates, one fragment per slice with TAs
425 std::vector<triggeralgs::TriggerCandidate> tcs;
426 for (auto& [sliceid, vec_tas]: tas) {
427 std::unique_ptr<daqdataformats::Fragment> tc_frag = tc_emulator.emulate_vector(vec_tas);
428 if (!tc_frag) {
429 continue;
430 }
431
432 // Manipulate the fragment header
// The +10000 offset keeps the emulated TC source id distinct from the
// original TA fragment's element id within the kTrigger subsystem.
433 daqdataformats::FragmentHeader frag_hdr = tc_frag->get_header();
434 frag_hdr.element_id = daqdataformats::SourceID{daqdataformats::SourceID::Subsystem::kTrigger, tc_frag->get_element_id().id+10000};
435
436 tc_frag->set_header_fields(frag_hdr);
437 tc_frag->set_type(daqdataformats::FragmentType::kTriggerCandidate);
438 tc_frag->set_trigger_number(sliceid);
// Reuse the window of the slice's first existing fragment for the TC fragment.
// NOTE(review): frags[sliceid] is assumed non-empty here — a slice with TAs
// but no fragments would index an empty vector (UB); confirm invariant.
439 tc_frag->set_window_begin(frags[sliceid][0]->get_window_begin());
440 tc_frag->set_window_end(frags[sliceid][0]->get_window_end());
441
442 // Push the fragment to our list
443 frags[sliceid].push_back(std::move(tc_frag));
444
445 std::vector<triggeralgs::TriggerCandidate> tmp_tcs = tc_emulator.get_last_output_buffer();
446 tcs.reserve(tcs.size() + tmp_tcs.size());
447 tcs.insert(tcs.end(), std::make_move_iterator(tmp_tcs.begin()), std::make_move_iterator(tmp_tcs.end()));
448 }
449 if (!opts.quiet) {
450 std::cout << "Total number of TCs made: " << tcs.size() << std::endl;
451 }
452
453 if (!frags.empty()) {
454 save_fragments(opts.output_filename, sourceid_geoid_map, std::move(frags), opts.quiet);
455 } else {
456 std::cout << "No TA/TC fragments generated. Output file will not be generated" << std::endl;
457 }
458
459 return 0;
460}
std::map< daqdataformats::SourceID, std::vector< uint64_t > > source_id_geo_id_map_t
static std::shared_ptr< AbstractFactory< TriggerCandidateMaker > > get_instance()
std::pair< uint64_t, uint64_t > get_available_slice_id_range(const std::map< std::string, std::vector< std::shared_ptr< hdf5libs::HDF5RawDataFile > > > &_files, bool _quiet)
Compute the common SliceID interval shared by all writer groups.
std::map< std::string, std::vector< std::shared_ptr< hdf5libs::HDF5RawDataFile > > > sort_files_per_writer(const std::vector< std::string > &_files)
Group and order input TimeSlice files per TPStream writer application.
void parse_app(CLI::App &_app, Options &_opts)
Adds options to our CLI application.
void save_fragments(const std::string &_outputfilename, const hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t &_sourceid_geoid_map, std::map< uint64_t, std::vector< std::unique_ptr< daqdataformats::Fragment > > > _frags, bool _quiet)
Saves fragments in timeslice format into output HDF5 file.
Struct with available cli application options.

◆ parse_app()

void parse_app ( CLI::App & _app,
Options & _opts )

Adds options to our CLI application.

Parameters
_appCLI application
_optsStruct with the available options

Definition at line 259 of file emulate_from_tpstream.cxx.

260{
261 _app.add_option("-i,--input-files", _opts.input_files, "List of input files (required)")
262 ->required()
263 ->check(CLI::ExistingFile); // Validate that each file exists
264
265 _app.add_option("-o,--output-file", _opts.output_filename, "Output file (required)")
266 ->required(); // make the argument required
267
268 _app.add_option("-j,--json-config", _opts.config_name, "Trigger Activity and Candidate config JSON to use (required)")
269 ->required()
270 ->check(CLI::ExistingFile);
271
272 _app.add_flag("--parallel", _opts.run_parallel, "Run the TAMakers in parallel");
273
274 _app.add_flag("--quiet", _opts.quiet, "Quiet outputs.");
275
276 _app.add_flag("--latencies", _opts.latencies, "Saves latencies per TP into csv");
277
278 _app.add_option("-s,--slice-start", _opts.slice_start, "Inclusive lower bound for TimeSlice ID to process");
279 _app.add_option("-n,--num-slices", _opts.num_slices, "Number of TimeSlices to process");
280}
std::string output_filename
output filename
std::optional< uint64_t > num_slices
optional number of TimeSlices to process
std::string config_name
the configuration filename
std::vector< std::string > input_files
vector of input filenames
bool quiet
do we want to quiet down the cout? Default: no
bool run_parallel
runs each TAMaker on a separate thread
std::optional< uint64_t > slice_start
optional lower bound (inclusive) for TimeSlice IDs to process
bool latencies
do we want to measure latencies? Default: no

◆ save_fragments()

void save_fragments ( const std::string & _outputfilename,
const hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t & _sourceid_geoid_map,
std::map< uint64_t, std::vector< std::unique_ptr< daqdataformats::Fragment > > > _frags,
bool _quiet )

Saves fragments in timeslice format into output HDF5 file.

Parameters
_outputfilename Name of the hdf5 file to save the output into
_sourceid_geoid_map Source ID to Geo ID map required to create a HDF5 file
_frags Map of fragments, with a vector of fragment pointers for each slice id
_quiet Do we want to quiet down the cout?
Todo
Maybe in the future we will want to emulate PDS TPs.

Definition at line 32 of file emulate_from_tpstream.cxx.

// Saves fragments in timeslice format into an output HDF5 file: normalises the
// filename extension, builds the HDF5 file layout, then writes one TimeSlice
// per slice id containing all of that slice's fragments.
// NOTE(review): this listing comes from a generated rendering; original source
// lines 49, 80 and 83 are missing below — flagged inline where they fall.
36{
// Ensure the output filename ends in ".hdf5" (append it if absent/too short).
37 std::string output_filename = _outputfilename;
38 if (output_filename.size() < 5 ||
39 output_filename.compare(output_filename.size() - 5, 5, ".hdf5") != 0) {
40 output_filename += ".hdf5";
41 }
42
43 // Create layout parameter object required for HDF5 creation
44 hdf5libs::HDF5FileLayoutParameters layout_params;
45
46 // Create HDF5 parameter path required for the layout
47 hdf5libs::HDF5PathParameters params_trigger;
48 params_trigger.detector_group_type = "Detector_Readout";
// NOTE(review): original line 49 is missing from this rendering — likely
// another params_trigger field assignment. Confirm against the real source.
50 params_trigger.detector_group_name = "TPC";
51 params_trigger.element_name_prefix = "Link";
52 params_trigger.digits_for_element_number = 5;
53
54 // Fill the HDF5 layout
55 std::vector<hdf5libs::HDF5PathParameters> params;
56 params.push_back(params_trigger);
57 layout_params.record_name_prefix = "TimeSlice";
58 layout_params.digits_for_record_number = 6;
59 layout_params.digits_for_sequence_number = 0;
60 layout_params.record_header_dataset_name = "TimeSliceHeader";
61 layout_params.raw_data_group_name = "RawData";
62 layout_params.view_group_name = "Views";
63 layout_params.path_params_list = {params};
64
65 // Create pointer to a new output HDF5 file
// NOTE(review): _frags is assumed non-empty with a non-empty first vector —
// begin()->second[0] is UB otherwise; the caller guards with !frags.empty(),
// but an empty first vector would still slip through. Confirm invariant.
66 std::unique_ptr<hdf5libs::HDF5RawDataFile> output_file = std::make_unique<hdf5libs::HDF5RawDataFile>(
67 output_filename,
68 _frags.begin()->second[0]->get_run_number(),
69 0,
70 "emulate_from_tpstream",
71 layout_params,
72 _sourceid_geoid_map);
73
74 // Iterate over the time slices & save all the fragments
75 for (auto& [slice_id, vec_frags]: _frags) {
76 // Create a new timeslice header
77 daqdataformats::TimeSliceHeader tsh;
78 tsh.timeslice_number = slice_id;
79 tsh.run_number = vec_frags[0]->get_run_number();
// NOTE(review): original line 80 is missing from this rendering — likely
// another tsh field assignment. Confirm against the real source.
81
82 // Create a new timeslice
// NOTE(review): original line 83 is missing from this rendering — presumably
// it constructs the TimeSlice `ts` from `tsh` (used below). Confirm.
84 if (!_quiet) {
85 std::cout << "Time slice number: " << slice_id << std::endl;
86 }
// Add the fragments to the timeslice; each fragment is moved in, leaving the
// source vector's pointers null.
88 for (std::unique_ptr<daqdataformats::Fragment>& frag_ptr: vec_frags) {
89 if (!_quiet) {
90 std::cout << " Writing elementid: " << frag_ptr->get_element_id() << " trigger number: " << frag_ptr->get_trigger_number() << " trigger_timestamp: " << frag_ptr->get_trigger_timestamp() << " window_begin: " << frag_ptr->get_window_begin() << " sequence_no: " << frag_ptr->get_sequence_number() << std::endl;
91 }
92 ts.add_fragment(std::move(frag_ptr));
93 }
94
95 // Write the timeslice to output file
96 output_file->write(ts);
97 }
98}
C++ Representation of a DUNE TimeSlice, consisting of a TimeSliceHeader object and a vector of pointe...
Definition TimeSlice.hpp:27
SourceID is a generalized representation of the source of a piece of data in the DAQ....
Definition SourceID.hpp:32

◆ sort_files_per_writer()

std::map< std::string, std::vector< std::shared_ptr< hdf5libs::HDF5RawDataFile > > > sort_files_per_writer ( const std::vector< std::string > & _files)

Group and order input TimeSlice files per TPStream writer application.

Opens each input file, validates that it is of TimeSlice type, and groups files by their application_name attribute. Within each application group, files are sorted by their file_index attribute so consecutive segments from the same writer are processed in order.

Parameters
_filesVector of input HDF5 file paths.
Returns
Map keyed by application_name, each value a vector of file handles ordered by file_index.
Exceptions
std::runtime_errorIf any input file is not of type TimeSlice.

Definition at line 114 of file emulate_from_tpstream.cxx.

115{
116 std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>> files_sorted;
117
118 // Put files into per-writer app groups
119 for (const std::string& file : _files) {
120 std::shared_ptr<hdf5libs::HDF5RawDataFile> fl = std::make_shared<hdf5libs::HDF5RawDataFile>(file);
121 if (!fl->is_timeslice_type()) {
122 throw std::runtime_error(fmt::format("ERROR: input file '{}' not of type 'TimeSlice'", file));
123 }
124
125 std::string application_name = fl->get_attribute<std::string>("application_name");
126 files_sorted[application_name].push_back(fl);
127 }
128
129 // Sort files for each writer application individually
130 for (auto& [app_name, vec_files]: files_sorted) {
131 // Don't sort if we have 0 or 1 files in the application...
132 if (vec_files.size() <= 1) {
133 continue;
134 }
135
136 // Sort w.r.t. file index attribute
137 std::sort(vec_files.begin(), vec_files.end(),
138 [](const std::shared_ptr<hdf5libs::HDF5RawDataFile>& a, const std::shared_ptr<hdf5libs::HDF5RawDataFile>& b) {
139 return a->get_attribute<size_t>("file_index") <
140 b->get_attribute<size_t>("file_index");
141 });
142 }
143
144 return files_sorted;
145};