DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
emulate_from_tpstream.cxx File Reference
#include "trgtools/EmulateTCUnit.hpp"
#include "trgtools/TAFileHandler.hpp"
#include "CLI/App.hpp"
#include "CLI/Config.hpp"
#include "CLI/Formatter.hpp"
#include <fmt/core.h>
#include <fmt/format.h>
#include <fmt/chrono.h>
#include <filesystem>
#include "hdf5libs/HDF5RawDataFile.hpp"
#include "hdf5libs/HDF5SourceIDHandler.hpp"
#include "triggeralgs/TriggerCandidateFactory.hpp"
Include dependency graph for emulate_from_tpstream.cxx:

Go to the source code of this file.

Classes

struct  Options
 Struct with available cli application options. More...
 

Functions

void SaveFragments (const std::string &_outputfilename, const hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t &_sourceid_geoid_map, std::map< uint64_t, std::vector< std::unique_ptr< daqdataformats::Fragment > > > _frags, bool _quiet)
 Saves fragments in timeslice format into output HDF5 file.
 
std::map< std::string, std::vector< std::shared_ptr< hdf5libs::HDF5RawDataFile > > > SortFilesPerWriter (const std::vector< std::string > &_files)
 Returns sorted map of HDF5 files per datawriter application.
 
std::pair< uint64_t, uint64_t > GetAvailableSliceIDRange (const std::map< std::string, std::vector< std::shared_ptr< hdf5libs::HDF5RawDataFile > > > &_files, bool _quiet)
 Retrieves the available slice ID range.
 
void ParseApp (CLI::App &_app, Options &_opts)
 Adds options to our CLI application.
 
int main (int argc, char const *argv[])
 

Function Documentation

◆ GetAvailableSliceIDRange()

std::pair< uint64_t, uint64_t > GetAvailableSliceIDRange ( const std::map< std::string, std::vector< std::shared_ptr< hdf5libs::HDF5RawDataFile > > > & _files,
bool _quiet )

Retrieves the available slice ID range.

Finds the overlap in the slice ID range between the provided files, and returns that overlap as an available range – or crashes if there is a file with a range that does not overlap.

Todo
: Rather than returning the sliceID range, should try to return a time range – and have processors go off that.
Parameters
_files: a map of writer app names & vectors of HDF5 files from that application.
_quiet: Do we want to quiet down the cout?

Definition at line 145 of file emulate_from_tpstream.cxx.

147{
148 if (_files.empty()) {
149 throw std::runtime_error("No files provided");
150 }
151
152 uint64_t global_start = std::numeric_limits<uint64_t>::min();
153 uint64_t global_end = std::numeric_limits<uint64_t>::max();
154
155 // Get the min & max record id per application, and the global
156 for (auto& [appid, vec_files]: _files) {
157 uint64_t app_start = std::numeric_limits<uint64_t>::max();
158 uint64_t app_end = std::numeric_limits<uint64_t>::min();
159
160 // Find min / max record id for this application
161 for (auto& file : vec_files) {
162 auto record_ids = file->get_all_record_ids();
163 if (record_ids.empty()) {
164 throw std::runtime_error(fmt::format("File from application {} contains no records.", appid));
165 }
166 app_start = std::min(app_start, record_ids.begin()->first);
167 app_end = std::max(app_end, record_ids.rbegin()->first);
168 }
169
170 // Update the global min / max record id
171 global_start = std::max(global_start, app_start);
172 global_end = std::min(global_end, app_end);
173
174 if (!_quiet) {
175 std::cout << "Application: " << appid << " " << " TimeSliceID start: " << app_start << " end: " << app_end << std::endl;
176 }
177 }
178
179 if (!_quiet) {
180 std::cout << "Global start: " << global_start << " Global end: " << global_end << std::endl;
181 }
182 if (global_start > global_end) {
183 throw std::runtime_error("One of the provided files' id range did not overlap with the rest. Please select files with overlapping TimeSlice IDs");
184 }
185
186 // Extra validation / error handling
187 for (auto& [appid, vec_files]: _files) {
188 for (auto& file : vec_files) {
189 auto record_ids = file->get_all_record_ids();
190
191 uint64_t file_start = record_ids.begin()->first;
192 uint64_t file_end = record_ids.rbegin()->first;
193 if ((file_start > global_end || file_end < global_start)) {
194 uint64_t file_index = file->get_attribute<size_t>("file_index");
195 throw std::runtime_error(fmt::format(
196 "File from TPStreamWrite application '{}' (index '{}') has record id range [{}, {}], which does not overlap with global range [{}, {}].",
197 appid, file_index, file_start, file_end, global_start, global_end
198 ));
199 }
200 }
201 }
202
203 return {global_start, global_end};
204}

