LCOV - code coverage report
Current view: top level - trigger/plugins - TPReplayModule.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 296 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 61 0

            Line data    Source code
       1              : /**
       2              :  * @file TPReplayModule.cpp
       3              :  *
       4              :  * This is part of the DUNE DAQ Application Framework, copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
       9              : #include "TPReplayModule.hpp"
      10              : 
      11              : #include "trigger/Issues.hpp" // For TLVL_*
      12              : #include "trigger/TriggerPrimitiveTypeAdapter.hpp"
      13              : 
      14              : #include "iomanager/IOManager.hpp"
      15              : #include "logging/Logging.hpp"
      16              : #include "rcif/cmd/Nljs.hpp"
      17              : 
      18              : #include <algorithm>
      19              : #include <chrono>
      20              : #include <fstream>
      21              : #include <limits>
      22              : #include <memory>
      23              : #include <string>
      24              : #include <thread>
      25              : #include <vector>
      26              : 
      27              : using namespace triggeralgs;
      28              : 
      29              : namespace dunedaq::trigger {
      30              : 
      31            0 : TPReplayModule::TPReplayModule(const std::string& name)
      32              :   : DAQModule(name)
      33            0 :   , m_queue_timeout(100)
      34              : {
      35              :   // clang-format off
      36            0 :   register_command("conf",  &TPReplayModule::do_configure);
      37            0 :   register_command("start", &TPReplayModule::do_start);
      38            0 :   register_command("stop_trigger_sources",  &TPReplayModule::do_stop);
      39            0 :   register_command("scrap", &TPReplayModule::do_scrap);
      40              :   // clang-format on
      41            0 : }
      42              : 
      43              : void
      44            0 : TPReplayModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
      45              : {
      46            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
      47              : 
      48              :   // ### Access configuration
      49            0 :   m_mtrg = mcfg->get_dal<appmodel::TPReplayModule>(get_name());
      50              : 
      51              :   // ### Extract relevant objects
      52              :   // Clock speed
      53            0 :   m_clocks_per_us = mcfg->get_session()->get_detector_configuration()->get_clock_speed_hz() /
      54              :                     double(1'000'000.0); // this is redundant but safer...
      55              : 
      56            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
      57            0 : }
      58              : 
      59              : void
      60            0 : TPReplayModule::do_configure(const CommandData_t& /*obj*/)
      61              : {
      62              : 
      63            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering conf() method";
      64              : 
      65            0 :   m_conf = m_mtrg->get_configuration();
      66            0 :   if (!m_conf) {
      67            0 :     throw ReplayConfigurationProblem(ERS_HERE, get_name(), "Missing configuration!");
      68              :   }
      69              : 
      70              :   // Channel map
      71            0 :   m_channel_map_name = m_conf->get_channel_map();
      72            0 :   if (m_channel_map_name.empty()) {
      73            0 :     throw ReplayConfigurationProblem(ERS_HERE, get_name(), "No Channel map provided!");
      74              :   }
      75              : 
      76              :   // Valid Subdetectors (for now)
      77            0 :   m_validSubdetectors = { detdataformats::DetID::Subdetector::kHD_TPC,
      78              :                           detdataformats::DetID::Subdetector::kVD_BottomTPC,
      79              :                           detdataformats::DetID::Subdetector::kVD_TopTPC,
      80            0 :                           detdataformats::DetID::Subdetector::kNDLAr_TPC };
      81              : 
      82            0 :   TLOG() << "### REPLAY CONFIGURATION ###";
      83            0 :   TLOG() << "Will use channel map: " << m_channel_map_name;
      84            0 :   try {
      85            0 :     m_channel_map = dunedaq::detchannelmaps::make_tpc_map(m_channel_map_name);
      86            0 :   } catch (const detchannelmaps::ChannelMapCreationFailed& e) {
      87            0 :     ers::error(dunedaq::trigger::ReplayChannelMapProblem(ERS_HERE, get_name(), m_channel_map_name));
      88            0 :   }
      89              : 
      90              :   // Loops
      91            0 :   m_loops = m_conf->get_number_of_loops();
      92              : 
      93              :   // Plane filtering
      94            0 :   m_filter_planes_ids = std::set<int>(m_conf->get_filter_out_plane().begin(), m_conf->get_filter_out_plane().end());
      95            0 :   m_filter_planes = (m_filter_planes_ids.size() > 0) ? true : false;
      96              : 
      97            0 :   TLOG() << "Plane filtering: " << m_filter_planes;
      98            0 :   if (m_filter_planes) {
      99            0 :     TLOG() << "Planes to filter: ";
     100            0 :     for (auto plane : m_filter_planes_ids) {
     101            0 :       TLOG() << plane;
     102              :     }
     103              :   }
     104              : 
     105              :   // For each of the files that are specified in the config, we extract data and sort them.
     106              :   // Data is sorted by ROU -> Plane (if not filtered). Multiple files can contribute to each.
     107              :   // Then we create an outgoing sink for each unique ROU + plane combination.
     108              :   // We also keep track of the total timestamp range of all the streams, so we can keep
     109              :   // the timestamps of the multiple streams in sync when replaying,
     110              :   // even when they don't all start or end at the same time.
     111              : 
     112              :   // Output queues
     113            0 :   auto con = m_mtrg->get_outputs();
     114              : 
     115              :   // Global times
     116            0 :   m_earliest_first_tp_timestamp = std::numeric_limits<triggeralgs::timestamp_t>::max();
     117            0 :   m_latest_last_tp_timestamp = 0;
     118              : 
     119              :   // Loading sorted TP stream files
     120            0 :   for (auto& stream : m_conf->get_tp_streams()) {
     121            0 :     auto result = m_tpstream_files.insert(std::make_pair(stream->get_index(), stream->get_filename()));
     122            0 :     if (!result.second) {
     123            0 :       ers::error(dunedaq::trigger::ReplayStreamFileError(
     124            0 :         ERS_HERE, get_name(), stream->get_index(), stream->get_filename(), result.first->second));
     125              :     }
     126              :   }
     127              : 
     128              :   // Print grouped files
     129            0 :   TLOG() << "Files to use:";
     130            0 :   for (const auto& pair : m_tpstream_files) {
     131            0 :     std::cout << "Index: " << pair.first << ", Filename: " << pair.second << std::endl;
     132              :   }
     133              : 
     134              :   // Load data here
     135            0 :   m_all_tp_data = read_tps(m_tpstream_files);
     136            0 :   if (m_tpstream_files.size() == 0 || m_all_tp_data.size() == 0) {
     137            0 :     ers::error(dunedaq::trigger::ReplayNoValidFiles(ERS_HERE, get_name()));
     138              :   }
     139              : 
     140              :   // Data loaded and sorted.
     141              :   // Now we create streams.
     142            0 :   int global_iter = 0;
     143              :   // Loop over ROUs
     144            0 :   for (auto& [ROU, plane_map] : m_all_tp_data) {
     145            0 :     int plane_iter = 0;
     146              :     // Loop over Planes
     147            0 :     for (auto& [plane, vector_of_tps] : plane_map) {
     148              : 
     149            0 :       TLOG_DEBUG(1) << "Stream: " << (global_iter + plane_iter) << "; ROU: " << ROU << "; plane: " << plane
     150            0 :                     << "; TP sink is " << con[global_iter + plane_iter]->class_name() << "@"
     151            0 :                     << con[global_iter + plane_iter]->UID();
     152              : 
     153            0 :       TPStream this_stream{ get_iom_sender<std::vector<trigger::TriggerPrimitiveTypeAdapter>>(
     154            0 :                               con[global_iter + plane_iter]->UID()),
     155            0 :                             vector_of_tps };
     156              : 
     157            0 :       m_earliest_first_tp_timestamp =
     158            0 :         std::min(m_earliest_first_tp_timestamp, this_stream.tpvs.front().front().tp.time_start);
     159            0 :       m_latest_last_tp_timestamp = std::max(m_latest_last_tp_timestamp, this_stream.tpvs.back().back().tp.time_start);
     160              : 
     161            0 :       m_tp_streams.push_back(std::move(this_stream));
     162            0 :       plane_iter++;
     163            0 :     }
     164            0 :     global_iter += plane_iter;
     165              :   }
     166              : 
     167            0 :   TLOG() << "Total of " << m_tp_streams.size() << " TP streams.";
     168              : 
     169            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting conf() method";
     170            0 : }
     171              : 
     172              : void
     173            0 : TPReplayModule::do_start(const CommandData_t& /*obj*/)
     174              : {
     175            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering start() method";
     176              : 
     177            0 :   m_running_flag.store(true);
     178              : 
     179              :   // Reset opmon
     180            0 :   m_tp_made_count.store(0);
     181            0 :   m_tpv_made_count.store(0);
     182            0 :   m_tpv_failed_sent_count.store(0);
     183              : 
     184              :   // We need the wall-clock time at which we'll send out the TPs
     185              :   // with the earliest timestamp, so we can keep all of the output
     186              :   // streams in sync. We pick "now" plus a bit, to allow time for all
     187              :   // of the threads to start up
     188            0 :   auto earliest_timestamp_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(100);
     189            0 :   m_run_start_time = std::chrono::steady_clock::now();
     190              : 
     191              :   // Start threads for each stream
     192            0 :   for (auto& stream : m_tp_streams) {
     193            0 :     m_threads.push_back(std::make_unique<std::thread>(&TPReplayModule::do_work,
     194            0 :                                                       this,
     195            0 :                                                       std::ref(m_running_flag),
     196            0 :                                                       std::ref(stream.tpvs),
     197            0 :                                                       std::ref(stream.tp_sink),
     198              :                                                       earliest_timestamp_time));
     199              :   }
     200            0 :   for (size_t i = 0; i < m_threads.size(); i++) {
     201            0 :     std::string name("tpreplay-");
     202            0 :     name += std::to_string(i);
     203            0 :     pthread_setname_np(m_threads[i]->native_handle(), name.c_str());
     204            0 :   }
     205            0 :   TLOG() << "Total of " << m_threads.size() << " replay threads.";
     206              : 
     207            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting start() method";
     208            0 : }
     209              : 
     210              : void
     211            0 : TPReplayModule::do_stop(const CommandData_t& /*args*/)
     212              : {
     213            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering stop() method";
     214              : 
     215            0 :   m_running_flag.store(false);
     216            0 :   for (auto& thr : m_threads) {
     217            0 :     if (thr != nullptr && thr->joinable()) {
     218            0 :       thr->join();
     219              :     }
     220              :   }
     221            0 :   m_threads.clear();
     222              : 
     223            0 :   auto run_end_time = std::chrono::steady_clock::now();
     224            0 :   auto time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(run_end_time - m_run_start_time).count();
     225            0 :   float rate_hz = 1e3 * static_cast<float>(m_tpv_made_count) / time_ms;
     226              : 
     227            0 :   TLOG() << "### SUMMARY ###";
     228            0 :   TLOG() << "------------------------------";
     229            0 :   TLOG() << "Generated TP vectors: " << m_tpv_made_count;
     230            0 :   TLOG() << "Generated TPs: " << m_tp_made_count;
     231            0 :   TLOG() << "Time taken: " << time_ms << " ms";
     232            0 :   TLOG() << "Rate: " << rate_hz << " TP vectors/s";
     233            0 :   TLOG() << "Failed to push TP vectors: " << m_tpv_failed_sent_count;
     234            0 :   TLOG();
     235              : 
     236            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting stop() method";
     237            0 : }
     238              : 
     239              : void
     240            0 : TPReplayModule::do_scrap(const CommandData_t& /*args*/)
     241              : {
     242            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering scrap() method";
     243              : 
     244            0 :   m_tp_streams.clear();
     245            0 :   m_threads.clear();
     246            0 :   m_tpstream_files.clear();
     247            0 :   m_all_tp_data.clear();
     248            0 :   m_filter_planes_ids.clear();
     249            0 :   m_validSubdetectors.clear();
     250              : 
     251            0 :   m_channel_map.reset();
     252              : 
     253            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting scrap() method";
     254            0 : }
     255              : 
     256              : void
     257            0 : TPReplayModule::generate_opmon_data()
     258              : {
     259            0 :   opmon::TPReplayModuleInfo info;
     260              : 
     261            0 :   info.set_tp_made_count(m_tp_made_count);
     262            0 :   info.set_tpv_made_count(m_tpv_made_count);
     263            0 :   info.set_tpv_failed_sent_count(m_tpv_failed_sent_count);
     264              : 
     265            0 :   this->publish(std::move(info));
     266            0 : }
     267              : 
     268              : // This is the heavy-lifting function of TPRM
     269              : // Goes over all provided TPstream files
     270              : // Does basic file checks
     271              : // Extracts needed data
     272              : // Sorts data per ROU and plane
     273              : // Additional data-related checks
     274              : // Plane filtering happens here
     275              : std::map<std::string, std::map<int, std::deque<std::vector<TriggerPrimitiveTypeAdapter>>>>
     276            0 : TPReplayModule::read_tps(std::map<int, std::string> m_tpstream_files)
     277              : {
     278              : 
     279            0 :   std::map<std::string, std::map<int, std::deque<std::vector<TriggerPrimitiveTypeAdapter>>>> all_data;
     280              : 
     281              :   // Loop over each file
     282            0 :   for (const auto& a_file : m_tpstream_files) {
     283            0 :     std::string filename = a_file.second;
     284              : 
     285              :     // Check file exists
     286            0 :     std::unique_ptr<hdf5libs::HDF5RawDataFile> input_file;
     287            0 :     try {
     288            0 :       input_file = std::make_unique<hdf5libs::HDF5RawDataFile>(filename);
     289            0 :     } catch (const hdf5libs::FileOpenFailed& e) {
     290            0 :       ers::error(dunedaq::trigger::ReplayFileProblem(ERS_HERE, get_name(), filename));
     291            0 :       return {};
     292            0 :     }
     293              : 
     294              :     // Check that the file is a TimeSlice type
     295            0 :     if (!input_file->is_timeslice_type()) {
     296            0 :       ers::error(dunedaq::trigger::BadTPInputFile(ERS_HERE, get_name(), filename));
     297            0 :       return {};
     298              :     }
     299              : 
     300            0 :     std::vector<std::string> fragment_paths = input_file->get_all_fragment_dataset_paths();
     301              :     // Check there are fragments
     302            0 :     if (fragment_paths.empty()) {
     303            0 :       ers::error(dunedaq::trigger::ReplayNoFragments(ERS_HERE, get_name(), filename));
     304            0 :       return {};
     305              :     }
     306              : 
     307              :     // Local counters
     308            0 :     std::set<std::string> local_rous;
     309            0 :     std::set<int> local_planes;
     310            0 :     int local_tp_vectors = 0;
     311            0 :     int local_tps = 0;
     312              : 
     313              :     // Loop over paths/fragments
     314            0 :     for (const auto& path : fragment_paths) {
     315            0 :       std::unique_ptr<daqdataformats::Fragment> frag = input_file->get_frag_ptr(path);
     316              : 
     317              :       // Check fragment has data
     318            0 :       auto frag_size = frag->get_data_size();
     319            0 :       if (frag_size == 0) {
     320            0 :         ers::error(dunedaq::trigger::ReplayEmptyFrag(ERS_HERE, get_name(), filename));
     321            0 :         continue;
     322              :       }
     323              : 
     324            0 :       trgdataformats::TriggerPrimitive* tp_array = static_cast<trgdataformats::TriggerPrimitive*>(frag->get_data());
     325            0 :       size_t num_tps = frag_size / sizeof(trgdataformats::TriggerPrimitive);
     326              : 
     327              :       // Check there is TP data
     328            0 :       if (num_tps < 1) {
     329            0 :         ers::error(dunedaq::trigger::ReplayNoValidTPs(ERS_HERE, get_name(), filename));
     330            0 :         continue;
     331              :       }
     332              : 
     333              :       // Store TPs
     334            0 :       auto& tp = tp_array[0];
     335              : 
     336              :       // Only select TPC TPs (for now)
     337            0 :       dunedaq::detdataformats::DetID::Subdetector subdet =
     338            0 :         static_cast<dunedaq::detdataformats::DetID::Subdetector>(tp.detid);
     339            0 :       if (!m_validSubdetectors.count(subdet)) {
     340            0 :         continue;
     341              :       }
     342              : 
     343              :       // Get ROU and plane
     344            0 :       std::string ROU;
     345            0 :       try {
     346            0 :         ROU = m_channel_map->get_element_name_from_offline_channel(tp.channel);
     347            0 :         local_rous.insert(ROU);
     348            0 :       } catch (...) {
     349            0 :         ers::error(dunedaq::trigger::ReplayROUError(ERS_HERE, get_name(), filename));
     350            0 :         continue;
     351            0 :       }
     352              : 
     353            0 :       int plane;
     354            0 :       try {
     355            0 :         plane = m_channel_map->get_plane_from_offline_channel(tp.channel);
     356            0 :         local_planes.insert(plane);
     357            0 :       } catch (...) {
     358            0 :         ers::error(dunedaq::trigger::ReplayPlaneError(ERS_HERE, get_name(), filename));
     359            0 :         continue;
     360            0 :       }
     361              : 
     362              :       // hack for APA1, basically making plane 1 collection plane
     363              :       // decide whether we want this here long term
     364            0 :       if (ROU == "APA_P02SU") {
     365            0 :         plane = (plane == 1) ? 2 : (plane == 2) ? 1 : plane;
     366              :       }
     367              : 
     368              :       // Check if this plane should be filtered
     369            0 :       if (std::find(m_filter_planes_ids.begin(), m_filter_planes_ids.end(), plane) != m_filter_planes_ids.end()) {
     370            0 :         continue;
     371              :       }
     372              : 
     373              :       // Extract trigger primitives
     374              :       // Create a vector of the correct size, and directly associate it with tp_array
     375            0 :       std::vector<TriggerPrimitiveTypeAdapter> tps(reinterpret_cast<TriggerPrimitiveTypeAdapter*>(tp_array),
     376            0 :                                                    reinterpret_cast<TriggerPrimitiveTypeAdapter*>(tp_array) + num_tps);
     377              : 
     378            0 :       if (tps.empty()) {
     379            0 :         continue;
     380              :       }
     381              : 
     382              :       // Efficient insertion into deque
     383            0 :       auto& data_deque = all_data[ROU][plane];
     384              : 
     385              :       // First insertion if deque is empty
     386            0 :       if (data_deque.empty()) {
     387            0 :         data_deque.push_back(std::move(tps));
     388              :       }
     389              :       // Append if already in order
     390            0 :       else if (data_deque.back().front().tp.time_start <= tps.front().tp.time_start) {
     391            0 :         data_deque.push_back(std::move(tps));
     392              :       }
     393              :       // Prepend if this vector is older than the first element
     394            0 :       else if (data_deque.front().front().tp.time_start >= tps.front().tp.time_start) {
     395            0 :         data_deque.push_front(std::move(tps));
     396              :       }
     397              :       // General case: Insert using binary search (O(log N) search + O(1) insertion)
     398              :       else {
     399            0 :         auto insert_pos = std::lower_bound(
     400            0 :           data_deque.begin(),
     401            0 :           data_deque.end(),
     402              :           tps,
     403            0 :           [](const std::vector<TriggerPrimitiveTypeAdapter>& a, const std::vector<TriggerPrimitiveTypeAdapter>& b) {
     404            0 :             return a.front().tp.time_start < b.front().tp.time_start;
     405            0 :           });
     406              : 
     407            0 :         data_deque.insert(insert_pos, std::move(tps));
     408              :       }
     409            0 :       data_deque.back().shrink_to_fit(); // Reclaims unused memory
     410            0 :       frag.reset();
     411              : 
     412            0 :       local_tp_vectors++;
     413            0 :       local_tps += num_tps;
     414            0 :     } // frags loop
     415              : 
     416            0 :     TLOG() << "Data loading summary (end of file):";
     417            0 :     TLOG() << "------------------------------";
     418            0 :     TLOG() << "File: " << filename;
     419            0 :     TLOG() << "ROUs: " << local_rous.size();
     420            0 :     TLOG() << "Planes: " << local_planes.size();
     421            0 :     TLOG() << "TP vectors: " << local_tp_vectors;
     422            0 :     TLOG() << "Total read TPs: " << local_tps;
     423            0 :     TLOG();
     424              : 
     425            0 :   } // files loop
     426              : 
     427            0 :   TLOG() << "Data loading summary (all):";
     428            0 :   TLOG() << "------------------------------";
     429            0 :   TLOG() << "Files: " << m_tpstream_files.size();
     430              :   // Loop through the map and print sizes
     431            0 :   for (const auto& [ROU, plane_map] : all_data) {
     432            0 :     TLOG() << "ROU: " << ROU << ", Number of planes: " << plane_map.size();
     433              : 
     434              :     // Loop through each plane for the current ROU
     435            0 :     for (const auto& [plane, vector_of_tps] : plane_map) {
     436            0 :       TLOG() << "  Plane: " << plane << ", Number of vectors: " << vector_of_tps.size();
     437              :     }
     438              :   }
     439              : 
     440            0 :   return all_data;
     441            0 : }
     442              : 
     443              : void
     444            0 : TPReplayModule::do_work(
     445              :   std::atomic<bool>& running_flag,
     446              :   std::deque<std::vector<TriggerPrimitiveTypeAdapter>>& tpvs,
     447              :   std::shared_ptr<iomanager::SenderConcept<std::vector<trigger::TriggerPrimitiveTypeAdapter>>>& tp_sink,
     448              :   std::chrono::steady_clock::time_point earliest_timestamp_time)
     449              : {
     450            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_work() method";
     451              : 
     452            0 :   int current_iteration = 0; // NOLINT(build/unsigned)
     453              : 
     454            0 :   uint64_t prev_tpv_start_time = 0; // NOLINT(build/unsigned)
     455            0 :   auto prev_tpv_send_time = std::chrono::steady_clock::now();
     456              : 
     457            0 :   auto const total_stream_duration = m_latest_last_tp_timestamp - m_earliest_first_tp_timestamp;
     458            0 :   auto run_start_time = std::chrono::steady_clock::now();
     459              : 
     460              :   // Local counters
     461            0 :   int local_tp_made = 0;
     462            0 :   int local_tpv_made = 0;
     463            0 :   int local_tpv_failed = 0;
     464              : 
     465            0 :   while (running_flag.load()) {
     466              : 
     467              :     // Looping logic: exit if m_loops is set and we've reached the limit
     468            0 :     if ((m_loops != -1) && (current_iteration >= m_loops)) {
     469              :       break;
     470              :     }
     471              : 
     472              :     // Going over TP vectors
     473            0 :     for (auto& tpv : tpvs) {
     474              : 
     475            0 :       if (!running_flag.load()) {
     476              :         break;
     477              :       }
     478              : 
     479              :       // The argument `earliest_timestamp_time` is the wall-clock time
     480              :       // of the earliest first tpset timestamp in _any_ of the input
     481              :       // streams. So for the first TP vector we send out, we wait until
     482              :       // _this_ stream's first timestamp comes up
     483            0 :       auto wait_time_us = 0;
     484            0 :       std::chrono::steady_clock::time_point next_tpv_send_time;
     485            0 :       if (prev_tpv_start_time == 0) {
     486            0 :         wait_time_us = (tpv.front().tp.time_start - m_earliest_first_tp_timestamp) / m_clocks_per_us;
     487            0 :         next_tpv_send_time = earliest_timestamp_time + std::chrono::microseconds(wait_time_us);
     488              :       } else {
     489            0 :         wait_time_us = (tpv.front().tp.time_start - prev_tpv_start_time) / m_clocks_per_us;
     490            0 :         next_tpv_send_time = prev_tpv_send_time + std::chrono::microseconds(wait_time_us);
     491              :       }
     492              : 
     493              :       // Check running_flag periodically so we can stop punctually
     494            0 :       auto slice_period = std::chrono::microseconds(m_conf->get_maximum_wait_time_us());
     495            0 :       auto next_slice_send_time = prev_tpv_send_time + slice_period;
     496            0 :       bool break_flag = false;
     497            0 :       while (next_tpv_send_time > next_slice_send_time + slice_period) {
     498            0 :         if (!running_flag.load()) {
     499              :           break_flag = true;
     500              :           break;
     501              :         }
     502            0 :         std::this_thread::sleep_until(next_slice_send_time);
     503            0 :         next_slice_send_time = next_slice_send_time + slice_period;
     504              :       }
     505            0 :       if (!break_flag) {
     506            0 :         std::this_thread::sleep_until(next_tpv_send_time);
     507              :       }
     508              : 
     509              :       // Update times
     510            0 :       prev_tpv_send_time = next_tpv_send_time;
     511            0 :       prev_tpv_start_time = tpv.front().tp.time_start;
     512              : 
     513              :       // Update counters
     514            0 :       m_tpv_made_count++;
     515            0 :       m_tp_made_count += tpv.size();
     516            0 :       local_tpv_made++;
     517            0 :       local_tp_made += tpv.size();
     518              : 
     519              :       // Actually send data
     520            0 :       try {
     521              :         // Decide whether to move or copy based on loop count
     522            0 :         if (m_loops == 1) {
     523              :           // Only one loop: safe to move original data
     524            0 :           tp_sink->send(std::move(tpv), m_queue_timeout);
     525              :         } else {
     526              :           // Multiple or infinite loops: send a copy to preserve original
     527            0 :           auto tpv_copy = tpv;
     528            0 :           tp_sink->send(std::move(tpv_copy), m_queue_timeout);
     529            0 :         }
     530            0 :       } catch (const dunedaq::iomanager::TimeoutExpired& e) {
     531            0 :         ers::warning(e);
     532            0 :         m_tpv_failed_sent_count++;
     533            0 :         local_tpv_failed++;
     534            0 :       }
     535              : 
     536              :       // Increase timestamps in the TPs so they don't
     537              :       // repeat when we do multiple loops over the file
     538            0 :       bool will_repeat = (m_loops == -1) || (current_iteration + 1 < m_loops);
     539              :       if (will_repeat) {
     540            0 :         for (auto& tpa : tpv) {
     541            0 :           tpa.tp.time_start += total_stream_duration;
     542              :         }
     543              :       }
     544              : 
     545              :     } // end loop over tpsets
     546            0 :     ++current_iteration;
     547              : 
     548              :   } // end while(running_flag.load())
     549              : 
     550            0 :   auto run_end_time = std::chrono::steady_clock::now();
     551            0 :   auto time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(run_end_time - run_start_time).count();
     552            0 :   float rate_hz = 1e3 * static_cast<float>(local_tpv_made) / time_ms;
     553              : 
     554            0 :   TLOG() << "Thread summary:";
     555            0 :   TLOG() << "------------------------------";
     556            0 :   TLOG() << "Sent TPs: " << local_tp_made;
     557            0 :   TLOG() << "TP vectors: " << local_tpv_made;
     558            0 :   TLOG() << "Time taken: " << time_ms << " ms";
     559            0 :   TLOG() << "Rate: " << rate_hz << " TP vectors/s";
     560            0 :   TLOG() << "Failed to push TP vectors: " << local_tpv_failed;
     561            0 :   TLOG();
     562              : 
     563            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_work() method";
     564            0 : }
     565              : 
     566              : } // namespace dunedaq::trigger
     567              : 
     568            0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::trigger::TPReplayModule)
        

Generated by: LCOV version 2.0-1