LCOV - code coverage report
Current view: top level - trgtools/src - TAFileHandler.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 166 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 15 0

            Line data    Source code
       1              : #ifndef TRGTOOLS_TAFILEHANDLER_CPP_
       2              : #define TRGTOOLS_TAFILEHANDLER_CPP_
       3              : 
       4              : #include "trgtools/TAFileHandler.hpp"
       5              : 
       6              : namespace dunedaq::trgtools 
       7              : {
       8              : 
       9              : uint16_t TAFileHandler::m_id_next = 0;
      10              : 
      11            0 : TAFileHandler::TAFileHandler(std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>> input_files,
      12              :                              nlohmann::json config,
      13              :                              std::pair<uint64_t, uint64_t> sliceid_range,
      14              :                              bool run_parallel,
      15            0 :                              bool quiet)
      16            0 :   : m_input_files(input_files),
      17            0 :     m_sliceid_range(sliceid_range),
      18            0 :     m_run_parallel(run_parallel),
      19            0 :     m_quiet(quiet),
      20            0 :     m_id(m_id_next++)
      21              : {
      22            0 :   std::string algo_name = config["trigger_activity_plugin"][0];
      23            0 :   nlohmann::json algo_config = config["trigger_activity_config"][0];
      24              : 
      25              :   // Get the input file
      26              :   // Extract the run number etc
      27            0 :   std::vector<daqdataformats::run_number_t> run_numbers;
      28            0 :   std::vector<size_t> file_indices;
      29            0 :   for (const auto& input_file : input_files) {
      30            0 :     if (std::find(run_numbers.begin(), run_numbers.end(),
      31            0 :         input_file->get_attribute<daqdataformats::run_number_t>("run_number")) ==
      32            0 :         run_numbers.end()) {
      33            0 :           run_numbers.push_back(input_file->get_attribute<daqdataformats::run_number_t>("run_number"));
      34              :     }
      35              : 
      36            0 :     if (std::find(file_indices.begin(), file_indices.end(),
      37            0 :         input_file->get_attribute<daqdataformats::run_number_t>("run_number")) ==
      38            0 :         file_indices.end()) {
      39            0 :           file_indices.push_back(input_file->get_attribute<size_t>("file_index"));
      40              :     }
      41              :   }
      42              : 
      43            0 :   std::string application_name = m_input_files.front()->get_attribute<std::string>("application_name");
      44              : 
      45            0 :   if (!m_quiet) {
      46            0 :     fmt::print("Run Numbers: {}\nFile Indices: {}\nApp name: '{}'\n", fmt::join(run_numbers, ","), fmt::join(file_indices, ","), application_name);
      47              :   }
      48              : 
      49              :   // std::set of record IDs (pair of record number & sequence number)
      50            0 :   auto records = m_input_files.front()->get_all_record_ids();
      51              : 
      52              :   // Extract the number of TAMakers to create
      53            0 :   daqdataformats::TimeSlice first_timeslice = m_input_files.front()->get_timeslice(*records.begin());
      54            0 :   std::vector<daqdataformats::SourceID> valid_sources = get_valid_sourceids(first_timeslice);
      55            0 :   fmt::print("Number of makers to make: {}\n", valid_sources.size());
      56              : 
      57            0 :   for (const daqdataformats::SourceID& sid : valid_sources) {
      58              :     // Create TAMaker
      59            0 :     std::unique_ptr<triggeralgs::TriggerActivityMaker> ta_maker =
      60            0 :       triggeralgs::TriggerActivityFactory::get_instance()->build_maker(algo_name);
      61            0 :     ta_maker->configure(algo_config);
      62              : 
      63              :     // Add it to the enulators
      64            0 :     m_ta_emulators[sid] = std::make_unique<trgtools::EmulateTAUnit>();
      65            0 :     m_ta_emulators[sid]->set_maker(ta_maker);
      66              : 
      67              :     // Create a worker thread per emulator
      68            0 :     if (m_run_parallel) {
      69            0 :       m_thread_pool.emplace_back(&TAFileHandler::worker_thread, this);
      70              :     }
      71            0 :   }
      72            0 : }
      73              : 
      74              : std::vector<daqdataformats::SourceID> 
      75            0 : TAFileHandler::get_valid_sourceids(daqdataformats::TimeSlice& _timeslice)
      76              : {
      77            0 :   const auto& fragments = _timeslice.get_fragments_ref();
      78              : 
      79            0 :   std::vector<daqdataformats::SourceID> ret;
      80            0 :   for (const auto& fragment : fragments) {
      81            0 :     if (fragment->get_fragment_type() != daqdataformats::FragmentType::kTriggerPrimitive) {
      82            0 :       continue;
      83              :     }
      84              : 
      85            0 :     daqdataformats::SourceID sourceid = fragment->get_element_id();
      86              : 
      87            0 :     ret.push_back(sourceid);
      88              :   }
      89              : 
      90            0 :   return ret;
      91            0 : }
      92              : 
      93              : hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t
      94            0 : TAFileHandler::get_sourceid_geoid_map()
      95              : {
      96            0 :   if (!m_input_files.size()) {
      97            0 :     throw "Files not set yet!";
      98              :   }
      99              : 
     100            0 :   return m_input_files.front()->get_srcid_geoid_map();
     101              : }
     102              : 
     103            0 : void TAFileHandler::worker_thread()
     104              : {
     105            0 :   while (true) {
     106              :     /// Get a task from the queue (with locking)
     107            0 :     std::function<void()> task;
     108            0 :     {
     109            0 :       std::unique_lock<std::mutex> lock(m_queue_mutex);
     110            0 :       m_condition.wait(lock, [this]() {return m_stop || !m_task_queue.empty(); });
     111              : 
     112            0 :       if (m_stop && m_task_queue.empty()) {
     113            0 :         return;
     114              :       }
     115              : 
     116            0 :       task = std::move(m_task_queue.front());
     117            0 :       m_task_queue.pop();
     118            0 :     }
     119              : 
     120              :     // Run & complete a task
     121            0 :     task();
     122              : 
     123              :     // Notify that task was completed
     124            0 :     {
     125            0 :       std::lock_guard<std::mutex> lock(m_queue_mutex);
     126            0 :       --m_active_tasks;
     127            0 :       if (m_active_tasks == 0) {
     128            0 :         m_task_complete_condition.notify_all();
     129              :       }
     130            0 :     }
     131            0 :   }
     132              : }
     133              : 
     134            0 : void TAFileHandler::process_tasks()
     135              : {
     136              :   // Iterate over the input files
     137            0 :   for (auto& input_file: m_input_files) {
     138              :     // std::set of record IDs (pair of record number & sequence number)
     139            0 :     auto records = input_file->get_all_record_ids();
     140              : 
     141            0 :     for (const auto& record : records) {
     142            0 :       if (record.first < m_sliceid_range.first || record.first > m_sliceid_range.second) {
     143            0 :         if (!m_quiet)
     144            0 :           fmt::print("  Will not process RecordID {} because it's outside of our range!", record.first);
     145            0 :         continue;
     146              :       }
     147              : 
     148              :       // Get all the fragments
     149            0 :       daqdataformats::TimeSlice timeslice = input_file->get_timeslice(record);
     150            0 :       const auto& fragments = timeslice.get_fragments_ref();
     151              : 
     152              :       // Iterate over the fragments & process each fragment
     153            0 :       for (const auto& fragment : fragments) {
     154            0 :         daqdataformats::SourceID sid = fragment->get_element_id();
     155              : 
     156            0 :         if (!m_ta_emulators.contains(sid)) {
     157            0 :           continue;
     158              :         }
     159              : 
     160              :         // Pull tps out
     161            0 :         size_t n_tps = fragment->get_data_size()/SIZE_TP;
     162            0 :         if (!m_quiet) {
     163            0 :           fmt::print("  TP fragment size: {}\n", fragment->get_data_size());
     164            0 :           fmt::print("  Num TPs: {}\n", n_tps);
     165              :         }
     166              : 
     167              :         // Create a TP buffer
     168            0 :         std::vector<trgdataformats::TriggerPrimitive> tp_buffer;
     169              :         // Prepare the TP buffer, checking for time ordering
     170            0 :         tp_buffer.reserve(n_tps);
     171              : 
     172              :         // Populate the TP buffer
     173            0 :         trgdataformats::TriggerPrimitive* tp_array = static_cast<trgdataformats::TriggerPrimitive*>(fragment->get_data());
     174            0 :         uint64_t last_ts = 0;
     175            0 :         for(size_t tpid(0); tpid<n_tps; ++tpid) {
     176            0 :           auto& tp = tp_array[tpid];
     177            0 :           if (tp.time_start <= last_ts && !m_quiet) {
     178            0 :             fmt::print("  ERROR: {} {} ", +tp.time_start, last_ts );
     179              :           }
     180            0 :           tp_buffer.push_back(tp);
     181              :         }
     182              : 
     183            0 :         daqdataformats::FragmentHeader frag_hdr = fragment->get_header();
     184              : 
     185              :         // Customise the source id (add 1000 to id)
     186            0 :         frag_hdr.element_id = daqdataformats::SourceID{daqdataformats::SourceID::Subsystem::kTrigger, fragment->get_element_id().id+1000};
     187              : 
     188              :         // Either enqueue the task if using parallel processing, or execute the task now
     189            0 :         if (m_run_parallel) {
     190            0 :           enqueue_task([this, sid, record, frag_hdr, tp_buffer = std::move(tp_buffer)]() mutable {
     191            0 :             this->process_task(sid, record.first, frag_hdr, std::move(tp_buffer));
     192            0 :           });
     193              :         }
     194              :         else {
     195            0 :           this->process_task(sid, record.first, frag_hdr, std::move(tp_buffer));
     196              :         }
     197            0 :       }
     198              :       // If running in parallel, wait to process entire slice before we move to
     199              :       // the next one
     200            0 :       if (m_run_parallel) {
     201            0 :         wait_to_complete_tasks();
     202              :       }
     203            0 :     }
     204              : 
     205            0 :     size_t total = 0;
     206            0 :     for (auto& [key, vec_tas]: m_tas) {
     207            0 :       total += vec_tas.size();
     208              :     }
     209            0 :     std::cout << "We have a total of " << total << " TAs!" << std::endl;
     210            0 :   }
     211            0 : }
     212              : 
     213            0 : void TAFileHandler::start_processing()
     214              : {
     215            0 :   m_main_thread = std::thread(&TAFileHandler::process_tasks, this);
     216            0 : }
     217              : 
     218              : 
     219            0 : void TAFileHandler::enqueue_task(std::function<void()> task)
     220              : {
     221            0 :   {
     222            0 :     std::lock_guard<std::mutex> lock(m_queue_mutex);
     223            0 :     m_task_queue.push(std::move(task));
     224            0 :     ++m_active_tasks;
     225            0 :   }
     226            0 :   m_condition.notify_one();
     227            0 : }
     228              : 
     229            0 : void TAFileHandler::wait_to_complete_tasks()
     230              : {
     231            0 :   std::unique_lock<std::mutex> lock(m_queue_mutex);
     232            0 :   m_task_complete_condition.wait(lock, [this]() { return m_active_tasks == 0; });
     233            0 : }
     234              : 
     235            0 : void TAFileHandler::wait_to_complete_work()
     236              : {
     237              :   // Wait for the main threads to join
     238            0 :   m_main_thread.join();
     239            0 :   fmt::print("TAFileHandler_{} work completed\n", m_id);
     240              : 
     241              :   // Wait for the tasks to complete
     242            0 :   if (m_run_parallel) {
     243            0 :     wait_to_complete_tasks();
     244              : 
     245            0 :     {
     246            0 :       std::lock_guard<std::mutex> lock(m_queue_mutex);
     247            0 :       m_stop = true;
     248            0 :     }
     249            0 :     m_condition.notify_all();
     250              : 
     251            0 :     fmt::print("m_stop issued\n");
     252            0 :     for (std::thread& thread : m_thread_pool) {
     253            0 :       thread.join();
     254              :     }
     255              :   }
     256            0 : }
     257              : 
     258            0 : void TAFileHandler::process_task(daqdataformats::SourceID _source_id,
     259              :                                  uint64_t _rec,
     260              :                                  daqdataformats::FragmentHeader _header,
     261              :                                  std::vector<trgdataformats::TriggerPrimitive>&& _tps)
     262              : {
     263              :   // Get te last fragment
     264            0 :   std::unique_ptr<daqdataformats::Fragment> frag = m_ta_emulators[_source_id]->emulate_vector(_tps);
     265              : 
     266              :   // Don't do anything if no fragments found
     267            0 :   if (!frag) {
     268              :     return;
     269              :   }
     270              : 
     271              :   // Get all the TriggerActivities from the TA Emulator buffer
     272            0 :   std::vector<triggeralgs::TriggerActivity> ta_buffer = m_ta_emulators[_source_id]->get_last_output_buffer();
     273              : 
     274              :   // Don't continue if no TAs found
     275            0 :   size_t n_tas = ta_buffer.size();
     276            0 :   if (!n_tas) {
     277            0 :     return;
     278              :   }
     279              : 
     280            0 :   if (!m_quiet && n_tas) {
     281            0 :     fmt::print(" Found {} TAs!\n", n_tas);
     282              :   }
     283              : 
     284              :   // Set the fragment header & push into our output (with locking!)
     285            0 :   {
     286            0 :     if (m_run_parallel) {
     287            0 :       std::lock_guard<std::mutex> lock(m_savetps_mutex);
     288            0 :     }
     289            0 :     m_tas[_rec].reserve(m_tas[_rec].size() + ta_buffer.size());
     290            0 :     m_tas[_rec].insert(m_tas[_rec].end(), std::make_move_iterator(ta_buffer.begin()), std::make_move_iterator(ta_buffer.end()));
     291              : 
     292            0 :     frag->set_header_fields(_header);
     293            0 :     frag->set_type(daqdataformats::FragmentType::kTriggerActivity);
     294              : 
     295            0 :     m_ta_fragments[_rec].push_back(std::move(frag));
     296              :   }
     297            0 : }
     298              : 
     299            0 : std::map<uint64_t, std::vector<triggeralgs::TriggerActivity>> TAFileHandler::get_tas()
     300              : {
     301            0 :   return std::move(m_tas);
     302              : }
     303              : 
     304            0 : std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>> TAFileHandler::get_frags()
     305              : {
     306            0 :   return std::move(m_ta_fragments);
     307              : }
     308              : 
     309              : 
     310              : }; // namespace dunedaq::trgtools
     311              : 
     312              : #endif //TRGTOOLS_TAFILEHANDLER_CPP_
        

Generated by: LCOV version 2.0-1