LCOV - code coverage report
Current view: top level - trgtools/apps - process_tpstream.cxx (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 187 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 10 0

            Line data    Source code
       1              : #include "trgtools/EmulateTAUnit.hpp"
       2              : #include "trgtools/EmulateTCUnit.hpp"
       3              : 
       4              : #include "CLI/App.hpp"
       5              : #include "CLI/Config.hpp"
       6              : #include "CLI/Formatter.hpp"
       7              : 
       8              : #include <fmt/core.h>
       9              : #include <fmt/format.h>
      10              : #include <fmt/chrono.h>
      11              : #include <filesystem>
      12              : 
      13              : #include "hdf5libs/HDF5RawDataFile.hpp"
      14              : #include "trgdataformats/TriggerPrimitive.hpp"
      15              : #include "triggeralgs/TriggerActivityFactory.hpp"
      16              : #include "triggeralgs/TriggerCandidateFactory.hpp"
      17              : #include "triggeralgs/TriggerObjectOverlay.hpp"
      18              : #include "detchannelmaps/TPCChannelMap.hpp"
      19              : 
      20              : using namespace dunedaq;
      21              : 
      22              : class TimeSliceProcessor // Reads TimeSlices from an HDF5 file, runs a user-supplied callback on each one and optionally writes them to a second file.
      23              : {
      24              : private:
      25              :   // Input file (always opened) and output file (left null when the output path is empty).
      26              :   std::unique_ptr<hdf5libs::HDF5RawDataFile> m_input_file;
      27              :   std::unique_ptr<hdf5libs::HDF5RawDataFile> m_output_file;
      28              : 
      29              :   void open_files(std::string input_path, std::string output_path); // called from the constructor
      30              :   void close_files(); // called from the destructor
      31              : 
      32              :   void process( daqdataformats::TimeSlice& tls ); // invokes m_processor when one is set
      33              : 
      34              :   // Callback run on each TimeSlice by process(); it receives the slice by non-const reference. Can modify?
      35              :   std::function<void(daqdataformats::TimeSlice&)> m_processor;
      36              : 
      37              : public:
      38              : 
      39              :   TimeSliceProcessor(std::string input_path, std::string output_path); // an empty output_path disables writing
      40              :   ~TimeSliceProcessor();
      41              : 
      42              :   void set_processor(std::function<void(daqdataformats::TimeSlice&)> processor); // installs the per-TimeSlice callback
      43              :   void loop(uint64_t num_records = 0, uint64_t offset = 0, bool quiet = false); // num_records == 0 means "every record from offset onwards"
      44              : 
      45              : };
      46              : 
      47              : //-----------------------------------------------------------------------------
      48            0 : TimeSliceProcessor::TimeSliceProcessor(std::string input_path, std::string output_path) // Opens the files immediately.
      49              : {
      50            0 :   this->open_files(input_path, output_path); // throws std::runtime_error if the input is not a TimeSlice file
      51            0 : }
      52              : 
      53              : //-----------------------------------------------------------------------------
      54            0 : TimeSliceProcessor::~TimeSliceProcessor() // The file objects are owned by unique_ptr members and destroyed with this object.
      55              : {
      56            0 :   this->close_files(); // currently an empty function
      57            0 : }
      58              : 
      59              : //-----------------------------------------------------------------------------
      60              : void // Opens the input file and, when output_path is non-empty, creates the output file from the input's metadata.
      61            0 : TimeSliceProcessor::open_files(std::string input_path, std::string output_path) {
      62              :   // Open input file
      63            0 :   m_input_file = std::make_unique<hdf5libs::HDF5RawDataFile>(input_path);
      64              : 
      65            0 :   if (!m_input_file->is_timeslice_type()) { // only TimeSlice-type files are accepted
      66            0 :     fmt::print("ERROR: input file '{}' not of type 'TimeSlice'\n", input_path);
      67            0 :     throw std::runtime_error(fmt::format("ERROR: input file '{}' not of type 'TimeSlice'", input_path));
      68              :   }
      69              : 
      70            0 :   auto run_number = m_input_file->get_attribute<daqdataformats::run_number_t>("run_number"); // these three locals are used only by the printout below
      71            0 :   auto file_index = m_input_file->get_attribute<size_t>("file_index");
      72            0 :   auto application_name = m_input_file->get_attribute<std::string>("application_name");
      73              : 
      74            0 :   fmt::print("Run Number: {}\nFile Index: {}\nApp name: '{}'\n", run_number, file_index, application_name);
      75              : 
      76            0 :   if (!output_path.empty()) { // no output path: m_output_file stays null and loop() writes nothing
      77              :     // Open output file, passing the run number, file index, application name, layout parameters and source-id map read from the input
      78            0 :     m_output_file = std::make_unique<hdf5libs::HDF5RawDataFile>(
      79              :       output_path,
      80            0 :       m_input_file->get_attribute<daqdataformats::run_number_t>("run_number"), // NOTE(review): same attributes as the locals above, read a second time
      81            0 :       m_input_file->get_attribute<size_t>("file_index"),
      82            0 :       m_input_file->get_attribute<std::string>("application_name"),
      83            0 :       m_input_file->get_file_layout().get_file_layout_params(),
      84            0 :       m_input_file->get_srcid_geoid_map()
      85            0 :     );
      86              :   }
      87            0 : }
      88              : 
      89              : //-----------------------------------------------------------------------------
      90              : void // Placeholder invoked by the destructor.
      91            0 : TimeSliceProcessor::close_files() {
      92              :   // Nothing is done here yet. Do something?
      93            0 : }
      94              : 
      95              : //-----------------------------------------------------------------------------
      96              : void // Stores the callback that process() will run on each TimeSlice.
      97            0 : TimeSliceProcessor::set_processor(std::function<void(daqdataformats::TimeSlice& )> processor) {
      98            0 :   m_processor = processor; // overwrites any callback stored earlier
      99            0 : }
     100              : 
     101              : //-----------------------------------------------------------------------------
     102              : void // Runs the stored callback on one TimeSlice.
     103            0 : TimeSliceProcessor::process( daqdataformats::TimeSlice& tls ) {
     104            0 :   if (m_processor) // an empty std::function means no callback was set: do nothing
     105            0 :     m_processor(tls);
     106            0 : }
     107              : 
     108              : //-----------------------------------------------------------------------------
     109              : void // Iterates over the input records, processing (and optionally writing) those whose index lies in [offset, offset+num_records).
     110            0 : TimeSliceProcessor::loop(uint64_t num_records, uint64_t offset, bool quiet) {
     111              : 
     112              :   // Replace with a record selection?
     113            0 :   auto records = m_input_file->get_all_record_ids();
     114              : 
     115            0 :   if (!num_records) { // 0 means: take everything from offset to the end
     116            0 :     num_records = (records.size()-offset); // wraps if offset > records.size(), but last_rec below then wraps back to records.size(), so nothing is processed
     117              :   }
     118              : 
     119            0 :   uint64_t first_rec = offset, last_rec = offset+num_records; // half-open index window [first_rec, last_rec)
     120              : 
     121            0 :   uint64_t i_rec(0); // running index of the current record within `records`
     122            0 :   for( const auto& rid : records ) {
     123              : 
     124            0 :     if (i_rec < first_rec || i_rec >= last_rec ) { // outside the window: skip (records past last_rec are still iterated, just not processed)
     125            0 :       ++i_rec;
     126            0 :       continue;
     127              :     }
     128              : 
     129            0 :     if (!quiet)
     130            0 :       fmt::print("\n-- Processing TSL {}:{}\n\n", rid.first, rid.second);
     131            0 :     auto tsl = m_input_file->get_timeslice(rid);
     132              :     // Or filter on a selection here using a lambda?
     133              : 
     134              :     // if (!quiet)
     135              :       // fmt::print("TSL number {}\n", tsl.get_header().timeslice_number);
     136              : 
     137              :     // Run the user callback; it receives tsl by non-const reference
     138            0 :     this->process(tsl);
     139              : 
     140            0 :     if (m_output_file) // only set when an output path was given
     141            0 :       m_output_file->write(tsl);
     142              : 
     143            0 :     ++i_rec;
     144            0 :     if(!quiet)
     145            0 :       fmt::print("\n-- Finished TSL {}:{}\n\n", rid.first, rid.second);
     146              : 
     147            0 :   }
     148              : 
     149            0 : }
     150              : 
     151              : 
     152              : //-----------------------------------------------------------------------------
     153            0 : int main(int argc, char const *argv[]) // Command-line driver: runs the configured TA and TC makers over the TP fragments of each TimeSlice.
     154              : {
     155              : 
     156            0 :   CLI::App app{"tapipe"};
     157              :   // argv = app.ensure_utf8(argv);
     158              : 
     159            0 :   std::string input_file_path;
     160            0 :   app.add_option("-i", input_file_path, "Input TPStream file path")->required();
     161            0 :   std::string output_file_path; // optional: stays empty when -o is not given
     162            0 :   app.add_option("-o", output_file_path, "Output TPStream file path");
     163            0 :   std::string channel_map_name = "VDColdboxTPCChannelMap";
     164            0 :   app.add_option("-m", channel_map_name, "Detector Channel Map");
     165            0 :   std::string config_name;
     166            0 :   app.add_option("-j", config_name, "Trigger Activity and Candidate config JSON to use.")->required();
     167            0 :   uint64_t skip_rec(0); // passed to loop() as the record offset
     168            0 :   app.add_option("-s", skip_rec, "Skip records");
     169            0 :   uint64_t num_rec(0); // passed to loop() as num_records; 0 means all
     170            0 :   app.add_option("-n", num_rec, "Process records");
     171              : 
     172            0 :   bool quiet = false;
     173            0 :   app.add_flag("--quiet", quiet, "Quiet outputs.");
     174              : 
     175            0 :   bool latencies = false;
     176            0 :   app.add_flag("--latencies", latencies, "Saves latencies per TP into csv");
     177            0 :   CLI11_PARSE(app, argc, argv);
     178              : 
     179              : 
     180            0 :   if (!quiet)
     181            0 :     fmt::print("TPStream file: {}\n", input_file_path);
     182              : 
     183            0 :   TimeSliceProcessor rp(input_file_path, output_file_path);
     184              : 
     185              :   // TP source id (subsystem)
     186            0 :   auto tp_subsystem_requirement = daqdataformats::SourceID::Subsystem::kTrigger;
     187              : 
     188            0 :   auto channel_map = dunedaq::detchannelmaps::make_tpc_map(channel_map_name);
     189              : 
     190              :   // Read configuration
     191            0 :   std::ifstream config_stream(config_name); // NOTE(review): the stream is not checked for having opened before it is parsed
     192            0 :   nlohmann::json config = nlohmann::json::parse(config_stream);
     193              : 
     194              :   // Only use the first plugin for now.
     195            0 :   nlohmann::json ta_algo = config["trigger_activity_plugin"][0];
     196            0 :   nlohmann::json ta_config = config["trigger_activity_config"][0];
     197              : 
     198            0 :   nlohmann::json tc_algo = config["trigger_candidate_plugin"][0];
     199            0 :   nlohmann::json tc_config = config["trigger_candidate_config"][0];
     200              : 
     201              : 
     202              :   // Finally create a TA maker
     203            0 :   std::unique_ptr<triggeralgs::TriggerActivityMaker> ta_maker =
     204            0 :     triggeralgs::TriggerActivityFactory::get_instance()->build_maker(ta_algo);
     205            0 :   ta_maker->configure(ta_config);
     206            0 :   std::unique_ptr<trgtools::EmulateTAUnit> ta_emulator = std::make_unique<trgtools::EmulateTAUnit>();
     207            0 :   ta_emulator->set_maker(ta_maker);
     208              :   // TODO: Use a better file naming scheme for CSV.
     209            0 :   if (latencies) { // NOTE(review): the CSV path is built from output_file_path, which is empty when -o is omitted; the file is then "ta_timings_.csv" as a relative path
     210            0 :     std::filesystem::path output_path(output_file_path);
     211            0 :     ta_emulator->set_timing_file((output_path.parent_path() / ("ta_timings_" + output_path.stem().string() + ".csv")).string());
     212            0 :     ta_emulator->write_csv_header("TP Time Start,TP ADC Integral,Time Diffs,Is Last TP In TA");
     213            0 :   }
     214              : 
     215              :   // Finally create a TC maker
     216            0 :   std::unique_ptr<triggeralgs::TriggerCandidateMaker> tc_maker =
     217            0 :     triggeralgs::TriggerCandidateFactory::get_instance()->build_maker(tc_algo);
     218            0 :   tc_maker->configure(tc_config);
     219            0 :   std::unique_ptr<trgtools::EmulateTCUnit> tc_emulator = std::make_unique<trgtools::EmulateTCUnit>();
     220            0 :   tc_emulator->set_maker(tc_maker);
     221              :   // TODO: Use a better file naming scheme for CSV.
     222            0 :   if (latencies) { // same naming scheme (and same empty-output-path caveat) as the TA timing file
     223            0 :     std::filesystem::path output_path(output_file_path);
     224            0 :     tc_emulator->set_timing_file((output_path.parent_path() / ("tc_timings_" + output_path.stem().string() + ".csv")).string());
     225            0 :     tc_emulator->write_csv_header("Time Diffs");
     226            0 :   }
     227              : 
     228              :   // Generic filter hook. NOTE(review): tp_filter is assigned below but never applied anywhere in this function.
     229            0 :   std::function<bool(const trgdataformats::TriggerPrimitive&)> tp_filter;
     230              : 
     231            0 :   auto z_plane_filter = [&]( const trgdataformats::TriggerPrimitive& tp ) -> bool { // true when the TP's channel is NOT on plane 2
     232            0 :     return (channel_map->get_plane_from_offline_channel(tp.channel) != 2);
     233            0 :   };
     234              : 
     235            0 :   tp_filter = z_plane_filter;
     236              : 
     237            0 :   rp.set_processor([&]( daqdataformats::TimeSlice& tsl ) -> void { // captures the locals above by reference; it is run from rp.loop() below
     238            0 :     const std::vector<std::unique_ptr<daqdataformats::Fragment>>& frags = tsl.get_fragments_ref();
     239            0 :     const size_t num_frags = frags.size(); // fragment count before any TA/TC fragment is appended
     240            0 :     if(!quiet)
     241            0 :       fmt::print("The number of fragments: {}\n", num_frags);
     242              : 
     243            0 :     uint64_t average_ta_time = 0; // accumulated as a sum of per-fragment times, divided once after the loop
     244            0 :     uint64_t average_tc_time = 0;
     245              : 
     246            0 :     size_t num_tas = 0;
     247            0 :     size_t num_tcs = 0;
     248              : 
     249              :     // Need an index loop bounded by the initial num_frags: adding fragments to tsl will mutate frags even though it's const, and the fragments appended below are then not visited.
     250            0 :     for (size_t i = 0; i < num_frags; i++) {
     251            0 :       const auto& frag = frags[i]; // NOTE(review): frag is not used after the first add_fragment() below; presumably that call can reallocate frags — keep it that way
     252              : 
     253              :       // The fragment has to be for the trigger (not e.g. for retrieval from readout)
     254            0 :       if (frag->get_element_id().subsystem != tp_subsystem_requirement) {
     255            0 :         if(!quiet)
     256            0 :           fmt::print("  Warning, got non kTrigger SourceID {}\n", frag->get_element_id().to_string());
     257            0 :         continue;
     258              :       }
     259              : 
     260              :       // The fragment has to be TriggerPrimitive
     261            0 :       if(frag->get_fragment_type() != daqdataformats::FragmentType::kTriggerPrimitive){
     262            0 :         if(!quiet)
     263            0 :           fmt::print("  Error: FragmentType is: {}!\n", fragment_type_to_string(frag->get_fragment_type()));
     264            0 :         continue;
     265              :       }
     266              : 
     267              :       // This bit should be outside the loop
     268            0 :       if (!quiet)
     269            0 :         fmt::print("  Fragment id: {} [{}]\n", frag->get_element_id().to_string(), daqdataformats::fragment_type_to_string(frag->get_fragment_type()));
     270              : 
     271              :       // Pull tps out
     272            0 :       size_t n_tps = frag->get_data_size()/sizeof(trgdataformats::TriggerPrimitive); // integer division: a trailing partial TP is ignored
     273            0 :       if (!quiet) {
     274            0 :         fmt::print("  TP fragment size: {}\n", frag->get_data_size());
     275            0 :         fmt::print("  Num TPs: {}\n", n_tps);
     276              :       }
     277              : 
     278              :       // Create a TP buffer
     279            0 :       std::vector<trgdataformats::TriggerPrimitive> tp_buffer;
     280              :       // Prepare the TP buffer, checking for time ordering
     281            0 :       tp_buffer.reserve(tp_buffer.size()+n_tps); // tp_buffer is empty here, so this reserves n_tps
     282              : 
     283              :       // Populate the TP buffer, treating the fragment payload as an array of n_tps TriggerPrimitive objects
     284            0 :       trgdataformats::TriggerPrimitive* tp_array = static_cast<trgdataformats::TriggerPrimitive*>(frag->get_data());
     285            0 :       uint64_t last_ts = 0; // NOTE(review): never updated in the loop below, so the ordering check only ever compares against 0
     286            0 :       for(size_t i(0); i<n_tps; ++i) { // this inner i shadows the fragment index i of the enclosing loop
     287            0 :         auto& tp = tp_array[i];
     288            0 :         if (tp.time_start <= last_ts && !quiet) {
     289            0 :           fmt::print("  ERROR: {} {} ", +tp.time_start, last_ts );
     290              :         }
     291            0 :         tp_buffer.push_back(tp);
     292              :       }
     293              : 
     294              :       // Print some useful info. NOTE(review): with n_tps == 0, n_tps-1 wraps and tp_array is read out of range; the empty-buffer check only comes after emulate_vector().
     295            0 :       uint64_t d_ts = tp_array[n_tps-1].time_start - tp_array[0].time_start;
     296            0 :       if (!quiet)
     297            0 :         fmt::print("  TS gap: {} {} ms\n", d_ts, d_ts*16.0/1'000'000); // presumably 16 ns per timestamp tick — TODO confirm
     298              : 
     299              :       //
     300              :       // TA Processing
     301              :       //
     302              : 
     303            0 :       const auto ta_start = std::chrono::steady_clock::now();
     304            0 :       std::unique_ptr<daqdataformats::Fragment> ta_frag = ta_emulator->emulate_vector(tp_buffer);
     305            0 :       const auto ta_end = std::chrono::steady_clock::now();
     306              : 
     307            0 :       if (ta_frag == nullptr) // Buffer was empty.
     308            0 :         continue;
     309            0 :       num_tas += ta_emulator->get_last_output_buffer().size();
     310              : 
     311              :       // TA time calculation: elapsed time of the whole emulate_vector() call, in nanoseconds.
     312            0 :       const uint64_t ta_diff = std::chrono::nanoseconds(ta_end - ta_start).count();
     313            0 :       average_ta_time += ta_diff;
     314            0 :       if (!quiet) {
     315            0 :         fmt::print("\tTA Time Process: {} ns.\n", ta_diff);
     316              :       }
     317              : 
     318            0 :       daqdataformats::FragmentHeader frag_hdr = frag->get_header(); // copy of the TP fragment header, reused for both new fragments
     319              : 
     320              :       // Customise the source id (add 1000 to id)
     321            0 :       frag_hdr.element_id = daqdataformats::SourceID{daqdataformats::SourceID::Subsystem::kTrigger, frag->get_element_id().id+1000};
     322              : 
     323            0 :       ta_frag->set_header_fields(frag_hdr);
     324            0 :       ta_frag->set_type(daqdataformats::FragmentType::kTriggerActivity); // set after set_header_fields(), which was given the TP fragment's header
     325              : 
     326              : 
     327            0 :       tsl.add_fragment(std::move(ta_frag));
     328              :       //
     329              :       // TA Processing Ends
     330              :       //
     331              : 
     332              :       //
     333              :       // TC Processing
     334              :       //
     335              : 
     336            0 :       std::vector<triggeralgs::TriggerActivity> ta_buffer = ta_emulator->get_last_output_buffer(); // TAs just produced for this fragment become the TC input
     337            0 :       const auto tc_start = std::chrono::steady_clock::now();
     338            0 :       std::unique_ptr<daqdataformats::Fragment> tc_frag = tc_emulator->emulate_vector(ta_buffer);
     339            0 :       const auto tc_end = std::chrono::steady_clock::now();
     340              : 
     341            0 :       if (tc_frag == nullptr) // Buffer was empty.
     342            0 :         continue;
     343            0 :       num_tcs += tc_emulator->get_last_output_buffer().size();
     344              : 
     345              :       // TC time calculation: elapsed time of the whole emulate_vector() call, in nanoseconds.
     346            0 :       const uint64_t tc_diff = std::chrono::nanoseconds(tc_end - tc_start).count();
     347            0 :       average_tc_time += tc_diff;
     348            0 :       if (!quiet) {
     349            0 :         fmt::print("\tTC Time Process: {} ns.\n", tc_diff);
     350              :       }
     351              : 
     352              :       // Shares the same frag_hdr. NOTE(review): the TC fragment therefore gets the same SourceID (id+1000) as the TA fragment — confirm this is intended.
     353            0 :       tc_frag->set_header_fields(frag_hdr);
     354            0 :       tc_frag->set_type(daqdataformats::FragmentType::kTriggerCandidate);
     355              : 
     356            0 :       tsl.add_fragment(std::move(tc_frag));
     357              : 
     358            0 :     } // Fragment for loop
     359              : 
     360            0 :     if (num_tas == 0) average_ta_time = 0; // the sums are divided by the number of TAs/TCs produced, not by the number of fragments
     361            0 :     else average_ta_time /= num_tas;
     362            0 :     if (num_tcs == 0) average_tc_time = 0;
     363            0 :     else average_tc_time /= num_tcs;
     364            0 :     if (!quiet) {
     365            0 :       fmt::print("\t\tAverage TA Time Process ({} TAs): {} ns.\n", num_tas, average_ta_time);
     366            0 :       fmt::print("\t\tAverage TC Time Process ({} TCs): {} ns.\n", num_tcs, average_tc_time);
     367              :     }
     368            0 :   });
     369              : 
     370            0 :   rp.loop(num_rec, skip_rec); // NOTE(review): quiet is not forwarded, so loop() prints its per-TSL banners even with --quiet
     371              : 
     372              :   /* code */
     373            0 :   return 0;
     374            0 : }
        

Generated by: LCOV version 2.0-1