DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
emulate_from_tpstream.cxx
Go to the documentation of this file.
3
4#include "CLI/App.hpp"
5#include "CLI/Config.hpp"
6#include "CLI/Formatter.hpp"
7
8#include <fmt/core.h>
9#include <fmt/format.h>
10#include <fmt/chrono.h>
11#include <filesystem>
12#include <optional>
13
16
18
19//-----------------------------------------------------------------------------
20
21using namespace dunedaq;
22using namespace trgtools;
23
32void save_fragments(const std::string& _outputfilename,
34 std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>> _frags,
35 bool _quiet)
36{
37 std::string output_filename = _outputfilename;
38 if (output_filename.size() < 5 ||
39 output_filename.compare(output_filename.size() - 5, 5, ".hdf5") != 0) {
40 output_filename += ".hdf5";
41 }
42
43 // Create layout parameter object required for HDF5 creation
45
46 // Create HDF5 parameter path required for the layout
47 hdf5libs::HDF5PathParameters params_trigger;
48 params_trigger.detector_group_type = "Detector_Readout";
50 params_trigger.detector_group_name = "TPC";
51 params_trigger.element_name_prefix = "Link";
52 params_trigger.digits_for_element_number = 5;
53
54 // Fill the HDF5 layout
55 std::vector<hdf5libs::HDF5PathParameters> params;
56 params.push_back(params_trigger);
57 layout_params.record_name_prefix = "TimeSlice";
58 layout_params.digits_for_record_number = 6;
59 layout_params.digits_for_sequence_number = 0;
60 layout_params.record_header_dataset_name = "TimeSliceHeader";
61 layout_params.raw_data_group_name = "RawData";
62 layout_params.view_group_name = "Views";
63 layout_params.path_params_list = {params};
64
65 // Create pointer to a new output HDF5 file
66 std::unique_ptr<hdf5libs::HDF5RawDataFile> output_file = std::make_unique<hdf5libs::HDF5RawDataFile>(
67 output_filename,
68 _frags.begin()->second[0]->get_run_number(),
69 0,
70 "emulate_from_tpstream",
71 layout_params,
72 _sourceid_geoid_map);
73
74 // Iterate over the time slices & save all the fragments
75 for (auto& [slice_id, vec_frags]: _frags) {
76 // Create a new timeslice header
78 tsh.timeslice_number = slice_id;
79 tsh.run_number = vec_frags[0]->get_run_number();
81
82 // Create a new timeslice
84 if (!_quiet) {
85 std::cout << "Time slice number: " << slice_id << std::endl;
86 }
87 // Add the fragments to the timeslices
88 for (std::unique_ptr<daqdataformats::Fragment>& frag_ptr: vec_frags) {
89 if (!_quiet) {
90 std::cout << " Writing elementid: " << frag_ptr->get_element_id() << " trigger number: " << frag_ptr->get_trigger_number() << " trigger_timestamp: " << frag_ptr->get_trigger_timestamp() << " window_begin: " << frag_ptr->get_window_begin() << " sequence_no: " << frag_ptr->get_sequence_number() << std::endl;
91 }
92 ts.add_fragment(std::move(frag_ptr));
93 }
94
95 // Write the timeslice to output file
96 output_file->write(ts);
97 }
98}
99
113std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>>
114sort_files_per_writer(const std::vector<std::string>& _files)
115{
116 std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>> files_sorted;
117
118 // Put files into per-writer app groups
119 for (const std::string& file : _files) {
120 std::shared_ptr<hdf5libs::HDF5RawDataFile> fl = std::make_shared<hdf5libs::HDF5RawDataFile>(file);
121 if (!fl->is_timeslice_type()) {
122 throw std::runtime_error(fmt::format("ERROR: input file '{}' not of type 'TimeSlice'", file));
123 }
124
125 std::string application_name = fl->get_attribute<std::string>("application_name");
126 files_sorted[application_name].push_back(fl);
127 }
128
129 // Sort files for each writer application individually
130 for (auto& [app_name, vec_files]: files_sorted) {
131 // Don't sort if we have 0 or 1 files in the application...
132 if (vec_files.size() <= 1) {
133 continue;
134 }
135
136 // Sort w.r.t. file index attribute
137 std::sort(vec_files.begin(), vec_files.end(),
138 [](const std::shared_ptr<hdf5libs::HDF5RawDataFile>& a, const std::shared_ptr<hdf5libs::HDF5RawDataFile>& b) {
139 return a->get_attribute<size_t>("file_index") <
140 b->get_attribute<size_t>("file_index");
141 });
142 }
143
144 return files_sorted;
145};
146
167std::pair<uint64_t, uint64_t>
168get_available_slice_id_range(const std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>>& _files,
169 bool _quiet)
170{
171 if (_files.empty()) {
172 throw std::runtime_error("No files provided");
173 }
174
175 uint64_t global_start = std::numeric_limits<uint64_t>::min();
176 uint64_t global_end = std::numeric_limits<uint64_t>::max();
177
178 // Get the min & max record id per application, and the global
179 for (auto& [appid, vec_files]: _files) {
180 uint64_t app_start = std::numeric_limits<uint64_t>::max();
181 uint64_t app_end = std::numeric_limits<uint64_t>::min();
182
183 // Find min / max record id for this application
184 for (auto& file : vec_files) {
185 auto record_ids = file->get_all_record_ids();
186 if (record_ids.empty()) {
187 throw std::runtime_error(fmt::format("File from application {} contains no records.", appid));
188 }
189 app_start = std::min(app_start, record_ids.begin()->first);
190 app_end = std::max(app_end, record_ids.rbegin()->first);
191 }
192
193 // Update the global min / max record id
194 global_start = std::max(global_start, app_start);
195 global_end = std::min(global_end, app_end);
196
197 if (!_quiet) {
198 std::cout << "Application: " << appid << " " << " TimeSliceID start: " << app_start << " end: " << app_end << std::endl;
199 }
200 }
201
202 if (!_quiet) {
203 std::cout << "Global start: " << global_start << " Global end: " << global_end << std::endl;
204 }
205 if (global_start > global_end) {
206 throw std::runtime_error("One of the provided files' id range did not overlap with the rest. Please select files with overlapping TimeSlice IDs");
207 }
208
209 // Extra validation / error handling
210 for (auto& [appid, vec_files]: _files) {
211 for (auto& file : vec_files) {
212 auto record_ids = file->get_all_record_ids();
213
214 uint64_t file_start = record_ids.begin()->first;
215 uint64_t file_end = record_ids.rbegin()->first;
216 if ((file_start > global_end || file_end < global_start)) {
217 uint64_t file_index = file->get_attribute<size_t>("file_index");
218 throw std::runtime_error(fmt::format(
219 "File from TPStreamWrite application '{}' (index '{}') has record id range [{}, {}], which does not overlap with global range [{}, {}].",
220 appid, file_index, file_start, file_end, global_start, global_end
221 ));
222 }
223 }
224 }
225
226 return {global_start, global_end};
227}
228
/**
 * @brief Struct with available CLI application options.
 *
 * NOTE(review): the `struct Options` introducer line was missing from this
 * extract and has been reconstructed from the generated documentation.
 */
