LCOV - code coverage report
Current view: top level - trgtools/apps - emulate_from_tpstream.cxx (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 210 0
Test Date: 2026-03-29 15:29:34 Functions: 0.0 % 9 0

            Line data    Source code
#include "trgtools/TAEmulationWorker.hpp"
#include "trgtools/TCEmulationUnit.hpp"

#include "hdf5libs/HDF5RawDataFile.hpp"
#include "hdf5libs/HDF5SourceIDHandler.hpp"

#include "triggeralgs/TriggerCandidateFactory.hpp"

#include "CLI/App.hpp"
#include "CLI/Config.hpp"
#include "CLI/Formatter.hpp"

#include <fmt/chrono.h>
#include <fmt/core.h>
#include <fmt/format.h>

#include <algorithm>
#include <cstdint>
#include <filesystem>
#include <fstream>
#include <iostream>
#include <limits>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>
      19              : //-----------------------------------------------------------------------------
      20              : 
      21              : using namespace dunedaq;
      22              : using namespace trgtools;
      23              : 
      24              : /**
      25              :  * @brief Saves fragments in timeslice format into output HDF5 file
      26              :  *
      27              :  * @param _outputfilename Name of the hdf5 file to save the output into
      28              :  * @param _sourceid_geoid_map sourceid--geoid map required to create a HDF5 file
      29              :  * @param _frags Map of fragments, with a vector of fragment pointers for each slice id
      30              :  * @param _quiet Do we want to quiet down the cout?
      31              :  */
      32            0 : void save_fragments(const std::string& _outputfilename,
      33              :                    const hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t& _sourceid_geoid_map,
      34              :                    std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>> _frags,
      35              :                    bool _quiet)
      36              : {
      37            0 :   std::string output_filename = _outputfilename;
      38            0 :   if (output_filename.size() < 5 ||
      39            0 :       output_filename.compare(output_filename.size() - 5, 5, ".hdf5") != 0) {
      40            0 :     output_filename += ".hdf5";
      41              :   }
      42              : 
      43              :   // Create layout parameter object required for HDF5 creation
      44            0 :   hdf5libs::HDF5FileLayoutParameters layout_params;
      45              : 
      46              :   // Create HDF5 parameter path required for the layout
      47            0 :   hdf5libs::HDF5PathParameters params_trigger;
      48            0 :   params_trigger.detector_group_type = "Detector_Readout";
      49              :   /// @todo Maybe in the future we will want to emulate PDS TPs. 
      50            0 :   params_trigger.detector_group_name = "TPC";
      51            0 :   params_trigger.element_name_prefix = "Link";
      52            0 :   params_trigger.digits_for_element_number = 5;
      53              : 
      54              :   // Fill the HDF5 layout
      55            0 :   std::vector<hdf5libs::HDF5PathParameters> params;
      56            0 :   params.push_back(params_trigger);
      57            0 :   layout_params.record_name_prefix = "TimeSlice";
      58            0 :   layout_params.digits_for_record_number = 6;
      59            0 :   layout_params.digits_for_sequence_number = 0;
      60            0 :   layout_params.record_header_dataset_name = "TimeSliceHeader";
      61            0 :   layout_params.raw_data_group_name = "RawData";
      62            0 :   layout_params.view_group_name = "Views";
      63            0 :   layout_params.path_params_list = {params};
      64              : 
      65              :   // Create pointer to a new output HDF5 file
      66            0 :   std::unique_ptr<hdf5libs::HDF5RawDataFile> output_file = std::make_unique<hdf5libs::HDF5RawDataFile>(
      67              :       output_filename,
      68            0 :       _frags.begin()->second[0]->get_run_number(),
      69            0 :       0,
      70              :       "emulate_from_tpstream",
      71              :       layout_params,
      72            0 :       _sourceid_geoid_map);
      73              : 
      74              :   // Iterate over the time slices & save all the fragments
      75            0 :   for (auto& [slice_id, vec_frags]: _frags) {
      76              :     // Create a new timeslice header
      77            0 :     daqdataformats::TimeSliceHeader tsh;
      78            0 :     tsh.timeslice_number = slice_id;
      79            0 :     tsh.run_number = vec_frags[0]->get_run_number();
      80            0 :     tsh.element_id = dunedaq::daqdataformats::SourceID(dunedaq::daqdataformats::SourceID::Subsystem::kTRBuilder, 0);
      81              : 
      82              :     // Create a new timeslice
      83            0 :     dunedaq::daqdataformats::TimeSlice ts(tsh);
      84            0 :     if (!_quiet) {
      85            0 :       std::cout << "Time slice number: " << slice_id << std::endl;
      86              :     }
      87              :     // Add the fragments to the timeslices
      88            0 :     for (std::unique_ptr<daqdataformats::Fragment>& frag_ptr: vec_frags) {
      89            0 :       if (!_quiet) {
      90            0 :         std::cout << "  Writing elementid: " << frag_ptr->get_element_id()  << " trigger number: " << frag_ptr->get_trigger_number() << " trigger_timestamp: " << frag_ptr->get_trigger_timestamp() << " window_begin: " << frag_ptr->get_window_begin() << " sequence_no: " << frag_ptr->get_sequence_number() << std::endl;
      91              :       }
      92            0 :       ts.add_fragment(std::move(frag_ptr));
      93              :     }
      94              : 
      95              :     // Write the timeslice to output file
      96            0 :     output_file->write(ts);
      97            0 :   }
      98            0 : }
      99              : 
     100              : /**
     101              :  * @brief Group and order input TimeSlice files per TPStream writer application.
     102              :  *
     103              :  * Opens each input file, validates that it is of `TimeSlice` type, and groups
     104              :  * files by their `application_name` attribute. Within each application group,
     105              :  * files are sorted by their `file_index` attribute so consecutive segments from
     106              :  * the same writer are processed in order.
     107              :  *
     108              :  * @param _files Vector of input HDF5 file paths.
     109              :  * @return Map keyed by `application_name`, each value a vector of file handles
     110              :  *         ordered by `file_index`.
     111              :  * @throws std::runtime_error If any input file is not of type `TimeSlice`.
     112              :  */
     113              : std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>>
     114            0 : sort_files_per_writer(const std::vector<std::string>& _files)
     115              : {
     116            0 :   std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>> files_sorted;
     117              : 
     118              :   // Put files into per-writer app groups
     119            0 :   for (const std::string& file : _files) {
     120            0 :     std::shared_ptr<hdf5libs::HDF5RawDataFile> fl = std::make_shared<hdf5libs::HDF5RawDataFile>(file);
     121            0 :     if (!fl->is_timeslice_type()) {
     122            0 :       throw std::runtime_error(fmt::format("ERROR: input file '{}' not of type 'TimeSlice'", file));
     123              :     }
     124              : 
     125            0 :     std::string application_name = fl->get_attribute<std::string>("application_name");
     126            0 :     files_sorted[application_name].push_back(fl);
     127            0 :   }
     128              : 
     129              :   // Sort files for each writer application individually
     130            0 :   for (auto& [app_name, vec_files]: files_sorted) {
     131              :     // Don't sort if we have 0 or 1 files in the application...
     132            0 :     if (vec_files.size() <= 1) {
     133            0 :       continue;
     134              :     }
     135              : 
     136              :     // Sort w.r.t. file index attribute
     137            0 :     std::sort(vec_files.begin(), vec_files.end(),
     138            0 :         [](const std::shared_ptr<hdf5libs::HDF5RawDataFile>& a, const std::shared_ptr<hdf5libs::HDF5RawDataFile>& b) {
     139            0 :         return a->get_attribute<size_t>("file_index") <
     140            0 :                b->get_attribute<size_t>("file_index");
     141              :         });
     142              :   }
     143              : 
     144            0 :   return files_sorted;
     145            0 : };
     146              : 
     147              : /**
     148              :  * @brief Compute the common SliceID interval shared by all writer groups.
     149              :  *
     150              :  * For each writer application, this function scans all of its files and finds
     151              :  * the minimum and maximum available record IDs. It then computes the global
     152              :  * intersection across applications:
     153              :  * - global_start = max(all per-application starts)
     154              :  * - global_end   = min(all per-application ends)
     155              :  *
     156              :  * The returned range is therefore the SliceID window for which data is
     157              :  * expected to be available from every application.
     158              :  *
     159              :  * @todo: Rather than returning the SliceID range, should try to return a time range and have processors use that.
     160              :  *
     161              :  * @param _files Map of writer application names to vectors of input HDF5 files.
     162              :  * @param _quiet If false, print per-application and global range diagnostics.
     163              :  * @return Inclusive pair `{global_start, global_end}` of overlapping SliceIDs.
     164              :  * @throws std::runtime_error If `_files` is empty, if any file has no records,
     165              :  *         or if no overlapping SliceID interval exists.
     166              :  */
     167              : std::pair<uint64_t, uint64_t>
     168            0 : get_available_slice_id_range(const std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>>& _files,
     169              :                          bool _quiet)
     170              : {
     171            0 :   if (_files.empty()) {
     172            0 :     throw std::runtime_error("No files provided");
     173              :   }
     174              : 
     175            0 :   uint64_t global_start = std::numeric_limits<uint64_t>::min();
     176            0 :   uint64_t global_end = std::numeric_limits<uint64_t>::max();
     177              : 
     178              :   // Get the min & max record id per application, and the global
     179            0 :   for (auto& [appid, vec_files]: _files) {
     180            0 :     uint64_t app_start = std::numeric_limits<uint64_t>::max();
     181            0 :     uint64_t app_end = std::numeric_limits<uint64_t>::min();
     182              : 
     183              :     // Find min / max record id for this application
     184            0 :     for (auto& file : vec_files) {
     185            0 :       auto record_ids = file->get_all_record_ids();
     186            0 :       if (record_ids.empty()) {
     187            0 :         throw std::runtime_error(fmt::format("File from application {} contains no records.", appid));
     188              :       }
     189            0 :       app_start = std::min(app_start, record_ids.begin()->first);
     190            0 :       app_end = std::max(app_end, record_ids.rbegin()->first);
     191            0 :     }
     192              : 
     193              :     // Update the global min / max record id
     194            0 :     global_start = std::max(global_start, app_start);
     195            0 :     global_end = std::min(global_end, app_end);
     196              : 
     197            0 :     if (!_quiet) {
     198            0 :       std::cout << "Application: " << appid << " " << " TimeSliceID start: " << app_start << " end: " << app_end << std::endl;
     199              :     }
     200              :   }
     201              : 
     202            0 :   if (!_quiet) {
     203            0 :     std::cout << "Global start: " << global_start << " Global end: " << global_end << std::endl;
     204              :   }
     205            0 :   if (global_start > global_end) {
     206            0 :     throw std::runtime_error("One of the provided files' id range did not overlap with the rest. Please select files with overlapping TimeSlice IDs");
     207              :   }
     208              : 
     209              :   // Extra validation / error handling
     210            0 :   for (auto& [appid, vec_files]: _files) {
     211            0 :     for (auto& file : vec_files) {
     212            0 :       auto record_ids = file->get_all_record_ids();
     213              : 
     214            0 :       uint64_t file_start = record_ids.begin()->first;
     215            0 :       uint64_t file_end = record_ids.rbegin()->first;
     216            0 :       if ((file_start > global_end || file_end < global_start)) {
     217            0 :         uint64_t file_index = file->get_attribute<size_t>("file_index");
     218            0 :         throw std::runtime_error(fmt::format(
     219              :           "File from TPStreamWrite application '{}' (index '{}') has record id range [{}, {}], which does not overlap with global range [{}, {}].",
     220              :            appid, file_index, file_start, file_end, global_start, global_end
     221            0 :           ));
     222              :       }
     223            0 :     }
     224              :   }
     225              : 
     226            0 :   return {global_start, global_end};
     227              : }
     228              : 
     229              : /**
     230              :  * @brief Struct with available cli application options
     231              :  */
     232              : struct Options
     233              : {
     234              :   /// @brief vector of input filenames
     235              :   std::vector<std::string> input_files;
     236              :   /// @brief output filename
     237              :   std::string output_filename;
     238              :   /// @brief the configuration filename
     239              :   std::string config_name;
     240              :   /// @brief do we want to quiet down the cout? Default: no
     241              :   bool quiet = false;
     242              :   /// @brief do we want to measure latencies? Default: no
     243              :   /// @todo: Latencies currently not supported!
     244              :   bool latencies = false;
     245              :   /// @brief runs each TAMaker on a separate thread
     246              :   bool run_parallel = false;
     247              :   /// @brief optional lower bound (inclusive) for TimeSlice IDs to process
     248              :   std::optional<uint64_t> slice_start = std::nullopt;
     249              :   /// @brief optional number of TimeSlices to process
     250              :   std::optional<uint64_t> num_slices = std::nullopt;
     251              : };
     252              : 
     253              : /**
     254              :  * @brief Adds options to our CLI application
     255              :  * 
     256              :  * @param _app CLI application
     257              :  * @param _opts Struct with the available options
     258              :  */
     259            0 : void parse_app(CLI::App& _app, Options& _opts)
     260              : {
     261            0 :   _app.add_option("-i,--input-files", _opts.input_files, "List of input files (required)")
     262              :     ->required()
     263            0 :     ->check(CLI::ExistingFile); // Validate that each file exists
     264              : 
     265            0 :   _app.add_option("-o,--output-file", _opts.output_filename, "Output file (required)")
     266            0 :     ->required(); // make the argument required
     267              : 
     268            0 :   _app.add_option("-j,--json-config", _opts.config_name, "Trigger Activity and Candidate config JSON to use (required)")
     269              :     ->required()
     270            0 :     ->check(CLI::ExistingFile);
     271              :   
     272            0 :   _app.add_flag("--parallel", _opts.run_parallel, "Run the TAMakers in parallel");
     273              : 
     274            0 :   _app.add_flag("--quiet", _opts.quiet, "Quiet outputs.");
     275              : 
     276            0 :   _app.add_flag("--latencies", _opts.latencies, "Saves latencies per TP into csv");
     277              : 
     278            0 :   _app.add_option("-s,--slice-start", _opts.slice_start, "Inclusive lower bound for TimeSlice ID to process");
     279            0 :   _app.add_option("-n,--num-slices", _opts.num_slices, "Number of TimeSlices to process");
     280            0 : }
     281              : 
     282            0 : int main(int argc, char const *argv[])
     283              : {
     284              :   // Do all the CLI processing first
     285            0 :   CLI::App app{"Offline trigger TriggerActivity & TriggerCandidate emulatior"};
     286            0 :   Options opts{};
     287              : 
     288            0 :   parse_app(app, opts);
     289              : 
     290            0 :   try {
     291            0 :     app.parse(argc, argv);
     292              :   }
     293            0 :   catch (const CLI::ParseError &e) {
     294            0 :     return app.exit(e);
     295            0 :   }
     296              : 
     297              :   // Get the configuration file
     298            0 :   std::ifstream config_stream(opts.config_name);
     299            0 :   nlohmann::json config = nlohmann::json::parse(config_stream);
     300              : 
     301            0 :   if (!opts.quiet) {
     302            0 :     std::cout << "Files to process:\n";
     303            0 :     for (const std::string& file : opts.input_files) {
     304            0 :       std::cout << "- " << file << "\n";
     305              :     }
     306              :   }
     307              : 
     308              :   // Sort the files into a map writer_id::vector<HDF5>
     309            0 :   std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>> sorted_files =
     310            0 :     sort_files_per_writer(opts.input_files);
     311              : 
     312              :   // Get the available record_id range
     313            0 :   std::pair<uint64_t, uint64_t> recordid_range = get_available_slice_id_range(sorted_files, opts.quiet);
     314            0 :   std::pair<uint64_t, uint64_t> processing_range = recordid_range;
     315              : 
     316            0 :   if (opts.slice_start.has_value()) {
     317            0 :     processing_range.first = std::max(processing_range.first, opts.slice_start.value());
     318              :   }
     319              : 
     320            0 :   if (opts.num_slices.has_value()) {
     321            0 :     if (opts.num_slices.value() == 0) {
     322            0 :       throw std::runtime_error("Invalid --num-slices value: must be greater than 0");
     323              :     }
     324              : 
     325            0 :     uint64_t requested_end = processing_range.first + (opts.num_slices.value() - 1);
     326            0 :     if (requested_end < processing_range.first) {
     327            0 :       requested_end = std::numeric_limits<uint64_t>::max();
     328              :     }
     329              : 
     330            0 :     processing_range.second = std::min(processing_range.second, requested_end);
     331              :   }
     332              : 
     333            0 :   if (processing_range.first > processing_range.second) {
     334            0 :     throw std::runtime_error(fmt::format(
     335              :       "Requested timeslice subset (slice-start={}, num-slices={}) does not overlap available range [{}, {}]",
     336            0 :       opts.slice_start.value_or(0),
     337            0 :       opts.num_slices.value_or(0),
     338              :       recordid_range.first,
     339            0 :       recordid_range.second));
     340              :   }
     341              : 
     342            0 :   if (!opts.quiet) {
     343            0 :     std::cout << "Processing TimeSliceID range: [" << processing_range.first << ", " << processing_range.second << "]" << std::endl;
     344              :   }
     345              : 
     346              :   // Create the file handlers
     347            0 :   std::vector<std::unique_ptr<TAEmulationWorker>> ta_emu_workers;
     348            0 :   for (auto [name, files] : sorted_files) {
     349            0 :     ta_emu_workers.push_back(std::make_unique<TAEmulationWorker>(files, config, processing_range, opts.run_parallel, opts.quiet));
     350            0 :   }
     351              : 
     352              :   // Start each file handler
     353            0 :   for (const auto& handler : ta_emu_workers) {
     354            0 :     handler->start_processing();
     355              :   }
     356              : 
     357              :   // Output map of TA vectors & function that appends TAs to that vector
     358            0 :   std::map<uint64_t, std::vector<triggeralgs::TriggerActivity>> tas;
     359            0 :   auto append_tas = [&tas](std::map<uint64_t, std::vector<triggeralgs::TriggerActivity>>&& _tas) {
     360            0 :     for (auto& [sliceid, src_vec] : _tas) {
     361            0 :       auto& dest_vec = tas[sliceid];
     362            0 :       dest_vec.reserve(dest_vec.size() + src_vec.size());
     363              : 
     364            0 :       dest_vec.insert(dest_vec.end(), std::make_move_iterator(src_vec.begin()), std::make_move_iterator(src_vec.end()));
     365            0 :       src_vec.clear();
     366              :     }
     367            0 :   };
     368              : 
     369              :   // Output map of Fragment vectors & function that appends TAs to that vector
     370            0 :   std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>> frags;
     371            0 :   auto append_frags = [&frags](std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>>&& _frags) {
     372            0 :     for (auto& [sliceid, src_vec] : _frags) {
     373            0 :       auto& dest_vec = frags[sliceid];
     374              : 
     375            0 :       dest_vec.reserve(dest_vec.size() + src_vec.size());
     376              : 
     377            0 :       dest_vec.insert(dest_vec.end(), std::make_move_iterator(src_vec.begin()), std::make_move_iterator(src_vec.end()));
     378            0 :       src_vec.clear();
     379              :     }
     380            0 :   };
     381              : 
     382              :   // Iterate over the handlers, wait for them to complete their job & append
     383              :   // their TAs to our vector when ready.
     384            0 :   hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t sourceid_geoid_map;
     385            0 :   for (const auto& handler : ta_emu_workers) {
     386              :     // Wait for all TAs to be made
     387            0 :     handler->wait_to_complete_work();
     388              : 
     389              :     // Append output TA/fragments
     390            0 :     append_tas(std::move(handler->get_tas()));
     391            0 :     append_frags(std::move(handler->get_frags()));
     392              : 
     393              :     // Get and merge the source ID map
     394            0 :     hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t map = handler->get_sourceid_geoid_map();
     395            0 :     sourceid_geoid_map.insert(map.begin(), map.end());
     396            0 :   }
     397              : 
     398              :   // Sort the TAs in each slice before pushing them into the TCMaker
     399            0 :   size_t n_tas = 0;
     400            0 :   for (auto& [sliceid, vec_tas]: tas) {
     401            0 :     std::sort(vec_tas.begin(), vec_tas.end(),
     402            0 :         [](const triggeralgs::TriggerActivity& a, const triggeralgs::TriggerActivity& b) {
     403            0 :         return std::tie(a.time_start, a.channel_start, a.time_end) <
     404            0 :                std::tie(b.time_start, b.channel_start, b.time_end);
     405              :         });
     406            0 :     n_tas += vec_tas.size();
     407              :   }
     408              : 
     409            0 :   if (!opts.quiet) {
     410            0 :     std::cout << "Total number of TAs made: " << n_tas << std::endl;
     411            0 :     std::cout << "Creating a TCMaker..." << std::endl;
     412              :   }
     413              :   // Create the TC emulator
     414            0 :   std::string algo_name = config["trigger_candidate_plugin"][0];
     415            0 :   nlohmann::json algo_config = config["trigger_candidate_config"][0];
     416              : 
     417            0 :   std::unique_ptr<triggeralgs::TriggerCandidateMaker> tc_maker =
     418            0 :     triggeralgs::TriggerCandidateFactory::get_instance()->build_maker(algo_name);
     419            0 :   tc_maker->configure(algo_config);
     420              : 
     421            0 :   trgtools::TCEmulationUnit tc_emulator;
     422            0 :   tc_emulator.set_maker(tc_maker);
     423              : 
     424              :   // Emulate the TriggerCandidates
     425            0 :   std::vector<triggeralgs::TriggerCandidate> tcs;
     426            0 :   for (auto& [sliceid, vec_tas]: tas) {
     427            0 :     std::unique_ptr<daqdataformats::Fragment> tc_frag = tc_emulator.emulate_vector(vec_tas);
     428            0 :     if (!tc_frag) {
     429            0 :       continue;
     430              :     }
     431              : 
     432              :     // Manipulate the fragment header
     433            0 :     daqdataformats::FragmentHeader frag_hdr = tc_frag->get_header();
     434            0 :     frag_hdr.element_id = daqdataformats::SourceID{daqdataformats::SourceID::Subsystem::kTrigger, tc_frag->get_element_id().id+10000};
     435              : 
     436            0 :     tc_frag->set_header_fields(frag_hdr);
     437            0 :     tc_frag->set_type(daqdataformats::FragmentType::kTriggerCandidate);
     438            0 :     tc_frag->set_trigger_number(sliceid);
     439            0 :     tc_frag->set_window_begin(frags[sliceid][0]->get_window_begin());
     440            0 :     tc_frag->set_window_end(frags[sliceid][0]->get_window_end());
     441              : 
     442              :     // Push the fragment to our list
     443            0 :     frags[sliceid].push_back(std::move(tc_frag));
     444              : 
     445            0 :     std::vector<triggeralgs::TriggerCandidate> tmp_tcs = tc_emulator.get_last_output_buffer();
     446            0 :     tcs.reserve(tcs.size() + tmp_tcs.size());
     447            0 :     tcs.insert(tcs.end(), std::make_move_iterator(tmp_tcs.begin()), std::make_move_iterator(tmp_tcs.end()));
     448            0 :   }
     449            0 :   if (!opts.quiet) {
     450            0 :     std::cout << "Total number of TCs made: " << tcs.size() << std::endl;
     451              :   }
     452              : 
     453            0 :   if (!frags.empty()) {
     454            0 :     save_fragments(opts.output_filename, sourceid_geoid_map, std::move(frags), opts.quiet);
     455              :   } else {
     456            0 :     std::cout << "No TA/TC fragments generated. Output file will not be generated" << std::endl;
     457              :   }
     458              : 
     459            0 :   return 0;
     460            0 : }
        

Generated by: LCOV version 2.0-1