LCOV - code coverage report
Current view: top level - dfmodules/plugins - FakeDataProdModule.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 113 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 21 0

            Line data    Source code
       1              : /**
       2              :  * @file FakeDataProdModule.cpp FakeDataProdModule class implementation
       3              :  *
       4              :  * This is part of the DUNE DAQ Software Suite, copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
       9              : #include "FakeDataProdModule.hpp"
      10              : #include "dfmodules/CommonIssues.hpp"
      11              : 
      12              : #include "appmodel/FakeDataProdModule.hpp"
      13              : #include "confmodel/Connection.hpp"
      14              : #include "dfmessages/Fragment_serialization.hpp"
      15              : #include "dfmessages/TimeSync.hpp"
      16              : #include "iomanager/IOManager.hpp"
      17              : #include "logging/Logging.hpp"
      18              : 
      19              : #include <chrono>
      20              : #include <cstdlib>
      21              : #include <memory>
      22              : #include <string>
      23              : #include <thread>
      24              : #include <utility>
      25              : #include <vector>
      26              : 
      27              : /**
      28              :  * @brief Name used by TRACE TLOG calls from this source file
      29              :  */
      30              : #define TRACE_NAME "FakeDataProdModule" // NOLINT
      31              : enum
      32              : {
      33              :   TLVL_ENTER_EXIT_METHODS = 5,
      34              :   TLVL_CONFIG = 7,
      35              :   TLVL_WORK_STEPS = 10,
      36              :   TLVL_TIME_SYNCS = 12
      37              : };
      38              : 
      39              : namespace dunedaq {
      40              : namespace dfmodules {
      41              : 
      42            0 : FakeDataProdModule::FakeDataProdModule(const std::string& name)
      43              :   : dunedaq::appfwk::DAQModule(name)
      44            0 :   , m_timesync_thread(std::bind(&FakeDataProdModule::do_timesync, this, std::placeholders::_1))
      45            0 :   , m_queue_timeout(100)
      46            0 :   , m_run_number(0)
      47              : {
      48            0 :   register_command("conf", &FakeDataProdModule::do_conf);
      49            0 :   register_command("start", &FakeDataProdModule::do_start);
      50            0 :   register_command("stop", &FakeDataProdModule::do_stop);
      51            0 : }
      52              : 
      53              : void
      54            0 : FakeDataProdModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
      55              : {
      56            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
      57            0 :   auto mdal = mcfg->get_dal<appmodel::FakeDataProdModule>(get_name());
      58            0 :   if (!mdal) {
      59            0 :     throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve configuration object");
      60              :   }
      61              : 
      62            0 :   auto inputs = mdal->get_inputs();
      63            0 :   auto outputs = mdal->get_outputs();
      64              : 
      65            0 :   if (inputs[0]->get_data_type() != datatype_to_string<dfmessages::DataRequest>()) {
      66            0 :     throw InvalidQueueFatalError(ERS_HERE, get_name(), "DataRequest Input queue");
      67              :   }
      68            0 :   m_data_request_id = inputs[0]->UID();
      69              : 
      70            0 :   for (auto con : outputs) {
      71            0 :     if (con->get_data_type() == datatype_to_string<dfmessages::TimeSync>()) {
      72            0 :       m_timesync_id = con->UID();
      73              :     }
      74              :   }
      75            0 :   m_fake_data_prod_conf = mdal->get_configuration();
      76            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
      77            0 : }
      78              : 
      79              : void
      80            0 : FakeDataProdModule::do_conf(const CommandData_t&)
      81              : {
      82            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_conf() method";
      83              : 
      84            0 :   m_sourceid.subsystem = daqdataformats::SourceID::string_to_subsystem(m_fake_data_prod_conf->get_system_type());
      85            0 :   m_sourceid.id = m_fake_data_prod_conf->get_source_id();
      86            0 :   m_time_tick_diff = m_fake_data_prod_conf->get_time_tick_diff();
      87            0 :   m_frame_size = m_fake_data_prod_conf->get_frame_size();
      88            0 :   m_response_delay = m_fake_data_prod_conf->get_response_delay();
      89            0 :   m_fragment_type = daqdataformats::string_to_fragment_type(m_fake_data_prod_conf->get_fragment_type());
      90              : 
      91            0 :   TLOG_DEBUG(TLVL_CONFIG) << get_name() << ": configured for link number " << m_sourceid.id;
      92              : 
      93            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_conf() method";
      94            0 : }
      95              : 
      96              : void
      97            0 : FakeDataProdModule::do_start(const CommandData_t& payload)
      98              : {
      99            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
     100            0 :   m_sent_fragments = 0;
     101            0 :   m_received_requests = 0;
     102            0 :   m_run_number = payload.value<dunedaq::daqdataformats::run_number_t>("run", 0);
     103              : 
     104            0 :   m_timesync_thread.start_working_thread();
     105              : 
     106            0 :   auto iom = iomanager::IOManager::get();
     107            0 :   iom->add_callback<dfmessages::DataRequest>(
     108            0 :     m_data_request_id, std::bind(&FakeDataProdModule::process_data_request, this, std::placeholders::_1));
     109            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
     110            0 : }
     111              : 
     112              : void
     113            0 : FakeDataProdModule::do_stop(const CommandData_t& /*args*/)
     114              : {
     115            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
     116            0 :   m_timesync_thread.stop_working_thread();
     117              : 
     118            0 :   auto iom = iomanager::IOManager::get();
     119            0 :   iom->remove_callback<dfmessages::DataRequest>(m_data_request_id);
     120            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
     121            0 : }
     122              : 
     123              : // void
     124              : // FakeDataProdModule::get_info(opmonlib::InfoCollector& ci, int /*level*/)
     125              : // {
     126              : //   fakedataprodinfo::Info info;
     127              : //   info.requests_received = m_received_requests;
     128              : //   info.fragments_sent = m_sent_fragments;
     129              : //   ci.add(info);
     130              : // }
     131              : 
     132              : void
     133            0 : FakeDataProdModule::do_timesync(std::atomic<bool>& running_flag)
     134              : {
     135              : 
     136            0 :   auto iom = iomanager::IOManager::get();
     137            0 :   auto sender_ptr = iom->get_sender<dfmessages::TimeSync>(m_timesync_id);
     138              : 
     139            0 :   int sent_count = 0;
     140            0 :   uint64_t msg_seqno = 0; // NOLINT (build/unsigned)
     141            0 :   while (running_flag.load()) {
     142            0 :     auto time_now = std::chrono::system_clock::now().time_since_epoch();
     143            0 :     uint64_t current_timestamp = // NOLINT (build/unsigned)
     144            0 :       std::chrono::duration_cast<std::chrono::nanoseconds>(time_now).count();
     145            0 :     auto timesyncmsg = dfmessages::TimeSync(current_timestamp);
     146            0 :     ++msg_seqno;
     147            0 :     timesyncmsg.run_number = m_run_number;
     148            0 :     timesyncmsg.sequence_number = msg_seqno;
     149            0 :     timesyncmsg.source_id = m_sourceid.id;
     150            0 :     TLOG_DEBUG(TLVL_TIME_SYNCS) << "New timesync: daq=" << timesyncmsg.daq_time << " wall=" << timesyncmsg.system_time
     151            0 :                                 << " run=" << timesyncmsg.run_number << " seqno=" << timesyncmsg.sequence_number
     152            0 :                                 << " source_id=" << timesyncmsg.source_id;
     153            0 :     try {
     154            0 :       sender_ptr->send(std::move(timesyncmsg), std::chrono::milliseconds(500));
     155            0 :       ++sent_count;
     156            0 :     } catch (ers::Issue& excpt) {
     157            0 :       ers::warning(TimeSyncTransmissionFailed(ERS_HERE, get_name(), m_timesync_id, excpt));
     158            0 :     }
     159            0 :     std::this_thread::sleep_for(std::chrono::milliseconds(100));
     160              :   }
     161            0 :   TLOG() << get_name() << ": sent " << sent_count << " TimeSync messages.";
     162            0 : }
     163              : 
     164              : void
     165            0 : FakeDataProdModule::process_data_request(dfmessages::DataRequest& data_request)
     166              : {
     167              : 
     168            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": processsing request " << data_request.request_number;
     169              : 
     170            0 :   m_received_requests++;
     171              : 
     172              :   // num_frames_to_send = ⌈window_size / tick_diff⌉
     173            0 :   size_t num_frames_to_send = (data_request.request_information.window_end -
     174            0 :                                data_request.request_information.window_begin + m_time_tick_diff - 1) /
     175              :                               m_time_tick_diff;
     176            0 :   size_t num_bytes_to_send = num_frames_to_send * m_frame_size;
     177              : 
     178              :   // We don't care about the content of the data, but the size should be correct
     179              : 
     180            0 :   std::vector<uint8_t> fake_data; // NOLINT (build/unsigned)
     181            0 :   try {
     182            0 :     fake_data.resize(num_bytes_to_send);
     183            0 :   } catch (const std::bad_alloc&) {
     184            0 :     throw dunedaq::dfmodules::MemoryAllocationFailed(ERS_HERE, get_name(), num_bytes_to_send);
     185            0 :   }
     186              : 
     187            0 :   auto data_fragment_ptr = std::make_unique<daqdataformats::Fragment>(fake_data.data(), num_bytes_to_send);
     188              : 
     189            0 :   data_fragment_ptr->set_trigger_number(data_request.trigger_number);
     190            0 :   data_fragment_ptr->set_run_number(m_run_number);
     191            0 :   data_fragment_ptr->set_element_id(m_sourceid);
     192            0 :   data_fragment_ptr->set_error_bits(0);
     193            0 :   data_fragment_ptr->set_type(m_fragment_type);
     194            0 :   data_fragment_ptr->set_trigger_timestamp(data_request.trigger_timestamp);
     195            0 :   data_fragment_ptr->set_window_begin(data_request.request_information.window_begin);
     196            0 :   data_fragment_ptr->set_window_end(data_request.request_information.window_end);
     197            0 :   data_fragment_ptr->set_sequence_number(data_request.sequence_number);
     198              : 
     199            0 :   if (m_response_delay > 0) {
     200            0 :     std::this_thread::sleep_for(std::chrono::nanoseconds(m_response_delay));
     201              :   }
     202              : 
     203            0 :   try {
     204            0 :     auto iom = iomanager::IOManager::get();
     205            0 :     iom->get_sender<std::unique_ptr<daqdataformats::Fragment>>(data_request.data_destination)
     206            0 :       ->send(std::move(data_fragment_ptr), std::chrono::milliseconds(1000));
     207            0 :   } catch (ers::Issue& e) {
     208            0 :     ers::warning(FragmentTransmissionFailed(ERS_HERE, get_name(), data_request.trigger_number, e));
     209            0 :   }
     210              : 
     211            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": finishing processing request " << data_request.request_number;
     212            0 : }
     213              : 
     214              : } // namespace dfmodules
     215              : } // namespace dunedaq
     216              : 
     217            0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::dfmodules::FakeDataProdModule)
        

Generated by: LCOV version 2.0-1