struct Options
{
  // Vector of input filenames (required).
  std::vector<std::string> input_files;
  // Output filename (required).
  std::string output_filename;
  // Trigger Activity and Candidate configuration filename (required).
  std::string config_name;
  // Do we want to quiet down the cout? Default: no.
  bool quiet = false;
  // Do we want to measure latencies per TP? Default: no.
  bool latencies = false;
  // Runs each TAMaker on a separate thread. Default: no.
  bool run_parallel = false;
  // Optional inclusive lower bound for TimeSlice IDs to process.
  std::optional<uint64_t> slice_start = std::nullopt;
  // Optional number of TimeSlices to process.
  std::optional<uint64_t> num_slices = std::nullopt;
};
252
259void parse_app(CLI::App& _app, Options& _opts)
260{
261 _app.add_option("-i,--input-files", _opts.input_files, "List of input files (required)")
262 ->required()
263 ->check(CLI::ExistingFile); // Validate that each file exists
264
265 _app.add_option("-o,--output-file", _opts.output_filename, "Output file (required)")
266 ->required(); // make the argument required
267
268 _app.add_option("-j,--json-config", _opts.config_name, "Trigger Activity and Candidate config JSON to use (required)")
269 ->required()
270 ->check(CLI::ExistingFile);
271
272 _app.add_flag("--parallel", _opts.run_parallel, "Run the TAMakers in parallel");
273
274 _app.add_flag("--quiet", _opts.quiet, "Quiet outputs.");
275
276 _app.add_flag("--latencies", _opts.latencies, "Saves latencies per TP into csv");
277
278 _app.add_option("-s,--slice-start", _opts.slice_start, "Inclusive lower bound for TimeSlice ID to process");
279 _app.add_option("-n,--num-slices", _opts.num_slices, "Number of TimeSlices to process");
280}
281
// Entry point: parse CLI options, group/sort the TP-stream input files, run
// the TA emulation workers over the selected TimeSlice-ID range, then run the
// TC maker over the merged TAs and write all fragments to the output file.
282int main(int argc, char const *argv[])
283{
284 // Do all the CLI processing first
 // NOTE(review): "emulatior" in the app title is a typo ("emulator") — fix upstream.
285 CLI::App app{"Offline trigger TriggerActivity & TriggerCandidate emulatior"};
286 Options opts{};
287
288 parse_app(app, opts);
289
290 try {
291 app.parse(argc, argv);
292 }
293 catch (const CLI::ParseError &e) {
294 return app.exit(e);
295 }
296
297 // Get the configuration file
 // NOTE(review): no check that the stream opened; nlohmann::json::parse will
 // throw on an unreadable/empty file — presumably acceptable here.
298 std::ifstream config_stream(opts.config_name);
299 nlohmann::json config = nlohmann::json::parse(config_stream);
300
301 if (!opts.quiet) {
302 std::cout << "Files to process:\n";
303 for (const std::string& file : opts.input_files) {
304 std::cout << "- " << file << "\n";
305 }
306 }
307
308 // Sort the files into a map writer_id::vector<HDF5>
309 std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>> sorted_files =
310 sort_files_per_writer(opts.input_files);
311
312 // Get the available record_id range
313 std::pair<uint64_t, uint64_t> recordid_range = get_available_slice_id_range(sorted_files, opts.quiet);
314 std::pair<uint64_t, uint64_t> processing_range = recordid_range;
315
 // Clamp the processing window to the user-requested lower bound, if any.
316 if (opts.slice_start.has_value()) {
317 processing_range.first = std::max(processing_range.first, opts.slice_start.value());
318 }
319
320 if (opts.num_slices.has_value()) {
321 if (opts.num_slices.value() == 0) {
322 throw std::runtime_error("Invalid --num-slices value: must be greater than 0");
323 }
324
 // Inclusive end = start + (n - 1); the comparison below detects uint64
 // wrap-around and saturates instead of producing a tiny bogus end.
325 uint64_t requested_end = processing_range.first + (opts.num_slices.value() - 1);
326 if (requested_end < processing_range.first) {
327 requested_end = std::numeric_limits<uint64_t>::max();
328 }
329
330 processing_range.second = std::min(processing_range.second, requested_end);
331 }
332
333 if (processing_range.first > processing_range.second) {
334 throw std::runtime_error(fmt::format(
335 "Requested timeslice subset (slice-start={}, num-slices={}) does not overlap available range [{}, {}]",
336 opts.slice_start.value_or(0),
337 opts.num_slices.value_or(0),
338 recordid_range.first,
339 recordid_range.second));
340 }
341
342 if (!opts.quiet) {
343 std::cout << "Processing TimeSliceID range: [" << processing_range.first << ", " << processing_range.second << "]" << std::endl;
344 }
345
346 // Create the file handlers
 // NOTE(review): `auto [name, files]` copies each vector of shared_ptrs per
 // iteration; `const auto& [name, files]` would avoid the copies.
347 std::vector<std::unique_ptr<TAEmulationWorker>> ta_emu_workers;
348 for (auto [name, files] : sorted_files) {
349 ta_emu_workers.push_back(std::make_unique<TAEmulationWorker>(files, config, processing_range, opts.run_parallel, opts.quiet));
350 }
351
352 // Start each file handler
353 for (const auto& handler : ta_emu_workers) {
354 handler->start_processing();
355 }
356
357 // Output map of TA vectors & function that appends TAs to that vector
 // (moves each worker's TAs into the global per-slice map, leaving the
 // source vectors empty)
358 std::map<uint64_t, std::vector<triggeralgs::TriggerActivity>> tas;
359 auto append_tas = [&tas](std::map<uint64_t, std::vector<triggeralgs::TriggerActivity>>&& _tas) {
360 for (auto& [sliceid, src_vec] : _tas) {
361 auto& dest_vec = tas[sliceid];
362 dest_vec.reserve(dest_vec.size() + src_vec.size());
363
364 dest_vec.insert(dest_vec.end(), std::make_move_iterator(src_vec.begin()), std::make_move_iterator(src_vec.end()));
365 src_vec.clear();
366 }
367 };
368
369 // Output map of Fragment vectors & function that appends TAs to that vector
 // (same move-merge pattern as append_tas, but for Fragment unique_ptrs)
370 std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>> frags;
371 auto append_frags = [&frags](std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>>&& _frags) {
372 for (auto& [sliceid, src_vec] : _frags) {
373 auto& dest_vec = frags[sliceid];
374
375 dest_vec.reserve(dest_vec.size() + src_vec.size());
376
377 dest_vec.insert(dest_vec.end(), std::make_move_iterator(src_vec.begin()), std::make_move_iterator(src_vec.end()));
378 src_vec.clear();
379 }
380 };
381
382 // Iterate over the handlers, wait for them to complete their job & append
383 // their TAs to our vector when ready.
 // NOTE(review): the declaration of `sourceid_geoid_map` (used below, line
 // 395) is not visible in this extract — it appears to sit on a collapsed
 // line; expected type: hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t.
385 for (const auto& handler : ta_emu_workers) {
386 // Wait for all TAs to be made
387 handler->wait_to_complete_work();
388
389 // Append output TA/fragments
390 append_tas(std::move(handler->get_tas()));
391 append_frags(std::move(handler->get_frags()));
392
393 // Get and merge the source ID map
394 hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t map = handler->get_sourceid_geoid_map();
395 sourceid_geoid_map.insert(map.begin(), map.end());
396 }
397
398 // Sort the TAs in each slice before pushing them into the TCMaker
 // NOTE(review): the comparator lambda's opening line (with the parameter
 // list for `a` and `b`) is missing from this extract — confirm upstream.
399 size_t n_tas = 0;
400 for (auto& [sliceid, vec_tas]: tas) {
401 std::sort(vec_tas.begin(), vec_tas.end(),
403 return std::tie(a.time_start, a.channel_start, a.time_end) <
404 std::tie(b.time_start, b.channel_start, b.time_end);
405 });
406 n_tas += vec_tas.size();
407 }
408
409 if (!opts.quiet) {
410 std::cout << "Total number of TAs made: " << n_tas << std::endl;
411 std::cout << "Creating a TCMaker..." << std::endl;
412 }
413 // Create the TC emulator
414 std::string algo_name = config["trigger_candidate_plugin"][0];
415 nlohmann::json algo_config = config["trigger_candidate_config"][0];
416
 // NOTE(review): the factory-call line is truncated in this extract —
 // presumably AbstractFactory<TriggerCandidateMaker>::get_instance() followed
 // by a build call with `algo_name`; confirm against upstream.
