Line data Source code
#include "trgtools/TCEmulationUnit.hpp"
#include "trgtools/TAEmulationWorker.hpp"

#include <algorithm>
#include <cstdint>
#include <filesystem>
#include <fstream>
#include <iostream>
#include <limits>
#include <map>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

#include "CLI/App.hpp"
#include "CLI/Config.hpp"
#include "CLI/Formatter.hpp"

#include <fmt/core.h>
#include <fmt/format.h>
#include <fmt/chrono.h>

#include "hdf5libs/HDF5RawDataFile.hpp"
#include "hdf5libs/HDF5SourceIDHandler.hpp"

#include "triggeralgs/TriggerCandidateFactory.hpp"
18 :
19 : //-----------------------------------------------------------------------------
20 :
21 : using namespace dunedaq;
22 : using namespace trgtools;
23 :
24 : /**
25 : * @brief Saves fragments in timeslice format into output HDF5 file
26 : *
27 : * @param _outputfilename Name of the hdf5 file to save the output into
28 : * @param _sourceid_geoid_map sourceid--geoid map required to create a HDF5 file
29 : * @param _frags Map of fragments, with a vector of fragment pointers for each slice id
30 : * @param _quiet Do we want to quiet down the cout?
31 : */
32 0 : void save_fragments(const std::string& _outputfilename,
33 : const hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t& _sourceid_geoid_map,
34 : std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>> _frags,
35 : bool _quiet)
36 : {
37 0 : std::string output_filename = _outputfilename;
38 0 : if (output_filename.size() < 5 ||
39 0 : output_filename.compare(output_filename.size() - 5, 5, ".hdf5") != 0) {
40 0 : output_filename += ".hdf5";
41 : }
42 :
43 : // Create layout parameter object required for HDF5 creation
44 0 : hdf5libs::HDF5FileLayoutParameters layout_params;
45 :
46 : // Create HDF5 parameter path required for the layout
47 0 : hdf5libs::HDF5PathParameters params_trigger;
48 0 : params_trigger.detector_group_type = "Detector_Readout";
49 : /// @todo Maybe in the future we will want to emulate PDS TPs.
50 0 : params_trigger.detector_group_name = "TPC";
51 0 : params_trigger.element_name_prefix = "Link";
52 0 : params_trigger.digits_for_element_number = 5;
53 :
54 : // Fill the HDF5 layout
55 0 : std::vector<hdf5libs::HDF5PathParameters> params;
56 0 : params.push_back(params_trigger);
57 0 : layout_params.record_name_prefix = "TimeSlice";
58 0 : layout_params.digits_for_record_number = 6;
59 0 : layout_params.digits_for_sequence_number = 0;
60 0 : layout_params.record_header_dataset_name = "TimeSliceHeader";
61 0 : layout_params.raw_data_group_name = "RawData";
62 0 : layout_params.view_group_name = "Views";
63 0 : layout_params.path_params_list = {params};
64 :
65 : // Create pointer to a new output HDF5 file
66 0 : std::unique_ptr<hdf5libs::HDF5RawDataFile> output_file = std::make_unique<hdf5libs::HDF5RawDataFile>(
67 : output_filename,
68 0 : _frags.begin()->second[0]->get_run_number(),
69 0 : 0,
70 : "emulate_from_tpstream",
71 : layout_params,
72 0 : _sourceid_geoid_map);
73 :
74 : // Iterate over the time slices & save all the fragments
75 0 : for (auto& [slice_id, vec_frags]: _frags) {
76 : // Create a new timeslice header
77 0 : daqdataformats::TimeSliceHeader tsh;
78 0 : tsh.timeslice_number = slice_id;
79 0 : tsh.run_number = vec_frags[0]->get_run_number();
80 0 : tsh.element_id = dunedaq::daqdataformats::SourceID(dunedaq::daqdataformats::SourceID::Subsystem::kTRBuilder, 0);
81 :
82 : // Create a new timeslice
83 0 : dunedaq::daqdataformats::TimeSlice ts(tsh);
84 0 : if (!_quiet) {
85 0 : std::cout << "Time slice number: " << slice_id << std::endl;
86 : }
87 : // Add the fragments to the timeslices
88 0 : for (std::unique_ptr<daqdataformats::Fragment>& frag_ptr: vec_frags) {
89 0 : if (!_quiet) {
90 0 : std::cout << " Writing elementid: " << frag_ptr->get_element_id() << " trigger number: " << frag_ptr->get_trigger_number() << " trigger_timestamp: " << frag_ptr->get_trigger_timestamp() << " window_begin: " << frag_ptr->get_window_begin() << " sequence_no: " << frag_ptr->get_sequence_number() << std::endl;
91 : }
92 0 : ts.add_fragment(std::move(frag_ptr));
93 : }
94 :
95 : // Write the timeslice to output file
96 0 : output_file->write(ts);
97 0 : }
98 0 : }
99 :
100 : /**
101 : * @brief Group and order input TimeSlice files per TPStream writer application.
102 : *
103 : * Opens each input file, validates that it is of `TimeSlice` type, and groups
104 : * files by their `application_name` attribute. Within each application group,
105 : * files are sorted by their `file_index` attribute so consecutive segments from
106 : * the same writer are processed in order.
107 : *
108 : * @param _files Vector of input HDF5 file paths.
109 : * @return Map keyed by `application_name`, each value a vector of file handles
110 : * ordered by `file_index`.
111 : * @throws std::runtime_error If any input file is not of type `TimeSlice`.
112 : */
113 : std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>>
114 0 : sort_files_per_writer(const std::vector<std::string>& _files)
115 : {
116 0 : std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>> files_sorted;
117 :
118 : // Put files into per-writer app groups
119 0 : for (const std::string& file : _files) {
120 0 : std::shared_ptr<hdf5libs::HDF5RawDataFile> fl = std::make_shared<hdf5libs::HDF5RawDataFile>(file);
121 0 : if (!fl->is_timeslice_type()) {
122 0 : throw std::runtime_error(fmt::format("ERROR: input file '{}' not of type 'TimeSlice'", file));
123 : }
124 :
125 0 : std::string application_name = fl->get_attribute<std::string>("application_name");
126 0 : files_sorted[application_name].push_back(fl);
127 0 : }
128 :
129 : // Sort files for each writer application individually
130 0 : for (auto& [app_name, vec_files]: files_sorted) {
131 : // Don't sort if we have 0 or 1 files in the application...
132 0 : if (vec_files.size() <= 1) {
133 0 : continue;
134 : }
135 :
136 : // Sort w.r.t. file index attribute
137 0 : std::sort(vec_files.begin(), vec_files.end(),
138 0 : [](const std::shared_ptr<hdf5libs::HDF5RawDataFile>& a, const std::shared_ptr<hdf5libs::HDF5RawDataFile>& b) {
139 0 : return a->get_attribute<size_t>("file_index") <
140 0 : b->get_attribute<size_t>("file_index");
141 : });
142 : }
143 :
144 0 : return files_sorted;
145 0 : };
146 :
147 : /**
148 : * @brief Compute the common SliceID interval shared by all writer groups.
149 : *
150 : * For each writer application, this function scans all of its files and finds
151 : * the minimum and maximum available record IDs. It then computes the global
152 : * intersection across applications:
153 : * - global_start = max(all per-application starts)
154 : * - global_end = min(all per-application ends)
155 : *
156 : * The returned range is therefore the SliceID window for which data is
157 : * expected to be available from every application.
158 : *
159 : * @todo: Rather than returning the SliceID range, should try to return a time range and have processors use that.
160 : *
161 : * @param _files Map of writer application names to vectors of input HDF5 files.
162 : * @param _quiet If false, print per-application and global range diagnostics.
163 : * @return Inclusive pair `{global_start, global_end}` of overlapping SliceIDs.
164 : * @throws std::runtime_error If `_files` is empty, if any file has no records,
165 : * or if no overlapping SliceID interval exists.
166 : */
167 : std::pair<uint64_t, uint64_t>
168 0 : get_available_slice_id_range(const std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>>& _files,
169 : bool _quiet)
170 : {
171 0 : if (_files.empty()) {
172 0 : throw std::runtime_error("No files provided");
173 : }
174 :
175 0 : uint64_t global_start = std::numeric_limits<uint64_t>::min();
176 0 : uint64_t global_end = std::numeric_limits<uint64_t>::max();
177 :
178 : // Get the min & max record id per application, and the global
179 0 : for (auto& [appid, vec_files]: _files) {
180 0 : uint64_t app_start = std::numeric_limits<uint64_t>::max();
181 0 : uint64_t app_end = std::numeric_limits<uint64_t>::min();
182 :
183 : // Find min / max record id for this application
184 0 : for (auto& file : vec_files) {
185 0 : auto record_ids = file->get_all_record_ids();
186 0 : if (record_ids.empty()) {
187 0 : throw std::runtime_error(fmt::format("File from application {} contains no records.", appid));
188 : }
189 0 : app_start = std::min(app_start, record_ids.begin()->first);
190 0 : app_end = std::max(app_end, record_ids.rbegin()->first);
191 0 : }
192 :
193 : // Update the global min / max record id
194 0 : global_start = std::max(global_start, app_start);
195 0 : global_end = std::min(global_end, app_end);
196 :
197 0 : if (!_quiet) {
198 0 : std::cout << "Application: " << appid << " " << " TimeSliceID start: " << app_start << " end: " << app_end << std::endl;
199 : }
200 : }
201 :
202 0 : if (!_quiet) {
203 0 : std::cout << "Global start: " << global_start << " Global end: " << global_end << std::endl;
204 : }
205 0 : if (global_start > global_end) {
206 0 : throw std::runtime_error("One of the provided files' id range did not overlap with the rest. Please select files with overlapping TimeSlice IDs");
207 : }
208 :
209 : // Extra validation / error handling
210 0 : for (auto& [appid, vec_files]: _files) {
211 0 : for (auto& file : vec_files) {
212 0 : auto record_ids = file->get_all_record_ids();
213 :
214 0 : uint64_t file_start = record_ids.begin()->first;
215 0 : uint64_t file_end = record_ids.rbegin()->first;
216 0 : if ((file_start > global_end || file_end < global_start)) {
217 0 : uint64_t file_index = file->get_attribute<size_t>("file_index");
218 0 : throw std::runtime_error(fmt::format(
219 : "File from TPStreamWrite application '{}' (index '{}') has record id range [{}, {}], which does not overlap with global range [{}, {}].",
220 : appid, file_index, file_start, file_end, global_start, global_end
221 0 : ));
222 : }
223 0 : }
224 : }
225 :
226 0 : return {global_start, global_end};
227 : }
228 :
/**
 * @brief Struct with available cli application options
 *
 * Filled in by parse_app() / CLI11 during command-line parsing; read-only
 * afterwards.
 */
