LCOV - code coverage report
Current view: top level - dfmodules/plugins - DataWriterModule.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 227 0
Test Date: 2026-02-16 10:18:04 Functions: 0.0 % 30 0

            Line data    Source code
       1              : /**
       2              :  * @file DataWriterModule.cpp DataWriterModule class implementation
       3              :  *
       4              :  * This is part of the DUNE DAQ Software Suite, copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
       9              : #include "DataWriterModule.hpp"
      10              : #include "dfmodules/CommonIssues.hpp"
      11              : #include "dfmodules/opmon/DataWriter.pb.h"
      12              : 
      13              : #include "appmodel/DataStoreConf.hpp"
      14              : #include "appmodel/DataWriterModule.hpp"
      15              : #include "appmodel/TRBModule.hpp"
      16              : #include "confmodel/Application.hpp"
      17              : #include "confmodel/Connection.hpp"
      18              : #include "confmodel/Session.hpp"
      19              : #include "daqdataformats/Fragment.hpp"
      20              : #include "dfmessages/TriggerDecision.hpp"
      21              : #include "dfmessages/TriggerRecord_serialization.hpp"
      22              : #include "iomanager/IOManager.hpp"
      23              : #include "logging/Logging.hpp"
      24              : #include "rcif/cmd/Nljs.hpp"
      25              : 
      26              : #include <algorithm>
      27              : #include <cstdlib>
      28              : #include <memory>
      29              : #include <string>
      30              : #include <thread>
      31              : #include <utility>
      32              : #include <vector>
      33              : 
      34              : /**
      35              :  * @brief Name used by TRACE TLOG calls from this source file
      36              :  */
      37              : // #define TRACE_NAME "DataWriterModule"                   // NOLINT This is the default
      38              : enum
      39              : {
      40              :   TLVL_ENTER_EXIT_METHODS = 5,
      41              :   TLVL_CONFIG = 7,
      42              :   TLVL_WORK_STEPS = 10,
      43              :   TLVL_SEQNO_MAP_CONTENTS = 13,
      44              :   TLVL_FRAGMENT_HEADER_DUMP = 17
      45              : };
      46              : 
      47              : namespace dunedaq {
      48              : namespace dfmodules {
      49              : 
      50            0 : DataWriterModule::DataWriterModule(const std::string& name)
      51              :   : dunedaq::appfwk::DAQModule(name)
      52            0 :   , m_queue_timeout(100)
      53            0 :   , m_data_storage_is_enabled(true)
      54            0 :   , m_thread(std::bind(&DataWriterModule::do_work, this, std::placeholders::_1))
      55              : {
      56            0 :   register_command("conf", &DataWriterModule::do_conf);
      57            0 :   register_command("start", &DataWriterModule::do_start);
      58            0 :   register_command("stop", &DataWriterModule::do_stop);
      59            0 :   register_command("scrap", &DataWriterModule::do_scrap);
      60            0 : }
      61              : 
      62              : void
      63            0 : DataWriterModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
      64              : {
      65            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
      66            0 :   auto mdal = mcfg->get_dal<appmodel::DataWriterModule>(get_name());
      67            0 :   if (!mdal) {
      68            0 :     throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve configuration object");
      69              :   }
      70            0 :   auto iom = iomanager::IOManager::get();
      71              : 
      72            0 :   auto inputs = mdal->get_inputs();
      73            0 :   auto outputs = mdal->get_outputs();
      74              : 
      75            0 :   if (inputs.size() != 1) {
      76            0 :     throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Expected 1 input, got " + std::to_string(inputs.size()));
      77              :   }
      78            0 :   if (outputs.size() != 1) {
      79            0 :     throw appfwk::CommandFailed(
      80            0 :       ERS_HERE, "init", get_name(), "Expected 1 output, got " + std::to_string(outputs.size()));
      81              :   }
      82              : 
      83            0 :   m_module_configuration = mcfg;
      84            0 :   m_data_writer_conf = mdal->get_configuration();
      85            0 :   m_writer_identifier = mdal->get_writer_identifier();
      86              : 
      87            0 :   if (inputs[0]->get_data_type() != datatype_to_string<std::unique_ptr<daqdataformats::TriggerRecord>>()) {
      88            0 :     throw InvalidQueueFatalError(ERS_HERE, get_name(), "TriggerRecord Input queue");
      89              :   }
      90            0 :   if (outputs[0]->get_data_type() != datatype_to_string<dfmessages::TriggerDecisionToken>()) {
      91            0 :     throw InvalidQueueFatalError(ERS_HERE, get_name(), "TriggerDecisionToken Output queue");
      92              :   }
      93              : 
      94            0 :   m_trigger_record_connection = inputs[0]->UID();
      95              : 
      96            0 :   auto modules = mcfg->get_modules();
      97            0 :   std::string trb_uid = "";
      98            0 :   bool is_trmon = false;
      99            0 :   for (auto& mod : modules) {
     100            0 :     if (mod->class_name() == "TRBModule") {
     101            0 :       trb_uid = mod->UID();
     102              :       break;
     103              :     }
     104            0 :     if (mod->class_name() == "TRMonRequestorModule") {
     105              :       is_trmon = true;
     106              :       break;
     107              :     }
     108              :   }
     109              : 
     110            0 :   if (!is_trmon) {
     111            0 :     auto trbdal = mcfg->get_dal<appmodel::TRBModule>(trb_uid);
     112            0 :     if (!trbdal) {
     113            0 :       throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve TRB configuration object");
     114              :     }
     115            0 :     for (auto con : trbdal->get_inputs()) {
     116            0 :       if (con->get_data_type() == datatype_to_string<dfmessages::TriggerDecision>()) {
     117            0 :         m_trigger_decision_connection = con->UID();
     118              :       }
     119              :     }
     120              :   }
     121              : 
     122              :   // try to create the receiver to see test the connection anyway
     123            0 :   m_tr_receiver = iom->get_receiver<std::unique_ptr<daqdataformats::TriggerRecord>>(m_trigger_record_connection);
     124              : 
     125            0 :   m_token_output = iom->get_sender<dfmessages::TriggerDecisionToken>(outputs[0]->UID());
     126              : 
     127            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
     128            0 : }
     129              : 
     130              : void
     131            0 : DataWriterModule::generate_opmon_data()
     132              : {
     133              : 
     134            0 :   opmon::DataWriterInfo dwi;
     135              : 
     136            0 :   dwi.set_records_received(m_records_received_tot.load());
     137              :   //   dwi.new_records_received = m_records_received.exchange(0);
     138            0 :   dwi.set_records_written(m_records_written_tot.load());
     139            0 :   dwi.set_new_records_written(m_records_written.exchange(0));
     140              :   //   dwi.bytes_output = m_bytes_output_tot.load();  MR: byte writing to be delegated to DataStorage
     141              :   //   dwi.new_bytes_output = m_bytes_output.exchange(0);
     142            0 :   dwi.set_writing_time_us(m_writing_us.exchange(0));
     143              : 
     144            0 :   publish(std::move(dwi));
     145            0 : }
     146              : 
     147              : void
     148            0 : DataWriterModule::do_conf(const CommandData_t&)
     149              : {
     150            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_conf() method";
     151              : 
     152            0 :   m_data_storage_prescale = m_data_writer_conf->get_data_storage_prescale();
     153            0 :   TLOG_DEBUG(TLVL_CONFIG) << get_name() << ": data_storage_prescale is " << m_data_storage_prescale;
     154            0 :   TLOG_DEBUG(TLVL_CONFIG) << get_name() << ": data_store_parameters are "
     155            0 :                           << m_data_writer_conf->get_data_store_params();
     156            0 :   m_min_write_retry_time_usec = m_data_writer_conf->get_min_write_retry_time_ms() * 1000;
     157            0 :   if (m_min_write_retry_time_usec < 1) {
     158            0 :     m_min_write_retry_time_usec = 1;
     159              :   }
     160            0 :   m_max_write_retry_time_usec = m_data_writer_conf->get_max_write_retry_time_ms() * 1000;
     161            0 :   m_write_retry_time_increase_factor = m_data_writer_conf->get_write_retry_time_increase_factor();
     162              : 
     163              :   // create the DataStore instance here
     164            0 :   try {
     165            0 :     m_data_writer = make_data_store(m_data_writer_conf->get_data_store_params()->get_type(),
     166            0 :                                     m_data_writer_conf->get_data_store_params()->UID(),
     167            0 :                                     m_module_configuration,
     168            0 :                                     m_writer_identifier);
     169            0 :     register_node("data_writer", m_data_writer);
     170            0 :   } catch (const ers::Issue& excpt) {
     171            0 :     throw UnableToConfigure(ERS_HERE, get_name(), excpt);
     172            0 :   }
     173              : 
     174              :   // ensure that we have a valid dataWriter instance
     175            0 :   if (m_data_writer.get() == nullptr) {
     176            0 :     throw InvalidDataWriterModule(ERS_HERE, get_name());
     177              :   }
     178              : 
     179            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_conf() method";
     180            0 : }
     181              : 
     182              : void
     183            0 : DataWriterModule::do_start(const CommandData_t& payload)
     184              : {
     185            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
     186              : 
     187            0 :   rcif::cmd::StartParams start_params = payload.get<rcif::cmd::StartParams>();
     188            0 :   m_data_storage_is_enabled = (!start_params.disable_data_storage);
     189            0 :   m_run_number = start_params.run;
     190              : 
     191            0 :   TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": Sending initial TriggerDecisionToken to DFO to announce my presence";
     192            0 :   dfmessages::TriggerDecisionToken token;
     193            0 :   token.run_number = 0;
     194            0 :   token.trigger_number = 0;
     195            0 :   token.decision_destination = m_trigger_decision_connection;
     196              : 
     197              :   int wasSentSuccessfully = 5;
     198            0 :   do {
     199            0 :     try {
     200            0 :       m_token_output->send(std::move(token), m_queue_timeout);
     201              :       wasSentSuccessfully = 0;
     202            0 :     } catch (const ers::Issue& excpt) {
     203            0 :       std::ostringstream oss_warn;
     204            0 :       oss_warn << "Send with sender \"" << m_token_output->get_name() << "\" failed";
     205            0 :       ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt));
     206            0 :       wasSentSuccessfully--;
     207            0 :       std::this_thread::sleep_for(std::chrono::microseconds(5000));
     208            0 :     }
     209            0 :   } while (wasSentSuccessfully);
     210              : 
     211              :   // 04-Feb-2021, KAB: added this call to allow DataStore to prepare for the run.
     212              :   // I've put this call fairly early in this method because it could throw an
     213              :   // exception and abort the run start.  And, it seems sensible to avoid starting
     214              :   // threads, etc. if we throw an exception.
     215            0 :   if (m_data_storage_is_enabled) {
     216              : 
     217              :     // ensure that we have a valid dataWriter instance
     218            0 :     if (m_data_writer.get() == nullptr) {
     219              :       // this check is done essentially to notify the user
     220              :       // in case the "start" has been called before the "conf"
     221            0 :       ers::fatal(InvalidDataWriterModule(ERS_HERE, get_name()));
     222              :     }
     223              : 
     224            0 :     try {
     225            0 :       m_data_writer->prepare_for_run(m_run_number, (start_params.production_vs_test == "TEST"));
     226            0 :     } catch (const ers::Issue& excpt) {
     227            0 :       throw UnableToStart(ERS_HERE, get_name(), m_run_number, excpt);
     228            0 :     }
     229              :   }
     230              : 
     231            0 :   m_seqno_counts.clear();
     232              : 
     233            0 :   m_records_received = 0;
     234            0 :   m_records_received_tot = 0;
     235            0 :   m_records_written = 0;
     236            0 :   m_records_written_tot = 0;
     237            0 :   m_bytes_output = 0;
     238            0 :   m_bytes_output_tot = 0;
     239              : 
     240            0 :   m_running.store(true);
     241              : 
     242            0 :   m_thread.start_working_thread(get_name());
     243              :   // iomanager::IOManager::get()->add_callback<std::unique_ptr<daqdataformats::TriggerRecord>>(
     244              :   // m_trigger_record_connection, bind( &DataWriterModule::receive_trigger_record, this, std::placeholders::_1) );
     245              : 
     246            0 :   TLOG() << get_name() << " successfully started for run number " << m_run_number;
     247            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
     248            0 : }
     249              : 
     250              : void
     251            0 : DataWriterModule::do_stop(const CommandData_t& /*args*/)
     252              : {
     253            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
     254              : 
     255            0 :   m_running.store(false);
     256            0 :   m_thread.stop_working_thread();
     257              :   // iomanager::IOManager::get()->remove_callback<std::unique_ptr<daqdataformats::TriggerRecord>>(
     258              :   // m_trigger_record_connection );
     259              : 
     260              :   // 04-Feb-2021, KAB: added this call to allow DataStore to finish up with this run.
     261              :   // I've put this call fairly late in this method so that any draining of queues
     262              :   // (or whatever) can take place before we finalize things in the DataStore.
     263            0 :   if (m_data_storage_is_enabled) {
     264            0 :     try {
     265            0 :       m_data_writer->finish_with_run(m_run_number);
     266            0 :     } catch (const std::exception& excpt) {
     267            0 :       ers::error(ProblemDuringStop(ERS_HERE, get_name(), m_run_number, excpt));
     268            0 :     }
     269              :   }
     270              : 
     271            0 :   TLOG() << get_name() << " successfully stopped for run number " << m_run_number;
     272            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
     273            0 : }
     274              : 
     275              : void
     276            0 : DataWriterModule::do_scrap(const CommandData_t& /*payload*/)
     277              : {
     278            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_scrap() method";
     279              : 
     280              :   // clear/reset the DataStore instance here
     281            0 :   m_data_writer.reset();
     282              : 
     283            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_scrap() method";
     284            0 : }
     285              : 
     286              : void
     287            0 : DataWriterModule::receive_trigger_record(std::unique_ptr<daqdataformats::TriggerRecord>& trigger_record_ptr)
     288              : {
     289            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": receiving a new TR ptr";
     290              : 
     291            0 :   ++m_records_received;
     292            0 :   ++m_records_received_tot;
     293            0 :   TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": Obtained the TriggerRecord for trigger number "
     294            0 :                               << trigger_record_ptr->get_header_ref().get_trigger_number() << "."
     295            0 :                               << trigger_record_ptr->get_header_ref().get_sequence_number() << ", run number "
     296            0 :                               << trigger_record_ptr->get_header_ref().get_run_number() << " off the input connection";
     297              : 
     298            0 :   if (trigger_record_ptr->get_header_ref().get_run_number() != m_run_number) {
     299            0 :     ers::error(InvalidRunNumber(ERS_HERE,
     300            0 :                                 get_name(),
     301              :                                 "TriggerRecord",
     302              :                                 trigger_record_ptr->get_header_ref().get_run_number(),
     303              :                                 m_run_number,
     304              :                                 trigger_record_ptr->get_header_ref().get_trigger_number(),
     305            0 :                                 trigger_record_ptr->get_header_ref().get_sequence_number()));
     306            0 :     return;
     307              :   }
     308              : 
     309              :   // 03-Feb-2021, KAB: adding support for a data-storage prescale.
     310              :   // In this "if" statement, I deliberately compare the result of (N mod prescale) to 1
     311              :   // instead of zero, since I think that it would be nice to always get the first event
     312              :   // written out.
     313            0 :   if (m_data_storage_prescale <= 1 || ((m_records_received_tot.load() % m_data_storage_prescale) == 1)) {
     314              : 
     315            0 :     if (m_data_storage_is_enabled) {
     316              : 
     317            0 :       std::chrono::steady_clock::time_point start_time = std::chrono::steady_clock::now();
     318              : 
     319            0 :       bool should_retry = true;
     320            0 :       size_t retry_wait_usec = m_min_write_retry_time_usec;
     321            0 :       do {
     322            0 :         should_retry = false;
     323            0 :         try {
     324            0 :           m_data_writer->write(*trigger_record_ptr);
     325            0 :           ++m_records_written;
     326            0 :           ++m_records_written_tot;
     327            0 :           m_bytes_output += trigger_record_ptr->get_total_size_bytes();
     328            0 :           m_bytes_output_tot += trigger_record_ptr->get_total_size_bytes();
     329            0 :         } catch (const RetryableDataStoreProblem& excpt) {
     330            0 :           should_retry = true;
     331            0 :           ers::error(DataWritingProblem(ERS_HERE,
     332            0 :                                         get_name(),
     333              :                                         trigger_record_ptr->get_header_ref().get_trigger_number(),
     334              :                                         trigger_record_ptr->get_header_ref().get_sequence_number(),
     335            0 :                                         trigger_record_ptr->get_header_ref().get_run_number(),
     336            0 :                                         excpt));
     337            0 :           if (retry_wait_usec > m_max_write_retry_time_usec) {
     338              :             retry_wait_usec = m_max_write_retry_time_usec;
     339              :           }
     340            0 :           usleep(retry_wait_usec);
     341            0 :           retry_wait_usec *= m_write_retry_time_increase_factor;
     342            0 :         } catch (const std::exception& excpt) {
     343            0 :           ers::error(DataWritingProblem(ERS_HERE,
     344            0 :                                         get_name(),
     345              :                                         trigger_record_ptr->get_header_ref().get_trigger_number(),
     346              :                                         trigger_record_ptr->get_header_ref().get_sequence_number(),
     347            0 :                                         trigger_record_ptr->get_header_ref().get_run_number(),
     348            0 :                                         excpt));
     349            0 :         }
     350            0 :       } while (should_retry && m_running.load());
     351              : 
     352            0 :       std::chrono::steady_clock::time_point end_time = std::chrono::steady_clock::now();
     353            0 :       auto writing_time = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
     354            0 :       m_writing_us += writing_time.count();
     355              :     } //  if m_data_storage_is_enabled
     356              :   }
     357              : 
     358            0 :   bool send_trigger_complete_message = m_running.load();
     359            0 :   if (trigger_record_ptr->get_header_ref().get_max_sequence_number() > 0) {
     360            0 :     daqdataformats::trigger_number_t trigno = trigger_record_ptr->get_header_ref().get_trigger_number();
     361            0 :     if (m_seqno_counts.count(trigno) > 0) {
     362            0 :       ++m_seqno_counts[trigno];
     363              :     } else {
     364            0 :       m_seqno_counts[trigno] = 1;
     365              :     }
     366              :     // in the following comparison GT (>) is used since the counts are one-based and the
     367              :     // max sequence number is zero-based.
     368            0 :     if (m_seqno_counts[trigno] > trigger_record_ptr->get_header_ref().get_max_sequence_number()) {
     369            0 :       m_seqno_counts.erase(trigno);
     370              :     } else {
     371              :       // Using const .count and .at to avoid reintroducing element to map
     372            0 :       TLOG_DEBUG(TLVL_SEQNO_MAP_CONTENTS)
     373            0 :         << get_name() << ": the sequence number count for trigger number " << trigno << " is "
     374            0 :         << (m_seqno_counts.count(trigno) ? m_seqno_counts.at(trigno) : 0) << " (number of entries "
     375            0 :         << "in the seqno map is " << m_seqno_counts.size() << ").";
     376            0 :       send_trigger_complete_message = false;
     377              :     }
     378              :   }
     379            0 :   if (send_trigger_complete_message) {
     380            0 :     TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": Pushing the TriggerDecisionToken for trigger number "
     381            0 :                                 << trigger_record_ptr->get_header_ref().get_trigger_number()
     382            0 :                                 << " onto the relevant output queue";
     383            0 :     dfmessages::TriggerDecisionToken token;
     384            0 :     token.run_number = m_run_number;
     385            0 :     token.trigger_number = trigger_record_ptr->get_header_ref().get_trigger_number();
     386            0 :     token.decision_destination = m_trigger_decision_connection;
     387              : 
     388              :     bool wasSentSuccessfully = false;
     389            0 :     do {
     390            0 :       try {
     391            0 :         m_token_output->send(std::move(token), m_queue_timeout);
     392              :         wasSentSuccessfully = true;
     393            0 :       } catch (const ers::Issue& excpt) {
     394            0 :         std::ostringstream oss_warn;
     395            0 :         oss_warn << "Send with sender \"" << m_token_output->get_name() << "\" failed";
     396            0 :         ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt));
     397            0 :       }
     398            0 :     } while (!wasSentSuccessfully && m_running.load());
     399            0 :   }
     400              : 
     401            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": operations completed for TR";
     402              : } // NOLINT(readability/fn_size)
     403              : 
     404              : void
     405            0 : DataWriterModule::do_work(std::atomic<bool>& running_flag)
     406              : {
     407              :   // 27-Jan-2026, KAB: we want this code to drain all pending TriggerRecords from
     408              :   // the input queue at end-run time. So, we check if there is data in the queue
     409              :   // and continue the while loop if there is any.
     410            0 :   while (running_flag.load() || m_tr_receiver->data_pending()) {
     411            0 :     try {
     412            0 :       std::unique_ptr<daqdataformats::TriggerRecord> tr = m_tr_receiver->receive(std::chrono::milliseconds(10));
     413            0 :       receive_trigger_record(tr);
     414            0 :     } catch (const iomanager::TimeoutExpired& excpt) {
     415            0 :     } catch (const ers::Issue& excpt) {
     416            0 :       ers::warning(excpt);
     417            0 :     }
     418              :   }
     419            0 : }
     420              : 
     421              : } // namespace dfmodules
     422              : } // namespace dunedaq
     423              : 
// Make DataWriterModule available to the DUNE DAQ module framework.
// NOTE(review): the exact registration/export mechanism is defined by the
// DEFINE_DUNE_DAQ_MODULE macro, which is not visible here — confirm there.
DEFINE_DUNE_DAQ_MODULE(dunedaq::dfmodules::DataWriterModule)
        

Generated by: LCOV version 2.0-1