◆ main()

int main ( int argc,
char const * argv[] )

Definition at line 252 of file emulate_from_tpstream.cxx.

// Entry point: emulates TriggerActivities and TriggerCandidates offline from
// TPStream HDF5 input files and writes the resulting fragments to an output
// HDF5 file. Returns 0 on success, or the CLI11 exit code on a parse error.
253{
254 // Do all the CLI processing first
255 CLI::App app{"Offline trigger TriggerActivity & TriggerCandidate emulatior"};
// NOTE(review): "emulatior" in the app description string is a typo for
// "emulator" (user-visible help text) -- fix in the actual source, not here.
256 Options opts{};
257
258 ParseApp(app, opts);
259
260 try {
261 app.parse(argc, argv);
262 }
263 catch (const CLI::ParseError &e) {
264 return app.exit(e);
265 }
266
267 // Get the configuration file
268 std::ifstream config_stream(opts.config_name);
269 nlohmann::json config = nlohmann::json::parse(config_stream);
270
271 if (!opts.quiet) {
272 std::cout << "Files to process:\n";
273 for (const std::string& file : opts.input_files) {
274 std::cout << "- " << file << "\n";
275 }
276 }
277
278 // Sort the files into a map writer_id::vector<HDF5>
279 std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>> sorted_files =
280 SortFilesPerWriter(opts.input_files);
281
282 // Get the available record_id range
283 std::pair<uint64_t, uint64_t> recordid_range = GetAvailableSliceIDRange(sorted_files, opts.quiet);
284
285 // Create the file handlers
// One TAFileHandler per writer application; each handler owns that
// application's (sorted) file list.
286 std::vector<std::unique_ptr<TAFileHandler>> file_handlers;
287 for (auto [name, files] : sorted_files) {
288 file_handlers.push_back(std::make_unique<TAFileHandler>(files, config, recordid_range, opts.run_parallel, opts.quiet));
289 }
290
291 // Start each file handler
292 for (const auto& handler : file_handlers) {
293 handler->start_processing();
294 }
295
296 // Output map of TA vectors & function that appends TAs to that vector
// append_tas merges a handler's slice-id -> TA-vector map into the global
// `tas` map, moving elements rather than copying them.
297 std::map<uint64_t, std::vector<triggeralgs::TriggerActivity>> tas;
298 auto append_tas = [&tas](std::map<uint64_t, std::vector<triggeralgs::TriggerActivity>>&& _tas) {
299 for (auto& [sliceid, src_vec] : _tas) {
300 auto& dest_vec = tas[sliceid];
301 dest_vec.reserve(dest_vec.size() + src_vec.size());
302
303 dest_vec.insert(dest_vec.end(), std::make_move_iterator(src_vec.begin()), std::make_move_iterator(src_vec.end()));
304 src_vec.clear();
305 }
306 };
307
308 // Output map of Fragment vectors & function that appends TAs to that vector
// Same merge pattern as append_tas, but for unique_ptr<Fragment> vectors
// (move-only elements, so the move-iterator insert is required here).
309 std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>> frags;
310 auto append_frags = [&frags](std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>>&& _frags) {
311 for (auto& [sliceid, src_vec] : _frags) {
312 auto& dest_vec = frags[sliceid];
313
314 dest_vec.reserve(dest_vec.size() + src_vec.size());
315
316 dest_vec.insert(dest_vec.end(), std::make_move_iterator(src_vec.begin()), std::make_move_iterator(src_vec.end()));
317 src_vec.clear();
318 }
319 };
320
321 // Iterate over the handlers, wait for them to complete their job & append
322 // their TAs to our vector when ready.
// NOTE(review): source line 323 is missing from this listing; it presumably
// declares `hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t
// sourceid_geoid_map;` used below -- confirm against the actual source file.
324 for (const auto& handler : file_handlers) {
325 // Wait for all TAs to be made
326 handler->wait_to_complete_work();
327
328 // Append output TA/fragments
329 append_tas(std::move(handler->get_tas()));
330 append_frags(std::move(handler->get_frags()));
331
332 // Get and merge the source ID map
333 hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t map = handler->get_sourceid_geoid_map();
334 sourceid_geoid_map.insert(map.begin(), map.end());
335 }
336
337 // Sort the TAs in each slice before pushing them into the TCMaker
// Ordering key: (time_start, channel_start, time_end), ascending.
338 size_t n_tas = 0;
339 for (auto& [sliceid, vec_tas]: tas) {
340 std::sort(vec_tas.begin(), vec_tas.end(),
// NOTE(review): source line 341 (the comparison lambda's signature) is missing
// from this listing -- confirm against the actual source file.
342 return std::tie(a.time_start, a.channel_start, a.time_end) <
343 std::tie(b.time_start, b.channel_start, b.time_end);
344 });
345 n_tas += vec_tas.size();
346 }
347 if (!opts.quiet) {
348 std::cout << "Total number of TAs made: " << n_tas << std::endl;
349 std::cout << "Creating a TCMaker..." << std::endl;
350 }
351 // Create the TC emulator
// Plugin name & config are read from the first entries of the JSON arrays.
352 std::string algo_name = config["trigger_candidate_plugin"][0];
353 nlohmann::json algo_config = config["trigger_candidate_config"][0];
354
355 std::unique_ptr<triggeralgs::TriggerCandidateMaker> tc_maker =
// NOTE(review): source line 356 is missing from this listing; it presumably
// builds the maker via triggeralgs::TriggerCandidateFactory (see the
// #include at the top of the page) -- confirm against the actual source file.
357 tc_maker->configure(algo_config);
358
359 trgtools::EmulateTCUnit tc_emulator;
360 tc_emulator.set_maker(tc_maker);
361
362 // Emulate the TriggerCandidates
363 std::vector<triggeralgs::TriggerCandidate> tcs;
364 for (auto& [sliceid, vec_tas]: tas) {
365 std::unique_ptr<daqdataformats::Fragment> tc_frag = tc_emulator.emulate_vector(vec_tas);
366 if (!tc_frag) {
// No TC fragment was produced for this slice -- nothing to record.
367 continue;
368 }
369
370 // Manipulate the fragment header
// The +10000 element-id offset presumably keeps the emulated TC source id
// from colliding with existing trigger source ids -- TODO confirm.
371 daqdataformats::FragmentHeader frag_hdr = tc_frag->get_header();
372 frag_hdr.element_id = daqdataformats::SourceID{daqdataformats::SourceID::Subsystem::kTrigger, tc_frag->get_element_id().id+10000};
373
374 tc_frag->set_header_fields(frag_hdr);
375 tc_frag->set_type(daqdataformats::FragmentType::kTriggerCandidate);
376 tc_frag->set_trigger_number(sliceid);
// Window bounds are copied from the slice's first existing fragment.
377 tc_frag->set_window_begin(frags[sliceid][0]->get_window_begin());
378 tc_frag->set_window_end(frags[sliceid][0]->get_window_end());
379
380 // Push the fragment to our list
381 frags[sliceid].push_back(std::move(tc_frag));
382
383 std::vector<triggeralgs::TriggerCandidate> tmp_tcs = tc_emulator.get_last_output_buffer();
384 tcs.reserve(tcs.size() + tmp_tcs.size());
385 tcs.insert(tcs.end(), std::make_move_iterator(tmp_tcs.begin()), std::make_move_iterator(tmp_tcs.end()));
386 }
387 if (!opts.quiet) {
388 std::cout << "Total number of TCs made: " << tcs.size() << std::endl;
389 }
390
391 SaveFragments(opts.output_filename, sourceid_geoid_map, std::move(frags), opts.quiet);
392
393 return 0;
394}
std::map< daqdataformats::SourceID, std::vector< uint64_t > > source_id_geo_id_map_t
static std::shared_ptr< AbstractFactory< TriggerCandidateMaker > > get_instance()
void SaveFragments(const std::string &_outputfilename, const hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t &_sourceid_geoid_map, std::map< uint64_t, std::vector< std::unique_ptr< daqdataformats::Fragment > > > _frags, bool _quiet)
Saves fragments in timeslice format into output HDF5 file.
std::map< std::string, std::vector< std::shared_ptr< hdf5libs::HDF5RawDataFile > > > SortFilesPerWriter(const std::vector< std::string > &_files)
Returns sorted map of HDF5 files per datawriter application.
std::pair< uint64_t, uint64_t > GetAvailableSliceIDRange(const std::map< std::string, std::vector< std::shared_ptr< hdf5libs::HDF5RawDataFile > > > &_files, bool _quiet)
Retrieves the available slice ID range.
void ParseApp(CLI::App &_app, Options &_opts)
Adds options to our CLI application.
Struct with available cli application options.