struct Options
{
  /// @brief vector of input filenames (each validated to exist by CLI11)
  std::vector<std::string> input_files;
  /// @brief output filename (".hdf5" extension is appended on write if missing)
  std::string output_filename;
  /// @brief the configuration filename (JSON with TA/TC plugin names & configs)
  std::string config_name;
  /// @brief do we want to quiet down the cout? Default: no
  bool quiet = false;
  /// @brief do we want to measure latencies? Default: no
  /// @todo: Latencies currently not supported!
  bool latencies = false;
  /// @brief runs each TAMaker on a separate thread
  bool run_parallel = false;
  /// @brief optional lower bound (inclusive) for TimeSlice IDs to process
  std::optional<uint64_t> slice_start = std::nullopt;
  /// @brief optional number of TimeSlices to process
  std::optional<uint64_t> num_slices = std::nullopt;
};
252 :
253 : /**
254 : * @brief Adds options to our CLI application
255 : *
256 : * @param _app CLI application
257 : * @param _opts Struct with the available options
258 : */
259 0 : void parse_app(CLI::App& _app, Options& _opts)
260 : {
261 0 : _app.add_option("-i,--input-files", _opts.input_files, "List of input files (required)")
262 : ->required()
263 0 : ->check(CLI::ExistingFile); // Validate that each file exists
264 :
265 0 : _app.add_option("-o,--output-file", _opts.output_filename, "Output file (required)")
266 0 : ->required(); // make the argument required
267 :
268 0 : _app.add_option("-j,--json-config", _opts.config_name, "Trigger Activity and Candidate config JSON to use (required)")
269 : ->required()
270 0 : ->check(CLI::ExistingFile);
271 :
272 0 : _app.add_flag("--parallel", _opts.run_parallel, "Run the TAMakers in parallel");
273 :
274 0 : _app.add_flag("--quiet", _opts.quiet, "Quiet outputs.");
275 :
276 0 : _app.add_flag("--latencies", _opts.latencies, "Saves latencies per TP into csv");
277 :
278 0 : _app.add_option("-s,--slice-start", _opts.slice_start, "Inclusive lower bound for TimeSlice ID to process");
279 0 : _app.add_option("-n,--num-slices", _opts.num_slices, "Number of TimeSlices to process");
280 0 : }
281 :
282 0 : int main(int argc, char const *argv[])
283 : {
284 : // Do all the CLI processing first
285 0 : CLI::App app{"Offline trigger TriggerActivity & TriggerCandidate emulatior"};
286 0 : Options opts{};
287 :
288 0 : parse_app(app, opts);
289 :
290 0 : try {
291 0 : app.parse(argc, argv);
292 : }
293 0 : catch (const CLI::ParseError &e) {
294 0 : return app.exit(e);
295 0 : }
296 :
297 : // Get the configuration file
298 0 : std::ifstream config_stream(opts.config_name);
299 0 : nlohmann::json config = nlohmann::json::parse(config_stream);
300 :
301 0 : if (!opts.quiet) {
302 0 : std::cout << "Files to process:\n";
303 0 : for (const std::string& file : opts.input_files) {
304 0 : std::cout << "- " << file << "\n";
305 : }
306 : }
307 :
308 : // Sort the files into a map writer_id::vector<HDF5>
309 0 : std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>> sorted_files =
310 0 : sort_files_per_writer(opts.input_files);
311 :
312 : // Get the available record_id range
313 0 : std::pair<uint64_t, uint64_t> recordid_range = get_available_slice_id_range(sorted_files, opts.quiet);
314 0 : std::pair<uint64_t, uint64_t> processing_range = recordid_range;
315 :
316 0 : if (opts.slice_start.has_value()) {
317 0 : processing_range.first = std::max(processing_range.first, opts.slice_start.value());
318 : }
319 :
320 0 : if (opts.num_slices.has_value()) {
321 0 : if (opts.num_slices.value() == 0) {
322 0 : throw std::runtime_error("Invalid --num-slices value: must be greater than 0");
323 : }
324 :
325 0 : uint64_t requested_end = processing_range.first + (opts.num_slices.value() - 1);
326 0 : if (requested_end < processing_range.first) {
327 0 : requested_end = std::numeric_limits<uint64_t>::max();
328 : }
329 :
330 0 : processing_range.second = std::min(processing_range.second, requested_end);
331 : }
332 :
333 0 : if (processing_range.first > processing_range.second) {
334 0 : throw std::runtime_error(fmt::format(
335 : "Requested timeslice subset (slice-start={}, num-slices={}) does not overlap available range [{}, {}]",
336 0 : opts.slice_start.value_or(0),
337 0 : opts.num_slices.value_or(0),
338 : recordid_range.first,
339 0 : recordid_range.second));
340 : }
341 :
342 0 : if (!opts.quiet) {
343 0 : std::cout << "Processing TimeSliceID range: [" << processing_range.first << ", " << processing_range.second << "]" << std::endl;
344 : }
345 :
346 : // Create the file handlers
347 0 : std::vector<std::unique_ptr<TAEmulationWorker>> ta_emu_workers;
348 0 : for (auto [name, files] : sorted_files) {
349 0 : ta_emu_workers.push_back(std::make_unique<TAEmulationWorker>(files, config, processing_range, opts.run_parallel, opts.quiet));
350 0 : }
351 :
352 : // Start each file handler
353 0 : for (const auto& handler : ta_emu_workers) {
354 0 : handler->start_processing();
355 : }
356 :
357 : // Output map of TA vectors & function that appends TAs to that vector
358 0 : std::map<uint64_t, std::vector<triggeralgs::TriggerActivity>> tas;
359 0 : auto append_tas = [&tas](std::map<uint64_t, std::vector<triggeralgs::TriggerActivity>>&& _tas) {
360 0 : for (auto& [sliceid, src_vec] : _tas) {
361 0 : auto& dest_vec = tas[sliceid];
362 0 : dest_vec.reserve(dest_vec.size() + src_vec.size());
363 :
364 0 : dest_vec.insert(dest_vec.end(), std::make_move_iterator(src_vec.begin()), std::make_move_iterator(src_vec.end()));
365 0 : src_vec.clear();
366 : }
367 0 : };
368 :
369 : // Output map of Fragment vectors & function that appends TAs to that vector
370 0 : std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>> frags;
371 0 : auto append_frags = [&frags](std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>>&& _frags) {
372 0 : for (auto& [sliceid, src_vec] : _frags) {
373 0 : auto& dest_vec = frags[sliceid];
374 :
375 0 : dest_vec.reserve(dest_vec.size() + src_vec.size());
376 :
377 0 : dest_vec.insert(dest_vec.end(), std::make_move_iterator(src_vec.begin()), std::make_move_iterator(src_vec.end()));
378 0 : src_vec.clear();
379 : }
380 0 : };
381 :
382 : // Iterate over the handlers, wait for them to complete their job & append
383 : // their TAs to our vector when ready.
384 0 : hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t sourceid_geoid_map;
385 0 : for (const auto& handler : ta_emu_workers) {
386 : // Wait for all TAs to be made
387 0 : handler->wait_to_complete_work();
388 :
389 : // Append output TA/fragments
390 0 : append_tas(std::move(handler->get_tas()));
391 0 : append_frags(std::move(handler->get_frags()));
392 :
393 : // Get and merge the source ID map
394 0 : hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t map = handler->get_sourceid_geoid_map();
395 0 : sourceid_geoid_map.insert(map.begin(), map.end());
396 0 : }
397 :
398 : // Sort the TAs in each slice before pushing them into the TCMaker
399 0 : size_t n_tas = 0;
400 0 : for (auto& [sliceid, vec_tas]: tas) {
401 0 : std::sort(vec_tas.begin(), vec_tas.end(),
402 0 : [](const triggeralgs::TriggerActivity& a, const triggeralgs::TriggerActivity& b) {
403 0 : return std::tie(a.time_start, a.channel_start, a.time_end) <
404 0 : std::tie(b.time_start, b.channel_start, b.time_end);
405 : });
406 0 : n_tas += vec_tas.size();
407 : }
408 :
409 0 : if (!opts.quiet) {
410 0 : std::cout << "Total number of TAs made: " << n_tas << std::endl;
411 0 : std::cout << "Creating a TCMaker..." << std::endl;
412 : }
413 : // Create the TC emulator
414 0 : std::string algo_name = config["trigger_candidate_plugin"][0];
415 0 : nlohmann::json algo_config = config["trigger_candidate_config"][0];
416 :
417 0 : std::unique_ptr<triggeralgs::TriggerCandidateMaker> tc_maker =
418 0 : triggeralgs::TriggerCandidateFactory::get_instance()->build_maker(algo_name);
419 0 : tc_maker->configure(algo_config);
420 :
421 0 : trgtools::TCEmulationUnit tc_emulator;
422 0 : tc_emulator.set_maker(tc_maker);
423 :
424 : // Emulate the TriggerCandidates
425 0 : std::vector<triggeralgs::TriggerCandidate> tcs;
426 0 : for (auto& [sliceid, vec_tas]: tas) {
427 0 : std::unique_ptr<daqdataformats::Fragment> tc_frag = tc_emulator.emulate_vector(vec_tas);
428 0 : if (!tc_frag) {
429 0 : continue;
430 : }
431 :
432 : // Manipulate the fragment header
433 0 : daqdataformats::FragmentHeader frag_hdr = tc_frag->get_header();
434 0 : frag_hdr.element_id = daqdataformats::SourceID{daqdataformats::SourceID::Subsystem::kTrigger, tc_frag->get_element_id().id+10000};
435 :
436 0 : tc_frag->set_header_fields(frag_hdr);
437 0 : tc_frag->set_type(daqdataformats::FragmentType::kTriggerCandidate);
438 0 : tc_frag->set_trigger_number(sliceid);
439 0 : tc_frag->set_window_begin(frags[sliceid][0]->get_window_begin());
440 0 : tc_frag->set_window_end(frags[sliceid][0]->get_window_end());
441 :
442 : // Push the fragment to our list
443 0 : frags[sliceid].push_back(std::move(tc_frag));
444 :
445 0 : std::vector<triggeralgs::TriggerCandidate> tmp_tcs = tc_emulator.get_last_output_buffer();
446 0 : tcs.reserve(tcs.size() + tmp_tcs.size());
447 0 : tcs.insert(tcs.end(), std::make_move_iterator(tmp_tcs.begin()), std::make_move_iterator(tmp_tcs.end()));
448 0 : }
449 0 : if (!opts.quiet) {
450 0 : std::cout << "Total number of TCs made: " << tcs.size() << std::endl;
451 : }
452 :
453 0 : if (!frags.empty()) {
454 0 : save_fragments(opts.output_filename, sourceid_geoid_map, std::move(frags), opts.quiet);
455 : } else {
456 0 : std::cout << "No TA/TC fragments generated. Output file will not be generated" << std::endl;
457 : }
458 :
459 0 : return 0;
460 0 : }
|