LCOV - code coverage report
Current view: top level - dfmodules/plugins - FragmentAggregatorModule.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 168 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 12 0

            Line data    Source code
       1              : /**
       2              :  * @file FragmentAggregatorModule.cpp FragmentAggregatorModule implementation
       3              :  *
       4              :  * This is part of the DUNE DAQ, copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
       9              : #include "FragmentAggregatorModule.hpp"
      10              : #include "dfmodules/CommonIssues.hpp"
      11              : #include "dfmodules/opmon/FragmentAggregatorModule.pb.h"
      12              : 
      13              : #include "appmodel/FragmentAggregatorModule.hpp"
      14              : #include "appmodel/FragmentAggregatorConf.hpp"
      15              : #include "confmodel/Connection.hpp"
      16              : #include "confmodel/QueueWithSourceId.hpp"
      17              : #include "daqdataformats/FragmentHeader.hpp"
      18              : #include "dfmessages/Fragment_serialization.hpp"
      19              : #include "logging/Logging.hpp"
      20              : 
      21              : #include "iomanager/IOManager.hpp"
      22              : 
      23              : #include <iostream>
      24              : #include <string>
      25              : 
      26              : namespace dunedaq {
      27              : namespace dfmodules {
      28              : 
      29            0 : FragmentAggregatorModule::FragmentAggregatorModule(const std::string& name)
      30              :   : DAQModule(name)
      31            0 :   , m_fragment_send_timeout(1000)
      32              : {
      33            0 :   register_command("start", &FragmentAggregatorModule::do_start);
      34            0 :   register_command("stop_trigger_sources", &FragmentAggregatorModule::do_stop);
      35            0 : }
      36              : 
      37              : void
      38            0 : FragmentAggregatorModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
      39              : {
      40            0 :   auto mdal = mcfg->get_dal<appmodel::FragmentAggregatorModule>(get_name());
      41            0 :   if (!mdal) {
      42            0 :     throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve configuration object");
      43              :   }
      44              : 
      45            0 :   auto inputs = mdal->get_inputs();
      46            0 :   for (auto con : mdal->get_inputs()) {
      47            0 :     if (con->get_data_type() == datatype_to_string<dfmessages::DataRequest>()) {
      48            0 :       m_data_req_input = con->UID();
      49              :     }
      50            0 :     if (con->get_data_type() == datatype_to_string<daqdataformats::Fragment>()) {
      51            0 :       m_fragment_input = con->UID();
      52              :     }
      53              :   }
      54              : 
      55            0 :   m_producer_conn_ids.clear();
      56            0 :   for (const auto cr : mdal->get_outputs()) {
      57            0 :     if (cr->get_data_type() == datatype_to_string<dfmessages::DataRequest>()) {
      58            0 :       auto qid = cr->cast<confmodel::QueueWithSourceId>();
      59            0 :       m_producer_conn_ids[qid->get_source_id()] = cr->UID();
      60              :     }
      61            0 :     if (cr->get_data_type() == datatype_to_string<std::unique_ptr<daqdataformats::Fragment>>()) {
      62            0 :       m_trb_conn_ids.push_back(cr->UID());
      63              :     }
      64              :   }
      65              : 
      66              :   // this is just to get the data request receiver registered early (before Start)
      67            0 :   auto iom = iomanager::IOManager::get();
      68            0 :   iom->get_receiver<dfmessages::DataRequest>(m_data_req_input);
      69              : 
      70            0 :   m_fragment_send_timeout = std::chrono::milliseconds(mdal->get_configuration()->get_fragment_send_timeout_ms());
      71            0 : }
      72              : 
      73              : void
      74            0 : FragmentAggregatorModule::generate_opmon_data()
      75              : {
      76            0 :   if (m_data_requests_processed > 0) {
      77            0 :     opmon::FragmentAggregatorTimeInfo dr_times;
      78            0 :     dr_times.set_min_us(m_data_requests_time_min_us.exchange(std::numeric_limits<uint64_t>::max()));
      79            0 :     dr_times.set_max_us(m_data_requests_time_max_us.exchange(0));
      80            0 :     dr_times.set_average_us(m_data_requests_time_average_us.exchange(0) / m_data_requests_processed);
      81            0 :     this->publish(std::move(dr_times), { { "data", "DataRequest" } });
      82            0 :   }
      83              : 
      84            0 :   if (m_fragments_processed > 0) {
      85            0 :     opmon::FragmentAggregatorTimeInfo frag_times;
      86            0 :     frag_times.set_min_us(m_fragments_time_min_us.exchange(std::numeric_limits<uint64_t>::max()));
      87            0 :     frag_times.set_max_us(m_fragments_time_max_us.exchange(0));
      88            0 :     frag_times.set_average_us(m_fragments_time_average_us.exchange(0) / m_fragments_processed);
      89            0 :     this->publish(std::move(frag_times), { { "data", "Fragment" } });
      90            0 :   }
      91              : 
      92            0 :   opmon::FADataRequestsCounterInfo dr_info;
      93            0 :   dr_info.set_data_requests_received(m_data_requests_received.exchange(0));
      94            0 :   dr_info.set_data_requests_processed(m_data_requests_processed.exchange(0));
      95            0 :   dr_info.set_data_requests_failed(m_data_requests_failed.load()); // the failed counters are meant NOT to reset
      96            0 :   this->publish(std::move(dr_info));
      97              : 
      98            0 :   opmon::FAFragmentsCounterInfo frag_info;
      99            0 :   frag_info.set_fragments_received(m_fragments_received.exchange(0));
     100            0 :   frag_info.set_fragments_processed(m_fragments_processed.exchange(0));
     101            0 :   frag_info.set_fragments_failed(m_fragments_failed.load());
     102            0 :   frag_info.set_fragments_empty(m_fragments_empty.exchange(0));
     103            0 :   frag_info.set_fragments_incomplete(m_fragments_incomplete.exchange(0));
     104            0 :   frag_info.set_fragments_invalid(m_fragments_invalid.exchange(0));
     105            0 :   this->publish(std::move(frag_info));
     106            0 : }
     107              : 
     108              : void
     109            0 : FragmentAggregatorModule::do_start(const CommandData_t& /* args */)
     110              : {
     111              : 
     112            0 :   m_data_requests_received.store(0);
     113            0 :   m_data_requests_processed.store(0);
     114            0 :   m_data_requests_failed.store(0);
     115            0 :   m_fragments_received.store(0);
     116            0 :   m_fragments_processed.store(0);
     117            0 :   m_fragments_failed.store(0);
     118            0 :   m_fragments_empty.store(0);
     119            0 :   m_fragments_incomplete.store(0);
     120            0 :   m_fragments_invalid.store(0);
     121            0 :   m_fragments_time_average_us.store(0);
     122            0 :   m_fragments_time_min_us.store(std::numeric_limits<uint64_t>::max());
     123            0 :   m_fragments_time_max_us.store(0);
     124            0 :   m_data_requests_time_average_us.store(0);
     125            0 :   m_data_requests_time_min_us.store(std::numeric_limits<uint64_t>::max());
     126            0 :   m_data_requests_time_max_us.store(0);
     127              : 
     128              :   // 19-Dec-2024, KAB: check that Fragment senders are ready to send. This is done so
     129              :   // that the IOManager infrastructure fetches the necessary connection details from
     130              :   // the ConnectivityService at 'start' time, instead of the first time that the sender
     131              :   // is used to send data.  This avoids delays in the sending of the first fragment in
     132              :   // the first data-taking run in a DAQ session. Such delays can lead to undesirable
     133              :   // system behavior like trigger inhibits.
     134            0 :   auto iom = iomanager::IOManager::get();
     135            0 :   for (auto trb_conn : m_trb_conn_ids) {
     136            0 :     auto sender = iom->get_sender<std::unique_ptr<daqdataformats::Fragment>>(trb_conn);
     137            0 :     if (sender != nullptr) {
     138            0 :       bool is_ready = sender->is_ready_for_sending(std::chrono::milliseconds(100));
     139            0 :       TLOG_DEBUG(0) << "The Fragment sender for " << trb_conn << " " << (is_ready ? "is" : "is not") << " ready.";
     140              :     }
     141            0 :   }
     142            0 :   iom->add_callback<dfmessages::DataRequest>(
     143            0 :     m_data_req_input, std::bind(&FragmentAggregatorModule::process_data_request, this, std::placeholders::_1));
     144            0 :   iom->add_callback<std::unique_ptr<daqdataformats::Fragment>>(
     145            0 :     m_fragment_input, std::bind(&FragmentAggregatorModule::process_fragment, this, std::placeholders::_1));
     146            0 : }
     147              : 
     148              : void
     149            0 : FragmentAggregatorModule::do_stop(const CommandData_t& /* args */)
     150              : {
     151            0 :   auto iom = iomanager::IOManager::get();
     152            0 :   iom->remove_callback<dfmessages::DataRequest>(m_data_req_input);
     153            0 :   iom->remove_callback<std::unique_ptr<daqdataformats::Fragment>>(m_fragment_input);
     154            0 :   m_data_req_map.clear();
     155            0 : }
     156              : 
     157              : void
     158            0 : FragmentAggregatorModule::process_data_request(dfmessages::DataRequest& data_request)
     159              : {
     160              : 
     161            0 :   {
     162            0 :     std::scoped_lock lock(m_mutex);
     163              : 
     164            0 :     m_timestamp_before_dr = get_current_time_us();
     165            0 :     m_data_requests_received++;
     166              : 
     167            0 :     std::tuple<dfmessages::trigger_number_t, dfmessages::sequence_number_t, daqdataformats::SourceID> triplet = {
     168            0 :       data_request.trigger_number, data_request.sequence_number, data_request.request_information.component
     169            0 :     };
     170            0 :     m_data_req_map[triplet] = data_request.data_destination;
     171            0 :   }
     172              :   // Forward Data Request to the right DLH
     173            0 :   try {
     174              :     // std::string component_name = "inputReqToDLH-" + data_request.request_information.component.to_string();
     175            0 :     auto uid_elem = m_producer_conn_ids.find(data_request.request_information.component.id);
     176            0 :     if (uid_elem == m_producer_conn_ids.end()) {
     177            0 :       ers::error(dunedaq::dfmodules::DRSenderLookupFailed(ERS_HERE,
     178              :                                                           data_request.request_information.component,
     179              :                                                           data_request.run_number,
     180              :                                                           data_request.trigger_number,
     181            0 :                                                           data_request.sequence_number));
     182              :     } else {
     183            0 :       TLOG_DEBUG(30) << "Send data request to " << uid_elem->second;
     184            0 :       auto sender = get_iom_sender<dfmessages::DataRequest>(uid_elem->second);
     185            0 :       data_request.data_destination = m_fragment_input;
     186            0 :       sender->send(std::move(data_request), iomanager::Sender::s_no_block);
     187              : 
     188            0 :       m_data_requests_processed++;
     189            0 :       auto timestamp_total = get_current_time_us() - m_timestamp_before_dr;
     190            0 :       if (timestamp_total < m_data_requests_time_min_us) {
     191            0 :         m_data_requests_time_min_us = timestamp_total;
     192              :       }
     193            0 :       if (timestamp_total > m_data_requests_time_max_us) {
     194            0 :         m_data_requests_time_max_us = timestamp_total;
     195              :       }
     196            0 :       m_data_requests_time_average_us += timestamp_total;
     197            0 :     }
     198            0 :   } catch (const ers::Issue& excpt) {
     199            0 :     ers::error(dunedaq::dfmodules::DRSenderSendFailed(ERS_HERE,
     200              :                                                       data_request.run_number,
     201              :                                                       data_request.trigger_number,
     202            0 :                                                       data_request.sequence_number,
     203              :                                                       data_request.request_information.component));
     204            0 :     m_data_requests_failed++;
     205            0 :   }
     206            0 : }
     207              : 
     208              : void
     209            0 : FragmentAggregatorModule::process_fragment(std::unique_ptr<daqdataformats::Fragment>& fragment)
     210              : {
     211              :   // Forward Fragment to the right TRB
     212            0 :   std::string trb_identifier;
     213            0 :   {
     214            0 :     std::scoped_lock lock(m_mutex);
     215              : 
     216            0 :     m_timestamp_before_frag = get_current_time_us();
     217            0 :     m_fragments_received++;
     218              : 
     219            0 :     std::bitset<32> error_bits = fragment->get_error_bits();
     220            0 :     if (error_bits[static_cast<size_t>(dunedaq::daqdataformats::FragmentErrorBits::kDataNotFound)])
     221            0 :       m_fragments_empty++;
     222            0 :     if (error_bits[static_cast<size_t>(dunedaq::daqdataformats::FragmentErrorBits::kIncomplete)])
     223            0 :       m_fragments_incomplete++;
     224            0 :     if (error_bits[static_cast<size_t>(dunedaq::daqdataformats::FragmentErrorBits::kInvalidWindow)])
     225            0 :       m_fragments_invalid++;
     226              : 
     227            0 :     auto dr_iter = m_data_req_map.find(
     228            0 :       std::make_tuple<dfmessages::trigger_number_t, dfmessages::sequence_number_t, daqdataformats::SourceID>(
     229            0 :         fragment->get_trigger_number(), fragment->get_sequence_number(), fragment->get_element_id()));
     230            0 :     if (dr_iter != m_data_req_map.end()) {
     231            0 :       trb_identifier = dr_iter->second;
     232            0 :       m_data_req_map.erase(dr_iter);
     233              :     } else {
     234            0 :       ers::error(UnknownFragmentDestination(
     235            0 :         ERS_HERE, fragment->get_trigger_number(), fragment->get_sequence_number(), fragment->get_element_id()));
     236            0 :       return;
     237              :     }
     238            0 :   }
     239            0 :   try {
     240            0 :     TLOG_DEBUG(27) << get_name() << " Sending fragment for trigger/sequence_number " << fragment->get_trigger_number()
     241            0 :                    << "." << fragment->get_sequence_number() << " and SourceID " << fragment->get_element_id() << " to "
     242            0 :                    << trb_identifier;
     243            0 :     auto sender = get_iom_sender<std::unique_ptr<daqdataformats::Fragment>>(trb_identifier);
     244            0 :     sender->send(std::move(fragment), m_fragment_send_timeout);
     245              : 
     246            0 :     m_fragments_processed++;
     247            0 :     auto timestamp_total = get_current_time_us() - m_timestamp_before_frag;
     248            0 :     if (timestamp_total < m_fragments_time_min_us) {
     249            0 :       m_fragments_time_min_us = timestamp_total;
     250              :     }
     251            0 :     if (timestamp_total > m_fragments_time_max_us) {
     252            0 :       m_fragments_time_max_us = timestamp_total;
     253              :     }
     254            0 :     m_fragments_time_average_us += timestamp_total;
     255              : 
     256            0 :   } catch (const ers::Issue& excpt) {
     257            0 :     ers::error(AbandonedFragment(ERS_HERE,
     258              :                                  fragment->get_run_number(),
     259              :                                  fragment->get_trigger_number(),
     260            0 :                                  fragment->get_sequence_number(),
     261              :                                  fragment->get_element_id(),
     262              :                                  excpt));
     263            0 :     m_fragments_failed++;
     264            0 :   }
     265            0 : }
     266              : 
     267              : uint64_t
     268            0 : FragmentAggregatorModule::get_current_time_us()
     269              : {
     270            0 :   return std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch())
     271            0 :     .count();
     272              : }
     273              : 
     274              : } // namespace dfmodules
     275              : } // namespace dunedaq
     276              : 
     277            0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::dfmodules::FragmentAggregatorModule)
        

Generated by: LCOV version 2.0-1