◆ ParseApp()

void ParseApp ( CLI::App & _app,
Options & _opts )

Adds options to our CLI application.

Parameters
_app: CLI application
_opts: Struct with the available options

Definition at line 232 of file emulate_from_tpstream.cxx.

233{
234 _app.add_option("-i,--input-files", _opts.input_files, "List of input files (required)")
235 ->required()
236 ->check(CLI::ExistingFile); // Validate that each file exists
237
238 _app.add_option("-o,--output-file", _opts.output_filename, "Output file (required)")
239 ->required(); // make the argument required
240
241 _app.add_option("-j,--json-config", _opts.config_name, "Trigger Activity and Candidate config JSON to use (required)")
242 ->required()
243 ->check(CLI::ExistingFile);
244
245 _app.add_flag("--parallel", _opts.run_parallel, "Run the TAMakers in parallel");
246
247 _app.add_flag("--quiet", _opts.quiet, "Quiet outputs.");
248
249 _app.add_flag("--latencies", _opts.latencies, "Saves latencies per TP into csv");
250}
std::string output_filename
output filename
std::string config_name
the configuration filename
std::vector< std::string > input_files
vector of input filenames
bool quiet
do we want to quiet down the cout? Default: no
bool run_parallel
runs each TAMaker on a separate thread
bool latencies
do we want to measure latencies? Default: no

