LCOV - code coverage report
Current view: top level - dfmodules/plugins - DataWriterModule.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 224 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 30 0

            Line data    Source code
       1              : /**
       2              :  * @file DataWriterModule.cpp DataWriterModule class implementation
       3              :  *
       4              :  * This is part of the DUNE DAQ Software Suite, copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
       9              : #include "DataWriterModule.hpp"
      10              : #include "dfmodules/CommonIssues.hpp"
      11              : #include "dfmodules/opmon/DataWriter.pb.h"
      12              : 
      13              : #include "confmodel/Application.hpp"
      14              : #include "confmodel/Session.hpp"
      15              : #include "appmodel/DataWriterModule.hpp"
      16              : #include "appmodel/TRBModule.hpp"
      17              : #include "appmodel/DataStoreConf.hpp"
      18              : #include "confmodel/Connection.hpp"
      19              : #include "daqdataformats/Fragment.hpp"
      20              : #include "dfmessages/TriggerDecision.hpp"
      21              : #include "dfmessages/TriggerRecord_serialization.hpp"
      22              : #include "logging/Logging.hpp"
      23              : #include "iomanager/IOManager.hpp"
      24              : #include "rcif/cmd/Nljs.hpp"
      25              : 
      26              : #include <algorithm>
      27              : #include <cstdlib>
      28              : #include <memory>
      29              : #include <string>
      30              : #include <thread>
      31              : #include <utility>
      32              : #include <vector>
      33              : 
      34              : /**
      35              :  * @brief Name used by TRACE TLOG calls from this source file
      36              :  */
      37              : //#define TRACE_NAME "DataWriterModule"                   // NOLINT This is the default
      38              : enum
      39              : {
      40              :   TLVL_ENTER_EXIT_METHODS = 5,
      41              :   TLVL_CONFIG = 7,
      42              :   TLVL_WORK_STEPS = 10,
      43              :   TLVL_SEQNO_MAP_CONTENTS = 13,
      44              :   TLVL_FRAGMENT_HEADER_DUMP = 17
      45              : };
      46              : 
      47              : namespace dunedaq {
      48              : namespace dfmodules {
      49              : 
      50            0 : DataWriterModule::DataWriterModule(const std::string& name)
      51              :   : dunedaq::appfwk::DAQModule(name)
      52            0 :   , m_queue_timeout(100)
      53            0 :   , m_data_storage_is_enabled(true)
      54            0 :   , m_thread(std::bind(&DataWriterModule::do_work, this, std::placeholders::_1))
      55              : {
      56            0 :   register_command("conf", &DataWriterModule::do_conf);
      57            0 :   register_command("start", &DataWriterModule::do_start);
      58            0 :   register_command("stop", &DataWriterModule::do_stop);
      59            0 :   register_command("scrap", &DataWriterModule::do_scrap);
      60            0 : }
      61              : 
      62              : void
      63            0 : DataWriterModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
      64              : {
      65            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
      66            0 :   auto mdal = mcfg->get_dal<appmodel::DataWriterModule>(get_name());
      67            0 :   if (!mdal) {
      68            0 :     throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve configuration object");
      69              :   }
      70            0 :   auto iom = iomanager::IOManager::get();
      71              : 
      72            0 :   auto inputs = mdal->get_inputs();
      73            0 :   auto outputs = mdal->get_outputs();
      74              : 
      75            0 :   if (inputs.size() != 1) {
      76            0 :     throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Expected 1 input, got " + std::to_string(inputs.size()));
      77              :   }
      78            0 :   if (outputs.size() != 1) {
      79            0 :     throw appfwk::CommandFailed(
      80            0 :       ERS_HERE, "init", get_name(), "Expected 1 output, got " + std::to_string(outputs.size()));  
      81              :   }
      82              : 
      83            0 :   m_module_configuration = mcfg;
      84            0 :   m_data_writer_conf = mdal->get_configuration();
      85            0 :   m_writer_identifier = mdal->get_writer_identifier();
      86              : 
      87            0 :   if (inputs[0]->get_data_type() != datatype_to_string<std::unique_ptr<daqdataformats::TriggerRecord>>()) {
      88            0 :     throw InvalidQueueFatalError(ERS_HERE, get_name(), "TriggerRecord Input queue"); 
      89              :   }
      90            0 :   if (outputs[0]->get_data_type() != datatype_to_string<dfmessages::TriggerDecisionToken>()) {
      91            0 :     throw InvalidQueueFatalError(ERS_HERE, get_name(), "TriggerDecisionToken Output queue"); 
      92              :   }
      93              : 
      94            0 :   m_trigger_record_connection = inputs[0]->UID();
      95              : 
      96            0 :   auto modules = mcfg->get_modules();
      97            0 :   std::string trb_uid = "";
      98            0 :   bool is_trmon = false;
      99            0 :   for (auto& mod : modules) {
     100            0 :     if (mod->class_name() == "TRBModule") {
     101            0 :       trb_uid = mod->UID();
     102              :       break;
     103              :     }
     104            0 :     if (mod->class_name() == "TRMonRequestorModule") {
     105              :         is_trmon = true;
     106              :       break;
     107              :     }
     108              :   }
     109              : 
     110            0 :   if (!is_trmon) {
     111            0 :     auto trbdal = mcfg->get_dal<appmodel::TRBModule>(trb_uid);
     112            0 :     if (!trbdal) {
     113            0 :       throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve TRB configuration object");
     114              :     }
     115            0 :     for (auto con : trbdal->get_inputs()) {
     116            0 :       if (con->get_data_type() == datatype_to_string<dfmessages::TriggerDecision>()) {
     117            0 :         m_trigger_decision_connection = con->UID();
     118              :       }
     119              :     }
     120              :   }
     121              : 
     122              :   // try to create the receiver to see test the connection anyway
     123            0 :   m_tr_receiver = iom -> get_receiver<std::unique_ptr<daqdataformats::TriggerRecord>>(m_trigger_record_connection);
     124              : 
     125            0 :   m_token_output = iom->get_sender<dfmessages::TriggerDecisionToken>(outputs[0]->UID());
     126              :   
     127            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
     128            0 : }
     129              : 
     130              : void
     131            0 : DataWriterModule::generate_opmon_data() {
     132              : 
     133            0 :   opmon::DataWriterInfo dwi;
     134              : 
     135            0 :   dwi.set_records_received(m_records_received_tot.load());
     136              : //   dwi.new_records_received = m_records_received.exchange(0);
     137            0 :   dwi.set_records_written(m_records_written_tot.load());
     138            0 :   dwi.set_new_records_written(m_records_written.exchange(0));
     139              : //   dwi.bytes_output = m_bytes_output_tot.load();  MR: byte writing to be delegated to DataStorage
     140              : //   dwi.new_bytes_output = m_bytes_output.exchange(0);  
     141            0 :   dwi.set_writing_time_us(m_writing_us.exchange(0));
     142              : 
     143            0 :   publish(std::move(dwi));
     144            0 : }
     145              :   
     146              : void
     147            0 : DataWriterModule::do_conf(const CommandData_t&)
     148              : {
     149            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_conf() method";
     150              : 
     151            0 :   m_data_storage_prescale = m_data_writer_conf->get_data_storage_prescale();
     152            0 :   TLOG_DEBUG(TLVL_CONFIG) << get_name() << ": data_storage_prescale is " << m_data_storage_prescale;
     153            0 :   TLOG_DEBUG(TLVL_CONFIG) << get_name() << ": data_store_parameters are " << m_data_writer_conf->get_data_store_params();
     154            0 :   m_min_write_retry_time_usec = m_data_writer_conf->get_min_write_retry_time_ms() * 1000;
     155            0 :   if (m_min_write_retry_time_usec < 1) {
     156            0 :     m_min_write_retry_time_usec = 1;
     157              :   }
     158            0 :   m_max_write_retry_time_usec = m_data_writer_conf->get_max_write_retry_time_ms() * 1000;
     159            0 :   m_write_retry_time_increase_factor = m_data_writer_conf->get_write_retry_time_increase_factor();
     160              : 
     161              :   // create the DataStore instance here
     162            0 :   try {
     163            0 :     m_data_writer = make_data_store(m_data_writer_conf->get_data_store_params()->get_type(),
     164            0 :                                     m_data_writer_conf->get_data_store_params()->UID(),
     165            0 :                                     m_module_configuration, m_writer_identifier);
     166            0 :     register_node("data_writer", m_data_writer);
     167            0 :   } catch (const ers::Issue& excpt) {
     168            0 :     throw UnableToConfigure(ERS_HERE, get_name(), excpt);
     169            0 :   }
     170              : 
     171              :   // ensure that we have a valid dataWriter instance
     172            0 :   if (m_data_writer.get() == nullptr) {
     173            0 :     throw InvalidDataWriterModule(ERS_HERE, get_name());
     174              :   }
     175              : 
     176            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_conf() method";
     177            0 : }
     178              : 
     179              : void
     180            0 : DataWriterModule::do_start(const CommandData_t& payload)
     181              : {
     182            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
     183              :   
     184            0 :   rcif::cmd::StartParams start_params = payload.get<rcif::cmd::StartParams>();
     185            0 :   m_data_storage_is_enabled = (!start_params.disable_data_storage);
     186            0 :   m_run_number = start_params.run;
     187              : 
     188            0 :   TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": Sending initial TriggerDecisionToken to DFO to announce my presence";
     189            0 :   dfmessages::TriggerDecisionToken token;
     190            0 :   token.run_number = 0;
     191            0 :   token.trigger_number = 0;
     192            0 :   token.decision_destination = m_trigger_decision_connection;
     193              : 
     194              :   int wasSentSuccessfully = 5;
     195            0 :   do {
     196            0 :     try {
     197            0 :       m_token_output->send(std::move(token), m_queue_timeout);
     198              :       wasSentSuccessfully = 0;
     199            0 :     } catch (const ers::Issue& excpt) {
     200            0 :       std::ostringstream oss_warn;
     201            0 :       oss_warn << "Send with sender \"" << m_token_output->get_name() << "\" failed";
     202            0 :       ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt));
     203            0 :       wasSentSuccessfully--;
     204            0 :       std::this_thread::sleep_for(std::chrono::microseconds(5000));
     205            0 :     }
     206            0 :   } while (wasSentSuccessfully);
     207              :  
     208              :   // 04-Feb-2021, KAB: added this call to allow DataStore to prepare for the run.
     209              :   // I've put this call fairly early in this method because it could throw an
     210              :   // exception and abort the run start.  And, it seems sensible to avoid starting
     211              :   // threads, etc. if we throw an exception.
     212            0 :   if (m_data_storage_is_enabled) {
     213              : 
     214              :     // ensure that we have a valid dataWriter instance
     215            0 :     if (m_data_writer.get() == nullptr) {
     216              :       // this check is done essentially to notify the user
     217              :       // in case the "start" has been called before the "conf"
     218            0 :       ers::fatal(InvalidDataWriterModule(ERS_HERE, get_name()));
     219              :     }
     220              :     
     221            0 :     try {
     222            0 :       m_data_writer->prepare_for_run(m_run_number, (start_params.production_vs_test == "TEST"));
     223            0 :     } catch (const ers::Issue& excpt) {
     224            0 :       throw UnableToStart(ERS_HERE, get_name(), m_run_number, excpt);
     225            0 :     }
     226              :   }
     227              : 
     228            0 :   m_seqno_counts.clear();
     229              :   
     230            0 :   m_records_received = 0;
     231            0 :   m_records_received_tot = 0;
     232            0 :   m_records_written = 0;
     233            0 :   m_records_written_tot = 0;
     234            0 :   m_bytes_output = 0;
     235            0 :   m_bytes_output_tot = 0;
     236              : 
     237            0 :   m_running.store(true);
     238              : 
     239            0 :   m_thread.start_working_thread(get_name());
     240              :   //iomanager::IOManager::get()->add_callback<std::unique_ptr<daqdataformats::TriggerRecord>>( m_trigger_record_connection,
     241              :   //                                                                                         bind( &DataWriterModule::receive_trigger_record, this, std::placeholders::_1) );
     242              : 
     243            0 :   TLOG() << get_name() << " successfully started for run number " << m_run_number;
     244            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
     245            0 : }
     246              : 
     247              : void
     248            0 : DataWriterModule::do_stop(const CommandData_t& /*args*/)
     249              : {
     250            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
     251              : 
     252            0 :   m_running.store(false);
     253            0 :   m_thread.stop_working_thread(); 
     254              :   //iomanager::IOManager::get()->remove_callback<std::unique_ptr<daqdataformats::TriggerRecord>>( m_trigger_record_connection );
     255              : 
     256              :   // 04-Feb-2021, KAB: added this call to allow DataStore to finish up with this run.
     257              :   // I've put this call fairly late in this method so that any draining of queues
     258              :   // (or whatever) can take place before we finalize things in the DataStore.
     259            0 :   if (m_data_storage_is_enabled) {
     260            0 :     try {
     261            0 :       m_data_writer->finish_with_run(m_run_number);
     262            0 :     } catch (const std::exception& excpt) {
     263            0 :       ers::error(ProblemDuringStop(ERS_HERE, get_name(), m_run_number, excpt));
     264            0 :     }
     265              :   }
     266              : 
     267            0 :   TLOG() << get_name() << " successfully stopped for run number " << m_run_number;
     268            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
     269            0 : }
     270              : 
     271              : void
     272            0 : DataWriterModule::do_scrap(const CommandData_t& /*payload*/)
     273              : {
     274            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_scrap() method";
     275              : 
     276              :   // clear/reset the DataStore instance here
     277            0 :   m_data_writer.reset();
     278              : 
     279            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_scrap() method";
     280            0 : }
     281              : 
     282              : void
     283            0 : DataWriterModule::receive_trigger_record(std::unique_ptr<daqdataformats::TriggerRecord> & trigger_record_ptr)
     284              : {
     285            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": receiving a new TR ptr";
     286              : 
     287            0 :   ++m_records_received;
     288            0 :   ++m_records_received_tot;
     289            0 :   TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": Obtained the TriggerRecord for trigger number "
     290            0 :                               << trigger_record_ptr->get_header_ref().get_trigger_number() << "."
     291            0 :                               << trigger_record_ptr->get_header_ref().get_sequence_number()
     292            0 :                               << ", run number " << trigger_record_ptr->get_header_ref().get_run_number()
     293            0 :                               << " off the input connection";
     294              : 
     295            0 :   if (trigger_record_ptr->get_header_ref().get_run_number() != m_run_number) {
     296            0 :     ers::error(InvalidRunNumber(ERS_HERE, get_name(), "TriggerRecord", trigger_record_ptr->get_header_ref().get_run_number(),
     297              :                                 m_run_number, trigger_record_ptr->get_header_ref().get_trigger_number(),
     298            0 :                                 trigger_record_ptr->get_header_ref().get_sequence_number()));
     299            0 :     return;
     300              :   }
     301              : 
     302              :   // 03-Feb-2021, KAB: adding support for a data-storage prescale.
     303              :   // In this "if" statement, I deliberately compare the result of (N mod prescale) to 1
     304              :   // instead of zero, since I think that it would be nice to always get the first event
     305              :   // written out.
     306            0 :   if (m_data_storage_prescale <= 1 || ((m_records_received_tot.load() % m_data_storage_prescale) == 1)) {
     307              :     
     308            0 :     if (m_data_storage_is_enabled) {
     309              : 
     310            0 :       std::chrono::steady_clock::time_point start_time = std::chrono::steady_clock::now();
     311              :       
     312            0 :       bool should_retry = true;
     313            0 :       size_t retry_wait_usec = m_min_write_retry_time_usec;
     314            0 :       do {
     315            0 :         should_retry = false;
     316            0 :         try {
     317            0 :           m_data_writer->write(*trigger_record_ptr);
     318            0 :           ++m_records_written;
     319            0 :           ++m_records_written_tot;
     320            0 :           m_bytes_output += trigger_record_ptr->get_total_size_bytes();
     321            0 :           m_bytes_output_tot += trigger_record_ptr->get_total_size_bytes();
     322            0 :         } catch (const RetryableDataStoreProblem& excpt) {
     323            0 :           should_retry = true;
     324            0 :           ers::error(DataWritingProblem(ERS_HERE,
     325            0 :                                         get_name(),
     326              :                                         trigger_record_ptr->get_header_ref().get_trigger_number(),
     327              :                                         trigger_record_ptr->get_header_ref().get_sequence_number(),
     328            0 :                                         trigger_record_ptr->get_header_ref().get_run_number(),
     329              :                                         excpt));
     330            0 :           if (retry_wait_usec > m_max_write_retry_time_usec) {
     331              :             retry_wait_usec = m_max_write_retry_time_usec;
     332              :           }
     333            0 :           usleep(retry_wait_usec);
     334            0 :           retry_wait_usec *= m_write_retry_time_increase_factor;
     335            0 :         } catch (const std::exception& excpt) {
     336            0 :           ers::error(DataWritingProblem(ERS_HERE,
     337            0 :                                         get_name(),
     338              :                                         trigger_record_ptr->get_header_ref().get_trigger_number(),
     339              :                                         trigger_record_ptr->get_header_ref().get_sequence_number(),
     340            0 :                                         trigger_record_ptr->get_header_ref().get_run_number(),
     341              :                                         excpt));
     342            0 :         }
     343            0 :       } while (should_retry && m_running.load());
     344              : 
     345            0 :       std::chrono::steady_clock::time_point end_time = std::chrono::steady_clock::now();
     346            0 :       auto writing_time = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
     347            0 :       m_writing_us += writing_time.count();
     348              :     } //  if m_data_storage_is_enabled
     349              :   }
     350              :   
     351            0 :   bool send_trigger_complete_message = m_running.load();
     352            0 :   if (trigger_record_ptr->get_header_ref().get_max_sequence_number() > 0) {
     353            0 :     daqdataformats::trigger_number_t trigno = trigger_record_ptr->get_header_ref().get_trigger_number();
     354            0 :     if (m_seqno_counts.count(trigno) > 0) {
     355            0 :       ++m_seqno_counts[trigno];
     356              :     } else {
     357            0 :       m_seqno_counts[trigno] = 1;
     358              :     }
     359              :     // in the following comparison GT (>) is used since the counts are one-based and the
     360              :     // max sequence number is zero-based.
     361            0 :     if (m_seqno_counts[trigno] > trigger_record_ptr->get_header_ref().get_max_sequence_number()) {
     362            0 :       m_seqno_counts.erase(trigno);
     363              :     } else {
     364              :       // Using const .count and .at to avoid reintroducing element to map
     365            0 :       TLOG_DEBUG(TLVL_SEQNO_MAP_CONTENTS) << get_name() << ": the sequence number count for trigger number " << trigno
     366            0 :                                           << " is " << (m_seqno_counts.count(trigno) ? m_seqno_counts.at(trigno) : 0) << " (number of entries "
     367            0 :                                           << "in the seqno map is " << m_seqno_counts.size() << ").";
     368            0 :       send_trigger_complete_message = false;
     369              :     }
     370              :   }
     371            0 :   if (send_trigger_complete_message) {
     372            0 :     TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": Pushing the TriggerDecisionToken for trigger number "
     373            0 :                                 << trigger_record_ptr->get_header_ref().get_trigger_number()
     374            0 :                                 << " onto the relevant output queue";
     375            0 :     dfmessages::TriggerDecisionToken token;
     376            0 :     token.run_number = m_run_number;
     377            0 :     token.trigger_number = trigger_record_ptr->get_header_ref().get_trigger_number();
     378            0 :     token.decision_destination = m_trigger_decision_connection;
     379              : 
     380              :     bool wasSentSuccessfully = false;
     381            0 :     do { 
     382            0 :       try {
     383            0 :         m_token_output -> send( std::move(token), m_queue_timeout );
     384              :         wasSentSuccessfully = true;
     385            0 :       } catch (const ers::Issue& excpt) {
     386            0 :         std::ostringstream oss_warn;
     387            0 :         oss_warn << "Send with sender \"" << m_token_output -> get_name() << "\" failed";
     388            0 :         ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt));
     389            0 :       }
     390            0 :     } while (!wasSentSuccessfully && m_running.load());
     391              : 
     392            0 :   }
     393              :   
     394            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": operations completed for TR";
     395              : } // NOLINT(readability/fn_size)
     396              : 
     397              : void
     398            0 : DataWriterModule::do_work(std::atomic<bool>& running_flag) {
     399            0 :   while (running_flag.load()) {
     400            0 :           try {
     401            0 :                 std::unique_ptr<daqdataformats::TriggerRecord> tr = m_tr_receiver-> receive(std::chrono::milliseconds(10));   
     402            0 :                 receive_trigger_record(tr);
     403            0 :           }
     404            0 :           catch(const iomanager::TimeoutExpired& excpt) {
     405            0 :           }
     406            0 :           catch(const ers::Issue & excpt) {
     407            0 :                 ers::warning(excpt);
     408            0 :           }
     409              :   }
     410            0 : }
     411              : 
     412              : } // namespace dfmodules
     413              : } // namespace dunedaq
     414              : 
     415            0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::dfmodules::DataWriterModule)
        

Generated by: LCOV version 2.0-1