LCOV - code coverage report
Current view: top level - trgtools/apps - emulate_from_tpstream.cxx (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 186 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 9 0

            Line data    Source code
       1              : #include "trgtools/EmulateTCUnit.hpp"
       2              : #include "trgtools/TAFileHandler.hpp"
       3              : 
       4              : #include "CLI/App.hpp"
       5              : #include "CLI/Config.hpp"
       6              : #include "CLI/Formatter.hpp"
       7              : 
       8              : #include <fmt/core.h>
       9              : #include <fmt/format.h>
      10              : #include <fmt/chrono.h>
      11              : #include <filesystem>
      12              : 
      13              : #include "hdf5libs/HDF5RawDataFile.hpp"
      14              : #include "hdf5libs/HDF5SourceIDHandler.hpp"
      15              : 
      16              : #include "triggeralgs/TriggerCandidateFactory.hpp"
      17              : 
      18              : //-----------------------------------------------------------------------------
      19              : 
      20              : using namespace dunedaq;
      21              : using namespace trgtools;
      22              : 
      23              : /**
      24              :  * @brief Saves fragments in timeslice format into output HDF5 file
      25              :  *
      26              :  * @param _outputfilename Name of the hdf5 file to save the output into
      27              :  * @param _sourceid_geoid_map sourceid--geoid map required to create a HDF5 file
      28              :  * @param _frags Map of fragments, with a vector of fragment pointers for each slice id
      29              :  * @param _quiet Do we want to quiet down the cout?
      30              :  */
      31            0 : void SaveFragments(const std::string& _outputfilename,
      32              :                    const hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t& _sourceid_geoid_map,
      33              :                    std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>> _frags,
      34              :                    bool _quiet)
      35              : {
      36              :   // Create layout parameter object required for HDF5 creation
      37            0 :   hdf5libs::HDF5FileLayoutParameters layout_params;
      38              : 
      39              :   // Create HDF5 parameter path required for the layout
      40            0 :   hdf5libs::HDF5PathParameters params_trigger;
      41            0 :   params_trigger.detector_group_type = "Detector_Readout";
      42              :   /// @todo Maybe in the future we will want to emulate PDS TPs. 
      43            0 :   params_trigger.detector_group_name = "TPC";
      44            0 :   params_trigger.element_name_prefix = "Link";
      45            0 :   params_trigger.digits_for_element_number = 5;
      46              : 
      47              :   // Fill the HDF5 layout
      48            0 :   std::vector<hdf5libs::HDF5PathParameters> params;
      49            0 :   params.push_back(params_trigger);
      50            0 :   layout_params.record_name_prefix = "TimeSlice";
      51            0 :   layout_params.digits_for_record_number = 6;
      52            0 :   layout_params.digits_for_sequence_number = 0;
      53            0 :   layout_params.record_header_dataset_name = "TimeSliceHeader";
      54            0 :   layout_params.raw_data_group_name = "RawData";
      55            0 :   layout_params.view_group_name = "Views";
      56            0 :   layout_params.path_params_list = {params};
      57              : 
      58              :   // Create pointer to a new output HDF5 file
      59            0 :   std::unique_ptr<hdf5libs::HDF5RawDataFile> output_file = std::make_unique<hdf5libs::HDF5RawDataFile>(
      60            0 :       _outputfilename + ".hdf5",
      61            0 :       _frags.begin()->second[0]->get_run_number(),
      62            0 :       0,
      63              :       "emulate_from_tpstream",
      64              :       layout_params,
      65            0 :       _sourceid_geoid_map);
      66              : 
      67              :   // Iterate over the time slices & save all the fragments
      68            0 :   for (auto& [slice_id, vec_frags]: _frags) {
      69              :     // Create a new timeslice header
      70            0 :     daqdataformats::TimeSliceHeader tsh;
      71            0 :     tsh.timeslice_number = slice_id;
      72            0 :     tsh.run_number = vec_frags[0]->get_run_number();
      73            0 :     tsh.element_id = dunedaq::daqdataformats::SourceID(dunedaq::daqdataformats::SourceID::Subsystem::kTRBuilder, 0);
      74              : 
      75              :     // Create a new timeslice
      76            0 :     dunedaq::daqdataformats::TimeSlice ts(tsh);
      77            0 :     if (!_quiet) {
      78            0 :       std::cout << "Time slice number: " << slice_id << std::endl;
      79              :     }
      80              :     // Add the fragments to the timeslices
      81            0 :     for (std::unique_ptr<daqdataformats::Fragment>& frag_ptr: vec_frags) {
      82            0 :       if (!_quiet) {
      83            0 :         std::cout << "  Writing elementid: " << frag_ptr->get_element_id()  << " trigger number: " << frag_ptr->get_trigger_number() << " trigger_timestamp: " << frag_ptr->get_trigger_timestamp() << " window_begin: " << frag_ptr->get_window_begin() << " sequence_no: " << frag_ptr->get_sequence_number() << std::endl;
      84              :       }
      85            0 :       ts.add_fragment(std::move(frag_ptr));
      86              :     }
      87              : 
      88              :     // Write the timeslice to output file
      89            0 :     output_file->write(ts);
      90            0 :   }
      91            0 : }
      92              : 
      93              : /**
      94              :  * @brief Returns sorted map of HDF5 files per datawriter application
      95              :  *
      96              :  * @param _files: vector of strings corresponding to the input file paths
      97              :  */
      98              : std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>>
      99            0 : SortFilesPerWriter(const std::vector<std::string>& _files)
     100              : {
     101            0 :   std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>> files_sorted;
     102              : 
     103              :   // Put files into per-writer app groups
     104            0 :   for (const std::string& file : _files) {
     105            0 :     std::shared_ptr<hdf5libs::HDF5RawDataFile> fl = std::make_shared<hdf5libs::HDF5RawDataFile>(file);
     106            0 :     if (!fl->is_timeslice_type()) {
     107            0 :       throw std::runtime_error(fmt::format("ERROR: input file '{}' not of type 'TimeSlice'", file));
     108              :     }
     109              : 
     110            0 :     std::string application_name = fl->get_attribute<std::string>("application_name");
     111            0 :     files_sorted[application_name].push_back(fl);
     112            0 :   }
     113              : 
     114              :   // Sort files for each writer application individually
     115            0 :   for (auto& [app_name, vec_files]: files_sorted) {
     116              :     // Don't sort if we have 0 or 1 files in the application...
     117            0 :     if (vec_files.size() <= 1) {
     118            0 :       continue;
     119              :     }
     120              : 
     121              :     // Sort w.r.t. file index attribute
     122            0 :     std::sort(vec_files.begin(), vec_files.end(),
     123            0 :         [](const std::shared_ptr<hdf5libs::HDF5RawDataFile>& a, const std::shared_ptr<hdf5libs::HDF5RawDataFile>& b) {
     124            0 :         return a->get_attribute<size_t>("file_index") <
     125            0 :                b->get_attribute<size_t>("file_index");
     126              :         });
     127              :   }
     128              : 
     129            0 :   return files_sorted;
     130            0 : };
     131              : 
     132              : /**
     133              :  * @brief Retrieves the available slice ID range
     134              :  *
     135              :  * Finds the overlap in the slice ID range between the provided files, and
     136              :  * returns that overlap as an available range -- or crashes if there is a file
     137              :  * with a range that does not overlap.
     138              :  *
     139              :  * @todo: Rather than returning the sliceID range, should try to return a time range -- and have processors go off that.
     140              :  *
     141              :  * @param _files: a map of writer app names & vectors of HDF5 files from that application.
     142              :  * @param _quiet Do we want to quiet down the cout?
     143              :  */
     144              : std::pair<uint64_t, uint64_t>
     145            0 : GetAvailableSliceIDRange(const std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>>& _files,
     146              :                          bool _quiet)
     147              : {
     148            0 :   if (_files.empty()) {
     149            0 :     throw std::runtime_error("No files provided");
     150              :   }
     151              : 
     152            0 :   uint64_t global_start = std::numeric_limits<uint64_t>::min();
     153            0 :   uint64_t global_end = std::numeric_limits<uint64_t>::max();
     154              : 
     155              :   // Get the min & max record id per application, and the global
     156            0 :   for (auto& [appid, vec_files]: _files) {
     157            0 :     uint64_t app_start = std::numeric_limits<uint64_t>::max();
     158            0 :     uint64_t app_end = std::numeric_limits<uint64_t>::min();
     159              : 
     160              :     // Find min / max record id for this application
     161            0 :     for (auto& file : vec_files) {
     162            0 :       auto record_ids = file->get_all_record_ids();
     163            0 :       if (record_ids.empty()) {
     164            0 :         throw std::runtime_error(fmt::format("File from application {} contains no records.", appid));
     165              :       }
     166            0 :       app_start = std::min(app_start, record_ids.begin()->first);
     167            0 :       app_end = std::max(app_end, record_ids.rbegin()->first);
     168            0 :     }
     169              : 
     170              :     // Update the global min / max record id
     171            0 :     global_start = std::max(global_start, app_start);
     172            0 :     global_end = std::min(global_end, app_end);
     173              : 
     174            0 :     if (!_quiet) {
     175            0 :       std::cout << "Application: " << appid << " " << " TimeSliceID start: " << app_start << " end: " << app_end << std::endl;
     176              :     }
     177              :   }
     178              : 
     179            0 :   if (!_quiet) {
     180            0 :     std::cout << "Global start: " << global_start << " Global end: " << global_end << std::endl;
     181              :   }
     182            0 :   if (global_start > global_end) {
     183            0 :     throw std::runtime_error("One of the provided files' id range did not overlap with the rest. Please select files with overlapping TimeSlice IDs");
     184              :   }
     185              : 
     186              :   // Extra validation / error handling
     187            0 :   for (auto& [appid, vec_files]: _files) {
     188            0 :     for (auto& file : vec_files) {
     189            0 :       auto record_ids = file->get_all_record_ids();
     190              : 
     191            0 :       uint64_t file_start = record_ids.begin()->first;
     192            0 :       uint64_t file_end = record_ids.rbegin()->first;
     193            0 :       if ((file_start > global_end || file_end < global_start)) {
     194            0 :         uint64_t file_index = file->get_attribute<size_t>("file_index");
     195            0 :         throw std::runtime_error(fmt::format(
     196              :           "File from TPStreamWrite application '{}' (index '{}') has record id range [{}, {}], which does not overlap with global range [{}, {}].",
     197              :            appid, file_index, file_start, file_end, global_start, global_end
     198            0 :           ));
     199              :       }
     200            0 :     }
     201              :   }
     202              : 
     203            0 :   return {global_start, global_end};
     204              : }
     205              : 
     206              : /**
     207              :  * @brief Struct with available cli application options
     208              :  */
     209              : struct Options
     210              : {
     211              :   /// @brief vector of input filenames
     212              :   std::vector<std::string> input_files;
     213              :   /// @brief output filename
     214              :   std::string output_filename;
     215              :   /// @brief the configuration filename
     216              :   std::string config_name;
     217              :   /// @brief do we want to quiet down the cout? Default: no
     218              :   bool quiet = false;
     219              :   /// @brief do we want to measure latencies? Default: no
     220              :   /// @todo: Latencies currently not supported!
     221              :   bool latencies = false;
     222              :   /// @brief runs each TAMaker on a separate thread
     223              :   bool run_parallel = false;
     224              : };
     225              : 
     226              : /**
     227              :  * @brief Adds options to our CLI application
     228              :  * 
     229              :  * @param _app CLI application
     230              :  * @param _opts Struct with the available options
     231              :  */
     232            0 : void ParseApp(CLI::App& _app, Options& _opts)
     233              : {
     234            0 :   _app.add_option("-i,--input-files", _opts.input_files, "List of input files (required)")
     235              :     ->required()
     236            0 :     ->check(CLI::ExistingFile); // Validate that each file exists
     237              : 
     238            0 :   _app.add_option("-o,--output-file", _opts.output_filename, "Output file (required)")
     239            0 :     ->required(); // make the argument required
     240              : 
     241            0 :   _app.add_option("-j,--json-config", _opts.config_name, "Trigger Activity and Candidate config JSON to use (required)")
     242              :     ->required()
     243            0 :     ->check(CLI::ExistingFile);
     244              :   
     245            0 :   _app.add_flag("--parallel", _opts.run_parallel, "Run the TAMakers in parallel");
     246              : 
     247            0 :   _app.add_flag("--quiet", _opts.quiet, "Quiet outputs.");
     248              : 
     249            0 :   _app.add_flag("--latencies", _opts.latencies, "Saves latencies per TP into csv");
     250            0 : }
     251              : 
     252            0 : int main(int argc, char const *argv[])
     253              : {
     254              :   // Do all the CLI processing first
     255            0 :   CLI::App app{"Offline trigger TriggerActivity & TriggerCandidate emulatior"};
     256            0 :   Options opts{};
     257              : 
     258            0 :   ParseApp(app, opts);
     259              : 
     260            0 :   try {
     261            0 :     app.parse(argc, argv);
     262              :   }
     263            0 :   catch (const CLI::ParseError &e) {
     264            0 :     return app.exit(e);
     265            0 :   }
     266              : 
     267              :   // Get the configuration file
     268            0 :   std::ifstream config_stream(opts.config_name);
     269            0 :   nlohmann::json config = nlohmann::json::parse(config_stream);
     270              : 
     271            0 :   if (!opts.quiet) {
     272            0 :     std::cout << "Files to process:\n";
     273            0 :     for (const std::string& file : opts.input_files) {
     274            0 :       std::cout << "- " << file << "\n";
     275              :     }
     276              :   }
     277              : 
     278              :   // Sort the files into a map writer_id::vector<HDF5>
     279            0 :   std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>> sorted_files =
     280            0 :     SortFilesPerWriter(opts.input_files);
     281              : 
     282              :   // Get the available record_id range
     283            0 :   std::pair<uint64_t, uint64_t> recordid_range = GetAvailableSliceIDRange(sorted_files, opts.quiet);
     284              : 
     285              :   // Create the file handlers
     286            0 :   std::vector<std::unique_ptr<TAFileHandler>> file_handlers;
     287            0 :   for (auto [name, files] : sorted_files) {
     288            0 :     file_handlers.push_back(std::make_unique<TAFileHandler>(files, config, recordid_range, opts.run_parallel, opts.quiet));
     289            0 :   }
     290              : 
     291              :   // Start each file handler
     292            0 :   for (const auto& handler : file_handlers) {
     293            0 :     handler->start_processing();
     294              :   }
     295              : 
     296              :   // Output map of TA vectors & function that appends TAs to that vector
     297            0 :   std::map<uint64_t, std::vector<triggeralgs::TriggerActivity>> tas;
     298            0 :   auto append_tas = [&tas](std::map<uint64_t, std::vector<triggeralgs::TriggerActivity>>&& _tas) {
     299            0 :     for (auto& [sliceid, src_vec] : _tas) {
     300            0 :       auto& dest_vec = tas[sliceid];
     301            0 :       dest_vec.reserve(dest_vec.size() + src_vec.size());
     302              : 
     303            0 :       dest_vec.insert(dest_vec.end(), std::make_move_iterator(src_vec.begin()), std::make_move_iterator(src_vec.end()));
     304            0 :       src_vec.clear();
     305              :     }
     306            0 :   };
     307              : 
     308              :   // Output map of Fragment vectors & function that appends TAs to that vector
     309            0 :   std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>> frags;
     310            0 :   auto append_frags = [&frags](std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>>&& _frags) {
     311            0 :     for (auto& [sliceid, src_vec] : _frags) {
     312            0 :       auto& dest_vec = frags[sliceid];
     313              : 
     314            0 :       dest_vec.reserve(dest_vec.size() + src_vec.size());
     315              : 
     316            0 :       dest_vec.insert(dest_vec.end(), std::make_move_iterator(src_vec.begin()), std::make_move_iterator(src_vec.end()));
     317            0 :       src_vec.clear();
     318              :     }
     319            0 :   };
     320              : 
     321              :   // Iterate over the handlers, wait for them to complete their job & append
     322              :   // their TAs to our vector when ready.
     323            0 :   hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t sourceid_geoid_map;
     324            0 :   for (const auto& handler : file_handlers) {
     325              :     // Wait for all TAs to be made
     326            0 :     handler->wait_to_complete_work();
     327              : 
     328              :     // Append output TA/fragments
     329            0 :     append_tas(std::move(handler->get_tas()));
     330            0 :     append_frags(std::move(handler->get_frags()));
     331              : 
     332              :     // Get and merge the source ID map
     333            0 :     hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t map = handler->get_sourceid_geoid_map();
     334            0 :     sourceid_geoid_map.insert(map.begin(), map.end());
     335            0 :   }
     336              : 
     337              :   // Sort the TAs in each slice before pushing them into the TCMaker
     338            0 :   size_t n_tas = 0;
     339            0 :   for (auto& [sliceid, vec_tas]: tas) {
     340            0 :     std::sort(vec_tas.begin(), vec_tas.end(),
     341            0 :         [](const triggeralgs::TriggerActivity& a, const triggeralgs::TriggerActivity& b) {
     342            0 :         return std::tie(a.time_start, a.channel_start, a.time_end) <
     343            0 :                std::tie(b.time_start, b.channel_start, b.time_end);
     344              :         });
     345            0 :     n_tas += vec_tas.size();
     346              :   }
     347            0 :   if (!opts.quiet) {
     348            0 :     std::cout << "Total number of TAs made: " << n_tas << std::endl;
     349            0 :     std::cout << "Creating a TCMaker..." << std::endl;
     350              :   }
     351              :   // Create the TC emulator
     352            0 :   std::string algo_name = config["trigger_candidate_plugin"][0];
     353            0 :   nlohmann::json algo_config = config["trigger_candidate_config"][0];
     354              : 
     355            0 :   std::unique_ptr<triggeralgs::TriggerCandidateMaker> tc_maker =
     356            0 :     triggeralgs::TriggerCandidateFactory::get_instance()->build_maker(algo_name);
     357            0 :   tc_maker->configure(algo_config);
     358              : 
     359            0 :   trgtools::EmulateTCUnit tc_emulator;
     360            0 :   tc_emulator.set_maker(tc_maker);
     361              : 
     362              :   // Emulate the TriggerCandidates
     363            0 :   std::vector<triggeralgs::TriggerCandidate> tcs;
     364            0 :   for (auto& [sliceid, vec_tas]: tas) {
     365            0 :     std::unique_ptr<daqdataformats::Fragment> tc_frag = tc_emulator.emulate_vector(vec_tas);
     366            0 :     if (!tc_frag) {
     367            0 :       continue;
     368              :     }
     369              : 
     370              :     // Manipulate the fragment header
     371            0 :     daqdataformats::FragmentHeader frag_hdr = tc_frag->get_header();
     372            0 :     frag_hdr.element_id = daqdataformats::SourceID{daqdataformats::SourceID::Subsystem::kTrigger, tc_frag->get_element_id().id+10000};
     373              : 
     374            0 :     tc_frag->set_header_fields(frag_hdr);
     375            0 :     tc_frag->set_type(daqdataformats::FragmentType::kTriggerCandidate);
     376            0 :     tc_frag->set_trigger_number(sliceid);
     377            0 :     tc_frag->set_window_begin(frags[sliceid][0]->get_window_begin());
     378            0 :     tc_frag->set_window_end(frags[sliceid][0]->get_window_end());
     379              : 
     380              :     // Push the fragment to our list
     381            0 :     frags[sliceid].push_back(std::move(tc_frag));
     382              : 
     383            0 :     std::vector<triggeralgs::TriggerCandidate> tmp_tcs = tc_emulator.get_last_output_buffer();
     384            0 :     tcs.reserve(tcs.size() + tmp_tcs.size());
     385            0 :     tcs.insert(tcs.end(), std::make_move_iterator(tmp_tcs.begin()), std::make_move_iterator(tmp_tcs.end()));
     386            0 :   }
     387            0 :   if (!opts.quiet) {
     388            0 :     std::cout << "Total number of TCs made: " << tcs.size() << std::endl;
     389              :   }
     390              : 
     391            0 :   SaveFragments(opts.output_filename, sourceid_geoid_map, std::move(frags), opts.quiet);
     392              : 
     393            0 :   return 0;
     394            0 : }
        

Generated by: LCOV version 2.0-1