LCOV - code coverage report
Current view: top level - dfmodules/plugins - TPStreamWriterModule.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 193 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 26 0

            Line data    Source code
       1              : /**
       2              :  * @file TPStreamWriterModule.cpp TPStreamWriterModule class implementation
       3              :  *
       4              :  * This is part of the DUNE DAQ Software Suite, copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
       9              : #include "TPStreamWriterModule.hpp"
      10              : #include "dfmodules/CommonIssues.hpp"
      11              : #include "dfmodules/TPBundleHandler.hpp"
      12              : #include "dfmodules/opmon/TPStreamWriter.pb.h"
      13              : 
      14              : #include "appmodel/DataStoreConf.hpp"
      15              : #include "appmodel/TPStreamWriterModule.hpp"
      16              : #include "confmodel/Connection.hpp"
      17              : #include "confmodel/Session.hpp"
      18              : #include "iomanager/IOManager.hpp"
      19              : #include "daqdataformats/Fragment.hpp"
      20              : #include "daqdataformats/Types.hpp"
      21              : #include "logging/Logging.hpp"
      22              : #include "rcif/cmd/Nljs.hpp"
      23              : 
      24              : #include "boost/date_time/posix_time/posix_time.hpp"
      25              : 
      26              : #include <chrono>
      27              : #include <memory>
      28              : #include <sstream>
      29              : #include <string>
      30              : #include <utility>
      31              : #include <vector>
      32              : 
      33              : enum
      34              : {
      35              :   TLVL_ENTER_EXIT_METHODS = 5,
      36              :   TLVL_CONFIG = 7,
      37              : };
      38              : 
      39              : namespace dunedaq {
      40              : namespace dfmodules {
      41              : 
      42            0 : TPStreamWriterModule::TPStreamWriterModule(const std::string& name)
      43              :   : dunedaq::appfwk::DAQModule(name)
      44            0 :   , m_thread(std::bind(&TPStreamWriterModule::do_work, this, std::placeholders::_1))
      45            0 :   , m_queue_timeout(100)
      46              : {
      47            0 :   register_command("conf", &TPStreamWriterModule::do_conf);
      48            0 :   register_command("start", &TPStreamWriterModule::do_start);
      49            0 :   register_command("stop", &TPStreamWriterModule::do_stop);
      50            0 :   register_command("scrap", &TPStreamWriterModule::do_scrap);
      51            0 : }
      52              : 
      53              : void
      54            0 : TPStreamWriterModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
      55              : {
      56            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
      57            0 :   auto mdal = mcfg->get_dal<appmodel::TPStreamWriterModule>(get_name());
      58            0 :   if (!mdal) {
      59            0 :     throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve configuration object");
      60              :   }
      61            0 :   assert(mdal->get_inputs().size() == 1);
      62            0 :   m_module_configuration = mcfg;
      63            0 :   m_tpset_source = iomanager::IOManager::get()->get_receiver<trigger::TPSet>(mdal->get_inputs()[0]->UID());
      64            0 :   m_writer_identifier = mdal->get_writer_identifier();
      65            0 :   m_tp_writer_conf = mdal->get_configuration();
      66            0 :   m_source_id = mdal->get_source_id();
      67            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
      68            0 : }
      69              : 
      70              : void
      71            0 : TPStreamWriterModule::generate_opmon_data() {
      72            0 :   opmon::TPStreamWriterInfo info;
      73              : 
      74            0 :   info.set_heartbeat_tpsets_received(m_heartbeat_tpsets.exchange(0));
      75            0 :   info.set_tpsets_with_tps_received(m_tpsets_with_tps.exchange(0));
      76            0 :   info.set_tps_received(m_tps_received.exchange(0));
      77            0 :   info.set_tps_written(m_tps_written.exchange(0));
      78            0 :   info.set_tps_discarded(m_tps_discarded.exchange(0));
      79            0 :   info.set_total_tps_received(m_total_tps_received.load());
      80            0 :   info.set_total_tps_written(m_total_tps_written.load());
      81            0 :   info.set_total_tps_discarded(m_total_tps_discarded.load());
      82            0 :   info.set_tardy_timeslice_max_seconds(m_tardy_timeslice_max_seconds.exchange(0.0));
      83            0 :   info.set_timeslices_written(m_timeslices_written.exchange(0));
      84            0 :   info.set_bytes_output(m_bytes_output.exchange(0));
      85              : 
      86            0 :   publish(std::move(info));
      87            0 : }
      88              : 
      89              : void
      90            0 : TPStreamWriterModule::do_conf(const CommandData_t&)
      91              : {
      92            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_conf() method";
      93            0 :   m_accumulation_interval_ticks = m_tp_writer_conf->get_tp_accumulation_interval();
      94            0 :   m_accumulation_inactivity_time_before_write =
      95            0 :     std::chrono::milliseconds(static_cast<int>(1000*m_tp_writer_conf->get_tp_accumulation_inactivity_time_before_write_sec()));
      96            0 :   m_warn_user_when_tardy_tps_are_discarded = m_tp_writer_conf->get_warn_user_when_tardy_tps_are_discarded();
      97            0 :   m_accumulation_interval_seconds = ((double) m_accumulation_interval_ticks) / 62500000.0;
      98              : 
      99              :   // create the DataStore instance here
     100            0 :   try {
     101            0 :     m_data_writer = make_data_store(m_tp_writer_conf->get_data_store_params()->get_type(),
     102            0 :                                     m_tp_writer_conf->get_data_store_params()->UID(),
     103            0 :                                     m_module_configuration, m_writer_identifier);
     104            0 :     register_node("data_writer", m_data_writer);
     105            0 :   } catch (const ers::Issue& excpt) {
     106            0 :     throw UnableToConfigure(ERS_HERE, get_name(), excpt);
     107            0 :   }
     108              : 
     109              :   // ensure that we have a valid dataWriter instance
     110            0 :   if (m_data_writer.get() == nullptr) {
     111            0 :     throw InvalidDataWriterModule(ERS_HERE, get_name());
     112              :   }
     113              : 
     114            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_conf() method";
     115            0 : }
     116              : 
     117              : void
     118            0 : TPStreamWriterModule::do_start(const CommandData_t& payload)
     119              : {
     120            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
     121            0 :   rcif::cmd::StartParams start_params = payload.get<rcif::cmd::StartParams>();
     122            0 :   m_run_number = start_params.run;
     123            0 :   m_total_tps_received.store(0);
     124            0 :   m_total_tps_written.store(0);
     125            0 :   m_total_tps_discarded.store(0);
     126              : 
     127              :   // 06-Mar-2022, KAB: added this call to allow DataStore to prepare for the run.
     128              :   // I've put this call fairly early in this method because it could throw an
     129              :   // exception and abort the run start.  And, it seems sensible to avoid starting
     130              :   // threads, etc. if we throw an exception.
     131            0 :   try {
     132            0 :     m_data_writer->prepare_for_run(m_run_number, (start_params.production_vs_test == "TEST"));
     133            0 :   } catch (const ers::Issue& excpt) {
     134            0 :     throw UnableToStart(ERS_HERE, get_name(), m_run_number, excpt);
     135            0 :   }
     136              : 
     137            0 :   m_thread.start_working_thread(get_name());
     138              : 
     139            0 :   TLOG() << get_name() << " successfully started for run number " << m_run_number;
     140            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
     141            0 : }
     142              : 
     143              : void
     144            0 : TPStreamWriterModule::do_stop(const CommandData_t& /*payload*/)
     145              : {
     146            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
     147            0 :   m_thread.stop_working_thread();
     148              : 
     149              :   // 06-Mar-2022, KAB: added this call to allow DataStore to finish up with this run.
     150              :   // I've put this call fairly late in this method so that any draining of queues
     151              :   // (or whatever) can take place before we finalize things in the DataStore.
     152            0 :   try {
     153            0 :     m_data_writer->finish_with_run(m_run_number);
     154            0 :   } catch (const std::exception& excpt) {
     155            0 :     ers::error(ProblemDuringStop(ERS_HERE, get_name(), m_run_number, excpt));
     156            0 :   }
     157              : 
     158            0 :   TLOG() << get_name() << " successfully stopped for run number " << m_run_number;
     159            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
     160            0 : }
     161              : 
     162              : void
     163            0 : TPStreamWriterModule::do_scrap(const CommandData_t& /*payload*/)
     164              : {
     165            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_scrap() method";
     166              : 
     167              :   // clear/reset the DataStore instance here
     168            0 :   m_data_writer.reset();
     169              : 
     170            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_scrap() method";
     171            0 : }
     172              : 
     173              : void
     174            0 : TPStreamWriterModule::do_work(std::atomic<bool>& running_flag)
     175              : {
     176            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_work() method";
     177              : 
     178            0 :   using namespace std::chrono;
     179            0 :   size_t n_tpset_received = 0;
     180            0 :   auto start_time = steady_clock::now();
     181            0 :   daqdataformats::timestamp_t first_timestamp = 0;
     182            0 :   daqdataformats::timestamp_t last_timestamp = 0;
     183              : 
     184            0 :   TPBundleHandler tp_bundle_handler(m_accumulation_interval_ticks, m_run_number, m_accumulation_inactivity_time_before_write);
     185              : 
     186            0 :   bool possible_pending_data = true;
     187            0 :   size_t largest_timeslice_number = 0;
     188            0 :   while (running_flag.load() || possible_pending_data) {
     189            0 :     trigger::TPSet tpset;
     190            0 :     try {
     191            0 :       tpset = m_tpset_source->receive(m_queue_timeout);
     192            0 :       ++n_tpset_received;
     193              : 
     194            0 :       if (tpset.type == trigger::TPSet::Type::kHeartbeat) {
     195            0 :         ++m_heartbeat_tpsets;
     196            0 :         continue;
     197              :       }
     198              : 
     199            0 :       TLOG_DEBUG(21) << "Number of TPs in TPSet is " << tpset.objects.size() << ", Source ID is " << tpset.origin
     200            0 :                      << ", seqno is " << tpset.seqno << ", start timestamp is " << tpset.start_time << ", run number is "
     201            0 :                      << tpset.run_number << ", slice id is " << (tpset.start_time / m_accumulation_interval_ticks);
     202              : 
     203              :       // 30-Mar-2022, KAB: added test for matching run number.  This is to avoid getting
     204              :       // confused by TPSets that happen to be leftover in transit from one run to the
     205              :       // next (which we have observed in v2.10.x systems).
     206            0 :       if (tpset.run_number != m_run_number) {
     207            0 :         TLOG_DEBUG(22) << "Discarding TPSet with invalid run number " << tpset.run_number << " (current is "
     208            0 :                        << m_run_number << "),  Source ID is " << tpset.origin << ", seqno is " << tpset.seqno;
     209            0 :         continue;
     210            0 :       }
     211            0 :       ++m_tpsets_with_tps;
     212              : 
     213            0 :       size_t num_tps_in_tpset = tpset.objects.size();
     214            0 :       tp_bundle_handler.add_tpset(std::move(tpset));
     215            0 :       m_tps_received += num_tps_in_tpset;
     216            0 :       m_total_tps_received += num_tps_in_tpset;
     217            0 :       possible_pending_data = true;
     218            0 :     } catch (iomanager::ConnectionInstanceNotFound&) {
     219              :       // sleep for a little bit; and indicate no pending data, in case we never get a connection
     220              :       // and the run ends - we don't want to believe that there is pending data in that case.
     221            0 :       usleep(1000 * m_queue_timeout.count());
     222            0 :       possible_pending_data = false;
     223            0 :     } catch (iomanager::TimeoutExpired&) {
     224              :       // nothing special to do here, we'll simply let the rest of the code in this
     225              :       // while loop do its job
     226            0 :     }
     227              : 
     228            0 :     std::vector<std::unique_ptr<daqdataformats::TimeSlice>> list_of_timeslices;
     229            0 :     if (running_flag.load()) {
     230            0 :       list_of_timeslices = tp_bundle_handler.get_properly_aged_timeslices();
     231              :     } else {
     232            0 :       list_of_timeslices = tp_bundle_handler.get_all_remaining_timeslices();
     233            0 :       possible_pending_data = false;
     234              :     }
     235              : 
     236              :     // keep track of the largest timeslice number (for reporting on tardy ones)
     237            0 :     for (auto& timeslice_ptr : list_of_timeslices) {
     238            0 :       largest_timeslice_number = std::max(timeslice_ptr->get_header().timeslice_number, largest_timeslice_number);
     239              :     }
     240              : 
     241              :     // attempt to write out each TimeSlice
     242            0 :     for (auto& timeslice_ptr : list_of_timeslices) {
     243            0 :       daqdataformats::SourceID sid(daqdataformats::SourceID::Subsystem::kTRBuilder, m_source_id);
     244            0 :       timeslice_ptr->set_element_id(sid);
     245              : 
     246              :       // write the TSH and the fragments as a set of data blocks
     247            0 :       bool should_retry = true;
     248            0 :       size_t retry_wait_usec = 1000;
     249            0 :       do {
     250            0 :         should_retry = false;
     251            0 :         size_t number_of_tps = (timeslice_ptr->get_sum_of_fragment_payload_sizes() / sizeof(trgdataformats::TriggerPrimitive));
     252            0 :         try {
     253            0 :           m_data_writer->write(*timeslice_ptr);
     254            0 :           ++m_timeslices_written;
     255            0 :           m_bytes_output += timeslice_ptr->get_total_size_bytes();
     256            0 :           m_tps_written += number_of_tps;
     257            0 :           m_total_tps_written += number_of_tps;
     258            0 :         } catch (const RetryableDataStoreProblem& excpt) {
     259            0 :           should_retry = true;
     260            0 :           ers::error(DataWritingProblem(ERS_HERE,
     261            0 :                                         get_name(),
     262            0 :                                         timeslice_ptr->get_header().timeslice_number,
     263            0 :                                         timeslice_ptr->get_header().run_number,
     264              :                                         excpt));
     265            0 :           usleep(retry_wait_usec);
     266            0 :           retry_wait_usec = std::min(retry_wait_usec * 2, 1000000UL);
     267            0 :         } catch (const IgnorableDataStoreProblem& excpt) {
     268            0 :           int timeslice_number_diff = largest_timeslice_number - timeslice_ptr->get_header().timeslice_number;
     269            0 :           double seconds_too_late = m_accumulation_interval_seconds * timeslice_number_diff;
     270            0 :           m_tardy_timeslice_max_seconds = std::max(m_tardy_timeslice_max_seconds.load(), seconds_too_late);
     271            0 :           m_tps_discarded += number_of_tps;
     272            0 :           m_total_tps_discarded += number_of_tps;
     273            0 :           if (m_warn_user_when_tardy_tps_are_discarded) {
     274            0 :             std::ostringstream sid_list;
     275            0 :             bool first_frag = true;
     276            0 :             for (auto const& frag_ptr : timeslice_ptr->get_fragments_ref()) {
     277            0 :               if (first_frag) {first_frag = false;}
     278            0 :               else {sid_list << ",";}
     279            0 :               sid_list << frag_ptr->get_element_id().to_string();
     280              :             }
     281            0 :             ers::warning(TardyTPsDiscarded(ERS_HERE,
     282            0 :                                            get_name(),
     283            0 :                                            sid_list.str(),
     284            0 :                                            timeslice_ptr->get_header().timeslice_number,
     285              :                                            seconds_too_late));
     286            0 :           }
     287            0 :         } catch (const std::exception& excpt) {
     288            0 :           m_tps_discarded += number_of_tps;
     289            0 :           m_total_tps_discarded += number_of_tps;
     290            0 :           ers::error(DataWritingProblem(ERS_HERE,
     291            0 :                                         get_name(),
     292            0 :                                         timeslice_ptr->get_header().timeslice_number,
     293            0 :                                         timeslice_ptr->get_header().run_number,
     294              :                                         excpt));
     295            0 :         }
     296            0 :       } while (should_retry && running_flag.load());
     297              :     }
     298              : 
     299            0 :     if (first_timestamp == 0) {
     300            0 :       first_timestamp = tpset.start_time;
     301              :     }
     302            0 :     last_timestamp = tpset.start_time;
     303            0 :   } // while(running)
     304              : 
     305            0 :   auto end_time = steady_clock::now();
     306            0 :   auto time_ms = duration_cast<milliseconds>(end_time - start_time).count();
     307            0 :   float rate_hz = 1e3 * static_cast<float>(n_tpset_received) / time_ms;
     308            0 :   float inferred_clock_frequency = 1e3 * (last_timestamp - first_timestamp) / time_ms;
     309              : 
     310            0 :   TLOG() << "Received " << n_tpset_received << " TPSets in " << time_ms << "ms. " << rate_hz
     311            0 :          << " TPSet/s. Inferred clock frequency " << inferred_clock_frequency << "Hz";
     312            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_work() method";
     313            0 : } // NOLINT Function length
     314              : 
     315              : } // namespace dfmodules
     316              : } // namespace dunedaq
     317              : 
     318            0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::dfmodules::TPStreamWriterModule) // NOTE(review): macro is defined outside this file; presumably it exposes the class to the framework's module/plugin loader - confirm against appfwk
        

Generated by: LCOV version 2.0-1