#include "trgtools/EmulateTCUnit.hpp"
#include "trgtools/TAFileHandler.hpp"

#include "CLI/App.hpp"
#include "CLI/Config.hpp"
#include "CLI/Formatter.hpp"

#include <fmt/core.h>
#include <fmt/format.h>
#include <fmt/chrono.h>

#include <algorithm>
#include <filesystem>
#include <fstream>
#include <iostream>
#include <limits>
#include <tuple>

#include <nlohmann/json.hpp>

#include "hdf5libs/HDF5RawDataFile.hpp"
#include "hdf5libs/HDF5SourceIDHandler.hpp"

#include "triggeralgs/TriggerCandidateFactory.hpp"

//-----------------------------------------------------------------------------

using namespace dunedaq;
using namespace trgtools;

/**
 * @brief Saves fragments in timeslice format into an output HDF5 file
 *
 * @param _outputfilename Name of the HDF5 file to save the output into (".hdf5" is appended)
 * @param _sourceid_geoid_map Source ID to Geo ID map required to create an HDF5 file
 * @param _frags Map of fragments, with a vector of fragment pointers for each slice ID
 * @param _quiet Do we want to quiet down the cout?
 */
void SaveFragments(const std::string& _outputfilename,
                   const hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t& _sourceid_geoid_map,
                   std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>> _frags,
                   bool _quiet)
{
  // Create layout parameter object required for HDF5 creation
  hdf5libs::HDF5FileLayoutParameters layout_params;

  // Create HDF5 parameter path required for the layout
  hdf5libs::HDF5PathParameters params_trigger;
  params_trigger.detector_group_type = "Detector_Readout";
  /// @todo Maybe in the future we will want to emulate PDS TPs.
  params_trigger.detector_group_name = "TPC";
  params_trigger.element_name_prefix = "Link";
  params_trigger.digits_for_element_number = 5;

  // Fill the HDF5 layout
  std::vector<hdf5libs::HDF5PathParameters> params;
  params.push_back(params_trigger);
  layout_params.record_name_prefix = "TimeSlice";
  layout_params.digits_for_record_number = 6;
  layout_params.digits_for_sequence_number = 0;
  layout_params.record_header_dataset_name = "TimeSliceHeader";
  layout_params.raw_data_group_name = "RawData";
  layout_params.view_group_name = "Views";
  layout_params.path_params_list = {params};
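
  // With the parameters above, each record should be written as a zero-padded
  // "TimeSlice" group (e.g. "TimeSlice000042" for slice 42) containing a
  // "TimeSliceHeader" dataset and a "RawData" group. The exact group hierarchy
  // below that is decided by HDF5RawDataFile, so the example name here is
  // illustrative only.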

  // Create pointer to a new output HDF5 file
  std::unique_ptr<hdf5libs::HDF5RawDataFile> output_file = std::make_unique<hdf5libs::HDF5RawDataFile>(
    _outputfilename + ".hdf5",
    _frags.begin()->second[0]->get_run_number(),
    0,
    "emulate_from_tpstream",
    layout_params,
    _sourceid_geoid_map);

  // Iterate over the time slices & save all the fragments
  for (auto& [slice_id, vec_frags] : _frags) {
    // Create a new timeslice header
    daqdataformats::TimeSliceHeader tsh;
    tsh.timeslice_number = slice_id;
    tsh.run_number = vec_frags[0]->get_run_number();
    tsh.element_id = dunedaq::daqdataformats::SourceID(dunedaq::daqdataformats::SourceID::Subsystem::kTRBuilder, 0);

    // Create a new timeslice
    dunedaq::daqdataformats::TimeSlice ts(tsh);
    if (!_quiet) {
      std::cout << "Time slice number: " << slice_id << std::endl;
    }
    // Add the fragments to the timeslice
    for (std::unique_ptr<daqdataformats::Fragment>& frag_ptr : vec_frags) {
      if (!_quiet) {
        std::cout << " Writing elementid: " << frag_ptr->get_element_id()
                  << " trigger number: " << frag_ptr->get_trigger_number()
                  << " trigger_timestamp: " << frag_ptr->get_trigger_timestamp()
                  << " window_begin: " << frag_ptr->get_window_begin()
                  << " sequence_no: " << frag_ptr->get_sequence_number() << std::endl;
      }
      ts.add_fragment(std::move(frag_ptr));
    }

    // Write the timeslice to the output file
    output_file->write(ts);
  }
}

/**
 * @brief Returns sorted map of HDF5 files per datawriter application
 *
 * @param _files Vector of strings corresponding to the input file paths
 */
std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>>
SortFilesPerWriter(const std::vector<std::string>& _files)
{
  std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>> files_sorted;

  // Put files into per-writer app groups
  for (const std::string& file : _files) {
    std::shared_ptr<hdf5libs::HDF5RawDataFile> fl = std::make_shared<hdf5libs::HDF5RawDataFile>(file);
    if (!fl->is_timeslice_type()) {
      throw std::runtime_error(fmt::format("ERROR: input file '{}' not of type 'TimeSlice'", file));
    }

    std::string application_name = fl->get_attribute<std::string>("application_name");
    files_sorted[application_name].push_back(fl);
  }

  // Sort files for each writer application individually
  for (auto& [app_name, vec_files] : files_sorted) {
    // Don't sort if we have 0 or 1 files in the application...
    if (vec_files.size() <= 1) {
      continue;
    }

    // Sort w.r.t. the file index attribute
    std::sort(vec_files.begin(), vec_files.end(),
              [](const std::shared_ptr<hdf5libs::HDF5RawDataFile>& a, const std::shared_ptr<hdf5libs::HDF5RawDataFile>& b) {
                return a->get_attribute<size_t>("file_index") <
                       b->get_attribute<size_t>("file_index");
              });
  }

  return files_sorted;
}

/**
 * @brief Retrieves the available slice ID range
 *
 * Finds the overlap in the slice ID range between the provided files, and
 * returns that overlap as the available range -- or throws if there is a file
 * with a range that does not overlap.
 *
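 * As an illustration (the numbers here are made up): if one writer's files
 * cover slice IDs [10, 60] and another writer's cover [25, 80], the returned
 * range is their intersection, [25, 60]; a file covering only [90, 100] would
 * instead trigger the non-overlap error below.
 *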
 * @todo Rather than returning the slice ID range, this should try to return a time range -- and have the processors go off that.
 *
 * @param _files A map of writer app names & vectors of HDF5 files from that application
 * @param _quiet Do we want to quiet down the cout?
 */
std::pair<uint64_t, uint64_t>
GetAvailableSliceIDRange(const std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>>& _files,
                         bool _quiet)
{
  if (_files.empty()) {
    throw std::runtime_error("No files provided");
  }

  uint64_t global_start = std::numeric_limits<uint64_t>::min();
  uint64_t global_end = std::numeric_limits<uint64_t>::max();

  // Get the min & max record ID per application, and the global intersection
  for (auto& [appid, vec_files] : _files) {
    uint64_t app_start = std::numeric_limits<uint64_t>::max();
    uint64_t app_end = std::numeric_limits<uint64_t>::min();

    // Find the min / max record ID for this application
    for (auto& file : vec_files) {
      auto record_ids = file->get_all_record_ids();
      if (record_ids.empty()) {
        throw std::runtime_error(fmt::format("File from application {} contains no records.", appid));
      }
      app_start = std::min(app_start, record_ids.begin()->first);
      app_end = std::max(app_end, record_ids.rbegin()->first);
    }

    // Update the global min / max record ID
    global_start = std::max(global_start, app_start);
    global_end = std::min(global_end, app_end);

    if (!_quiet) {
      std::cout << "Application: " << appid << " TimeSliceID start: " << app_start << " end: " << app_end << std::endl;
    }
  }

  if (!_quiet) {
    std::cout << "Global start: " << global_start << " Global end: " << global_end << std::endl;
  }
  if (global_start > global_end) {
    throw std::runtime_error("One of the provided files has a TimeSlice ID range that does not overlap with the rest. Please select files with overlapping TimeSlice IDs");
  }

  // Extra validation / error handling
  for (auto& [appid, vec_files] : _files) {
    for (auto& file : vec_files) {
      auto record_ids = file->get_all_record_ids();

      uint64_t file_start = record_ids.begin()->first;
      uint64_t file_end = record_ids.rbegin()->first;
      if (file_start > global_end || file_end < global_start) {
        uint64_t file_index = file->get_attribute<size_t>("file_index");
        throw std::runtime_error(fmt::format(
          "File from TPStreamWrite application '{}' (index '{}') has record id range [{}, {}], which does not overlap with global range [{}, {}].",
          appid, file_index, file_start, file_end, global_start, global_end
        ));
      }
    }
  }

  return {global_start, global_end};
}

/**
 * @brief Struct with the available CLI application options
 */
struct Options
{
  /// @brief Vector of input filenames
  std::vector<std::string> input_files;
  /// @brief Output filename
  std::string output_filename;
  /// @brief The configuration filename
  std::string config_name;
  /// @brief Do we want to quiet down the cout? Default: no
  bool quiet = false;
  /// @brief Do we want to measure latencies? Default: no
  /// @todo Latencies currently not supported!
  bool latencies = false;
  /// @brief Runs each TAMaker on a separate thread
  bool run_parallel = false;
};

/**
 * @brief Adds options to our CLI application
 *
 * @param _app CLI application
 * @param _opts Struct with the available options
 */
void ParseApp(CLI::App& _app, Options& _opts)
{
  _app.add_option("-i,--input-files", _opts.input_files, "List of input files (required)")
    ->required()
    ->check(CLI::ExistingFile); // Validate that each file exists

  _app.add_option("-o,--output-file", _opts.output_filename, "Output file (required)")
    ->required(); // Make the argument required

  _app.add_option("-j,--json-config", _opts.config_name, "Trigger Activity and Candidate config JSON to use (required)")
    ->required()
    ->check(CLI::ExistingFile);

  _app.add_flag("--parallel", _opts.run_parallel, "Run the TAMakers in parallel");

  _app.add_flag("--quiet", _opts.quiet, "Quiet outputs.");

  _app.add_flag("--latencies", _opts.latencies, "Saves latencies per TP into csv");
}
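
// Example invocation (illustrative; the executable and file names are
// hypothetical -- "emulate_from_tpstream" is borrowed from the application
// name written into the output file above). The output name is given without
// an extension because SaveFragments appends ".hdf5":
//
//   emulate_from_tpstream -i tpstream_0.hdf5 tpstream_1.hdf5 \
//     -o emulated_tas_tcs -j trigger_emulation_config.json --parallel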

int main(int argc, char const *argv[])
{
  // Do all the CLI processing first
  CLI::App app{"Offline trigger TriggerActivity & TriggerCandidate emulator"};
  Options opts{};

  ParseApp(app, opts);

  try {
    app.parse(argc, argv);
  }
  catch (const CLI::ParseError &e) {
    return app.exit(e);
  }

  // Get the configuration file
  std::ifstream config_stream(opts.config_name);
  nlohmann::json config = nlohmann::json::parse(config_stream);
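
  // Only the "trigger_candidate_plugin" / "trigger_candidate_config" keys are
  // read directly in this file (see below); the TriggerActivity settings are
  // presumably handled by TAFileHandler, which receives the same JSON object.
  // A minimal sketch of the expected shape -- the plugin name and parameter
  // below are placeholders, not a recommended configuration:
  //
  //   {
  //     "trigger_candidate_plugin": ["SomeTCMakerPlugin"],
  //     "trigger_candidate_config": [ { "some_parameter": 1 } ]
  //   }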

  if (!opts.quiet) {
    std::cout << "Files to process:\n";
    for (const std::string& file : opts.input_files) {
      std::cout << "- " << file << "\n";
    }
  }

  // Sort the files into a map writer_id::vector<HDF5>
  std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>> sorted_files =
    SortFilesPerWriter(opts.input_files);

  // Get the available record_id range
  std::pair<uint64_t, uint64_t> recordid_range = GetAvailableSliceIDRange(sorted_files, opts.quiet);

  // Create the file handlers
  std::vector<std::unique_ptr<TAFileHandler>> file_handlers;
  for (const auto& [name, files] : sorted_files) {
    file_handlers.push_back(std::make_unique<TAFileHandler>(files, config, recordid_range, opts.run_parallel, opts.quiet));
  }

  // Start each file handler
  for (const auto& handler : file_handlers) {
    handler->start_processing();
  }

  // Output map of TA vectors & function that appends TAs to that map
  std::map<uint64_t, std::vector<triggeralgs::TriggerActivity>> tas;
  auto append_tas = [&tas](std::map<uint64_t, std::vector<triggeralgs::TriggerActivity>>&& _tas) {
    for (auto& [sliceid, src_vec] : _tas) {
      auto& dest_vec = tas[sliceid];
      dest_vec.reserve(dest_vec.size() + src_vec.size());

      dest_vec.insert(dest_vec.end(), std::make_move_iterator(src_vec.begin()), std::make_move_iterator(src_vec.end()));
      src_vec.clear();
    }
  };

  // Output map of Fragment vectors & function that appends Fragments to that map
  std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>> frags;
  auto append_frags = [&frags](std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>>&& _frags) {
    for (auto& [sliceid, src_vec] : _frags) {
      auto& dest_vec = frags[sliceid];

      dest_vec.reserve(dest_vec.size() + src_vec.size());

      dest_vec.insert(dest_vec.end(), std::make_move_iterator(src_vec.begin()), std::make_move_iterator(src_vec.end()));
      src_vec.clear();
    }
  };

  // Iterate over the handlers, wait for them to complete their job & append
  // their TAs to our vector when ready.
  hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t sourceid_geoid_map;
  for (const auto& handler : file_handlers) {
    // Wait for all TAs to be made
    handler->wait_to_complete_work();

    // Append the output TAs/fragments
    append_tas(std::move(handler->get_tas()));
    append_frags(std::move(handler->get_frags()));

    // Get and merge the source ID map
    hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t map = handler->get_sourceid_geoid_map();
    sourceid_geoid_map.insert(map.begin(), map.end());
  }

  // Sort the TAs in each slice before pushing them into the TCMaker
  size_t n_tas = 0;
  for (auto& [sliceid, vec_tas] : tas) {
    std::sort(vec_tas.begin(), vec_tas.end(),
              [](const triggeralgs::TriggerActivity& a, const triggeralgs::TriggerActivity& b) {
                return std::tie(a.time_start, a.channel_start, a.time_end) <
                       std::tie(b.time_start, b.channel_start, b.time_end);
              });
    n_tas += vec_tas.size();
  }
  if (!opts.quiet) {
    std::cout << "Total number of TAs made: " << n_tas << std::endl;
    std::cout << "Creating a TCMaker..." << std::endl;
  }
  // Create the TC emulator
  std::string algo_name = config["trigger_candidate_plugin"][0];
  nlohmann::json algo_config = config["trigger_candidate_config"][0];

  std::unique_ptr<triggeralgs::TriggerCandidateMaker> tc_maker =
    triggeralgs::TriggerCandidateFactory::get_instance()->build_maker(algo_name);
  tc_maker->configure(algo_config);

  trgtools::EmulateTCUnit tc_emulator;
  tc_emulator.set_maker(tc_maker);

  // Emulate the TriggerCandidates
  std::vector<triggeralgs::TriggerCandidate> tcs;
  for (auto& [sliceid, vec_tas] : tas) {
    std::unique_ptr<daqdataformats::Fragment> tc_frag = tc_emulator.emulate_vector(vec_tas);
    if (!tc_frag) {
      continue;
    }

    // Manipulate the fragment header
    daqdataformats::FragmentHeader frag_hdr = tc_frag->get_header();
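    // The +10000 offset below is presumably meant to keep the emulated TC
    // fragments' Source IDs from clashing with element IDs already present in
    // the input data; this is an assumption, not documented behaviour.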
    frag_hdr.element_id = daqdataformats::SourceID{daqdataformats::SourceID::Subsystem::kTrigger, tc_frag->get_element_id().id + 10000};

    tc_frag->set_header_fields(frag_hdr);
    tc_frag->set_type(daqdataformats::FragmentType::kTriggerCandidate);
    tc_frag->set_trigger_number(sliceid);
    tc_frag->set_window_begin(frags[sliceid][0]->get_window_begin());
    tc_frag->set_window_end(frags[sliceid][0]->get_window_end());

    // Push the fragment to our list
    frags[sliceid].push_back(std::move(tc_frag));

    std::vector<triggeralgs::TriggerCandidate> tmp_tcs = tc_emulator.get_last_output_buffer();
    tcs.reserve(tcs.size() + tmp_tcs.size());
    tcs.insert(tcs.end(), std::make_move_iterator(tmp_tcs.begin()), std::make_move_iterator(tmp_tcs.end()));
  }
  if (!opts.quiet) {
    std::cout << "Total number of TCs made: " << tcs.size() << std::endl;
  }

  SaveFragments(opts.output_filename, sourceid_geoid_map, std::move(frags), opts.quiet);

  return 0;
}