417 std::unique_ptr<triggeralgs::TriggerCandidateMaker> tc_maker =
419 tc_maker->configure(algo_config);
420
421 trgtools::TCEmulationUnit tc_emulator;
422 tc_emulator.set_maker(tc_maker);
423
424 // Emulate the TriggerCandidates
425 std::vector<triggeralgs::TriggerCandidate> tcs;
426 for (auto& [sliceid, vec_tas]: tas) {
 // emulate_vector may return null when no TC fragment was produced for
 // this slice; skip it in that case.
427 std::unique_ptr<daqdataformats::Fragment> tc_frag = tc_emulator.emulate_vector(vec_tas);
428 if (!tc_frag) {
429 continue;
430 }
431
432 // Manipulate the fragment header
 // Offset of 10000 distinguishes TC SourceIDs from the TA ones.
433 daqdataformats::FragmentHeader frag_hdr = tc_frag->get_header();
434 frag_hdr.element_id = daqdataformats::SourceID{daqdataformats::SourceID::Subsystem::kTrigger, tc_frag->get_element_id().id+10000};
435
436 tc_frag->set_header_fields(frag_hdr);
437 tc_frag->set_type(daqdataformats::FragmentType::kTriggerCandidate);
438 tc_frag->set_trigger_number(sliceid);
 // Inherit the readout window from the first TA fragment of this slice.
439 tc_frag->set_window_begin(frags[sliceid][0]->get_window_begin());
440 tc_frag->set_window_end(frags[sliceid][0]->get_window_end());
441
442 // Push the fragment to our list
443 frags[sliceid].push_back(std::move(tc_frag));
444
445 std::vector<triggeralgs::TriggerCandidate> tmp_tcs = tc_emulator.get_last_output_buffer();
446 tcs.reserve(tcs.size() + tmp_tcs.size());
447 tcs.insert(tcs.end(), std::make_move_iterator(tmp_tcs.begin()), std::make_move_iterator(tmp_tcs.end()));
448 }
449 if (!opts.quiet) {
450 std::cout << "Total number of TCs made: " << tcs.size() << std::endl;
451 }
452
 // Only write an output file when at least one TA/TC fragment was produced;
 // `frags` is moved into save_fragments and must not be used afterwards.