◆ SaveFragments()

void SaveFragments ( const std::string & _outputfilename,
const hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t & _sourceid_geoid_map,
std::map< uint64_t, std::vector< std::unique_ptr< daqdataformats::Fragment > > > _frags,
bool _quiet )

Saves fragments in timeslice format into output HDF5 file.

Parameters
_outputfilename: Name of the hdf5 file to save the output into
_sourceid_geoid_map: sourceid–geoid map required to create a HDF5 file
_frags: Map of fragments, with a vector of fragment pointers for each slice id
_quiet: Do we want to quiet down the cout?
Todo
Maybe in the future we will want to emulate PDS TPs.

Definition at line 31 of file emulate_from_tpstream.cxx.

// Saves fragments in timeslice format into an output HDF5 file: builds the
// HDF5 layout description, opens a new file named `_outputfilename + ".hdf5"`,
// then writes one TimeSlice per slice id containing all of that slice's
// fragments (which are consumed/moved in the process).
35{
36 // Create layout parameter object required for HDF5 creation
37 hdf5libs::HDF5FileLayoutParameters layout_params;
38
39 // Create HDF5 parameter path required for the layout
40 hdf5libs::HDF5PathParameters params_trigger;
41 params_trigger.detector_group_type = "Detector_Readout";
// NOTE(review): source line 42 is missing from this listing (likely another
// params_trigger field or a comment) -- confirm against the actual source.
43 params_trigger.detector_group_name = "TPC";
44 params_trigger.element_name_prefix = "Link";
45 params_trigger.digits_for_element_number = 5;
46
47 // Fill the HDF5 layout
48 std::vector<hdf5libs::HDF5PathParameters> params;
49 params.push_back(params_trigger);
50 layout_params.record_name_prefix = "TimeSlice";
51 layout_params.digits_for_record_number = 6;
52 layout_params.digits_for_sequence_number = 0;
53 layout_params.record_header_dataset_name = "TimeSliceHeader";
54 layout_params.raw_data_group_name = "RawData";
55 layout_params.view_group_name = "Views";
56 layout_params.path_params_list = {params};
57
58 // Create pointer to a new output HDF5 file
// Run number comes from the first fragment of the first slice; the file
// index is fixed to 0 and the application name to "emulate_from_tpstream".
59 std::unique_ptr<hdf5libs::HDF5RawDataFile> output_file = std::make_unique<hdf5libs::HDF5RawDataFile>(
60 _outputfilename + ".hdf5",
61 _frags.begin()->second[0]->get_run_number(),
62 0,
63 "emulate_from_tpstream",
64 layout_params,
65 _sourceid_geoid_map);
66
67 // Iterate over the time slices & save all the fragments
68 for (auto& [slice_id, vec_frags]: _frags) {
69 // Create a new timeslice header
70 daqdataformats::TimeSliceHeader tsh;
71 tsh.timeslice_number = slice_id;
72 tsh.run_number = vec_frags[0]->get_run_number();
// NOTE(review): source line 73 is missing from this listing (likely another
// tsh field assignment) -- confirm against the actual source.
74
75 // Create a new timeslice
// NOTE(review): source line 76 is missing from this listing; it presumably
// constructs the `ts` TimeSlice from tsh (e.g. `daqdataformats::TimeSlice
// ts(tsh);`) that is used below -- confirm against the actual source.
77 if (!_quiet) {
78 std::cout << "Time slice number: " << slice_id << std::endl;
79 }
80 // Add the fragments to the timeslices
81 for (std::unique_ptr<daqdataformats::Fragment>& frag_ptr: vec_frags) {
82 if (!_quiet) {
83 std::cout << "	Writing elementid: " << frag_ptr->get_element_id() << " trigger number: " << frag_ptr->get_trigger_number() << " trigger_timestamp: " << frag_ptr->get_trigger_timestamp() << " window_begin: " << frag_ptr->get_window_begin() << " sequence_no: " << frag_ptr->get_sequence_number() << std::endl;
84 }
// Ownership of each fragment is transferred into the timeslice here.
85 ts.add_fragment(std::move(frag_ptr));
86 }
87
88 // Write the timeslice to output file
89 output_file->write(ts);
90 }
91}
C++ Representation of a DUNE TimeSlice, consisting of a TimeSliceHeader object and a vector of pointe...
Definition TimeSlice.hpp:27
PDS Frame with unphysical timestamp detected with ts
SourceID is a generalized representation of the source of a piece of data in the DAQ....
Definition SourceID.hpp:32