453 if (!frags.empty()) {
454 save_fragments(opts.output_filename, sourceid_geoid_map, std::move(frags), opts.quiet);
455 } else {
456 std::cout << "No TA/TC fragments generated. Output file will not be generated" << std::endl;
457 }
458
459 return 0;
460}
C++ Representation of a DUNE TimeSlice, consisting of a TimeSliceHeader object and a vector of pointe...
Definition TimeSlice.hpp:27
void add_fragment(std::unique_ptr< Fragment > &&fragment)
Add a Fragment pointer to the Fragments vector.
Definition TimeSlice.hpp:74
std::map< daqdataformats::SourceID, std::vector< uint64_t > > source_id_geo_id_map_t
std::unique_ptr< daqdataformats::Fragment > emulate_vector(const std::vector< input_t > &inputs)
std::vector< output_t > get_last_output_buffer()
void set_maker(std::unique_ptr< maker_t > &maker)
static std::shared_ptr< AbstractFactory< TriggerCandidateMaker > > get_instance()
std::pair< uint64_t, uint64_t > get_available_slice_id_range(const std::map< std::string, std::vector< std::shared_ptr< hdf5libs::HDF5RawDataFile > > > &_files, bool _quiet)
Compute the common SliceID interval shared by all writer groups.
std::map< std::string, std::vector< std::shared_ptr< hdf5libs::HDF5RawDataFile > > > sort_files_per_writer(const std::vector< std::string > &_files)
Group and order input TimeSlice files per TPStream writer application.
void parse_app(CLI::App &_app, Options &_opts)
Adds options to our CLI application.
void save_fragments(const std::string &_outputfilename, const hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t &_sourceid_geoid_map, std::map< uint64_t, std::vector< std::unique_ptr< daqdataformats::Fragment > > > _frags, bool _quiet)
Saves fragments in timeslice format into output HDF5 file.
int main(int argc, char const *argv[])
Including Qt Headers.
Struct with available cli application options.
std::string output_filename
output filename
std::optional< uint64_t > num_slices
optional number of TimeSlices to process
std::string config_name
the configuration filename
std::vector< std::string > input_files
vector of input filenames
bool quiet
do we want to quiet down the cout? Default: no
bool run_parallel
runs each TAMaker on a separate thread
std::optional< uint64_t > slice_start
optional lower bound (inclusive) for TimeSlice IDs to process
bool latencies
do we want to measure latencies? Default: no
The header for a DUNE Fragment.
SourceID element_id
Component that generated the data in this Fragment.
SourceID is a generalized representation of the source of a piece of data in the DAQ....
Definition SourceID.hpp:32
Additional data fields associated with a TimeSliceHeader.
run_number_t run_number
Run Number for the TimeSlice.
timeslice_number_t timeslice_number
Slice number of this TimeSlice within the stream.