◆ SortFilesPerWriter()

std::map< std::string, std::vector< std::shared_ptr< hdf5libs::HDF5RawDataFile > > > SortFilesPerWriter ( const std::vector< std::string > & _files)

Returns sorted map of HDF5 files per datawriter application.

Parameters
_files: vector of strings corresponding to the input file paths

Definition at line 99 of file emulate_from_tpstream.cxx.

100{
101 std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>> files_sorted;
102
103 // Put files into per-writer app groups
104 for (const std::string& file : _files) {
105 std::shared_ptr<hdf5libs::HDF5RawDataFile> fl = std::make_shared<hdf5libs::HDF5RawDataFile>(file);
106 if (!fl->is_timeslice_type()) {
107 throw std::runtime_error(fmt::format("ERROR: input file '{}' not of type 'TimeSlice'", file));
108 }
109
110 std::string application_name = fl->get_attribute<std::string>("application_name");
111 files_sorted[application_name].push_back(fl);
112 }
113
114 // Sort files for each writer application individually
115 for (auto& [app_name, vec_files]: files_sorted) {
116 // Don't sort if we have 0 or 1 files in the application...
117 if (vec_files.size() <= 1) {
118 continue;
119 }
120
121 // Sort w.r.t. file index attribute
122 std::sort(vec_files.begin(), vec_files.end(),
123 [](const std::shared_ptr<hdf5libs::HDF5RawDataFile>& a, const std::shared_ptr<hdf5libs::HDF5RawDataFile>& b) {
124 return a->get_attribute<size_t>("file_index") <
125 b->get_attribute<size_t>("file_index");
126 });
127 }
128
129 return files_sorted;
130};