LCOV - code coverage report
Current view: top level - dfmodules/plugins - TRBModule.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 415 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 46 0

            Line data    Source code
       1              : /**
       2              :  * @file TRBModule.cpp TRBModule class implementation
       3              :  *
       4              :  * This is part of the DUNE DAQ Software Suite, copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
       9              : #include "TRBModule.hpp"
      10              : #include "dfmodules/CommonIssues.hpp"
      11              : 
      12              : #include "appmodel/NetworkConnectionDescriptor.hpp"
      13              : #include "appmodel/NetworkConnectionRule.hpp"
      14              : #include "appmodel/ReadoutApplication.hpp"
      15              : #include "appmodel/SourceIDConf.hpp"
      16              : #include "appmodel/SourceIDToNetworkConnection.hpp"
      17              : #include "appmodel/TRBModule.hpp"
      18              : #include "appmodel/TriggerApplication.hpp"
      19              : #include "confmodel/Application.hpp"
      20              : #include "confmodel/Connection.hpp"
      21              : #include "confmodel/DetectorStream.hpp"
      22              : #include "confmodel/DetectorToDaqConnection.hpp"
      23              : #include "confmodel/NetworkConnection.hpp"
      24              : #include "confmodel/Session.hpp"
      25              : #include "dfmessages/TriggerRecord_serialization.hpp"
      26              : #include "logging/Logging.hpp"
      27              : 
      28              : #include "iomanager/IOManager.hpp"
      29              : 
      30              : #include <algorithm>
      31              : #include <chrono>
      32              : #include <cmath>
      33              : #include <cstdlib>
      34              : #include <limits>
      35              : #include <memory>
      36              : #include <numeric>
      37              : #include <string>
      38              : #include <thread>
      39              : #include <utility>
      40              : #include <vector>
      41              : 
      42              : /**
      43              :  * @brief TRACE debug levels used in this source file
      44              :  */
      45              : enum
      46              : {
      47              :   TLVL_ENTER_EXIT_METHODS = 5, // entry/exit traces of the module's methods
      48              :   TLVL_INIT = 8, // NOTE(review): not referenced in the part of the file reviewed here
      49              :   TLVL_WORK_STEPS = 10, // per-TriggerDecision sequence/slice details
      50              :   TLVL_BOOKKEEPING = 15, // periodic status of the open trigger records
      51              :   TLVL_DISPATCH_DATAREQ = 21, // NOTE(review): not referenced in the part of the file reviewed here
      52              :   TLVL_FRAGMENT_RECEIVE = 22 // one trace per received fragment
      53              : };
      54              : 
      55              : namespace dunedaq {
      56              : namespace dfmodules {
      57              : 
      58              : using daqdataformats::TriggerRecordErrorBits;
      59              : /// Creates the module with m_stop_requested set and registers the conf/scrap/start/stop command handlers.
      60            0 : TRBModule::TRBModule(const std::string& name)
      61              :   : dunedaq::appfwk::DAQModule(name)
      62            0 :   , m_stop_requested(true)
      63            0 :   , m_tr_queue_timeout(100)
      64            0 :   , m_dreq_queue_timeout(100)
      65              : {
      66              :   // The two queue timeouts above are start-up values only: do_conf() overwrites both from the configuration.
      67            0 :   register_command("conf", &TRBModule::do_conf);
      68            0 :   register_command("scrap", &TRBModule::do_scrap);
      69            0 :   register_command("start", &TRBModule::do_start);
      70            0 :   register_command("stop", &TRBModule::do_stop);
      71            0 : }
      72              : /// Looks up this module's configuration object and obtains the receivers and senders it refers to.
      73              : void
      74            0 : TRBModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
      75              : {
      76              :   // @param mcfg configuration manager, queried with get_name(); appfwk::CommandFailed is thrown if it returns nothing
      77            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
      78              : 
      79              :   //--------------------------------
      80              :   // Get single queues
      81              :   //---------------------------------
      82              : 
      83            0 :   auto mdal = mcfg->get_dal<appmodel::TRBModule>(get_name());
      84            0 :   if (!mdal) {
      85            0 :     throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve configuration object");
      86              :   }
      87              :   // Inputs are matched by data type: TriggerDecision, Fragment (unique_ptr) and TRMonRequest.
      88            0 :   auto iom = iomanager::IOManager::get();
      89            0 :   for (auto con : mdal->get_inputs()) {
      90            0 :     if (con->get_data_type() == datatype_to_string<dfmessages::TriggerDecision>()) {
      91            0 :       m_trigger_decision_input = iom->get_receiver<dfmessages::TriggerDecision>(con->UID());
      92              :     }
      93            0 :     if (con->get_data_type() == datatype_to_string<std::unique_ptr<daqdataformats::Fragment>>()) {
      94            0 :       m_fragment_input = iom->get_receiver<std::unique_ptr<daqdataformats::Fragment>>(con->UID());
      95              : 
      96              :       // save the data fragment receiver global connection name for later, when it gets
      97              :       // copied into the DataRequests so that data producers know where to send their fragments
      98            0 :       m_reply_connection = con->UID();
      99              :     }
     100            0 :     if (con->get_data_type() == datatype_to_string<dfmessages::TRMonRequest>()) {
     101            0 :       m_mon_receiver = iom->get_receiver<dfmessages::TRMonRequest>(con->UID());
     102              :     }
     103              :   }
     104              :   // Only the TriggerDecision and Fragment receivers are checked below; a missing TRMonRequest receiver is tolerated.
     105            0 :   if (m_trigger_decision_input == nullptr) {
     106            0 :     throw InvalidQueueFatalError(ERS_HERE, get_name(), "TriggerDecision Input queue");
     107              :   }
     108            0 :   if (m_fragment_input == nullptr) {
     109            0 :     throw InvalidQueueFatalError(ERS_HERE, get_name(), "Fragment Input queue");
     110              :   }
     111              :   // NOTE(review): get_trigger_record_output() is dereferenced without a null check - confirm it is mandatory.
     112            0 :   m_trigger_record_output = iom->get_sender<std::unique_ptr<daqdataformats::TriggerRecord>>(mdal->get_trigger_record_output()->UID());
     113              :   // Fill the SourceID -> DataRequest sender map; an entry that already holds a sender is kept as is.
     114            0 :   for (auto con : mdal->get_request_connections()) {
     115            0 :     for (auto source_id : con->get_source_ids()) {
     116              : 
     117              :       // find the queue for sourceid_req in the map
     118            0 :       std::unique_lock<std::mutex> lk(m_map_sourceid_connections_mutex);
     119            0 :       daqdataformats::SourceID sid;
     120            0 :       sid.subsystem = daqdataformats::SourceID::string_to_subsystem(source_id->get_subsystem());
     121            0 :       sid.id = source_id->get_sid();
     122            0 :       auto it_req = m_map_sourceid_connections.find(sid);
     123            0 :       if (it_req == m_map_sourceid_connections.end() || it_req->second == nullptr) {
     124            0 :         m_map_sourceid_connections[sid] = get_iom_sender<dfmessages::DataRequest>(con->get_netconn()->UID());
     125              :       }
     126            0 :       lk.unlock();
     127            0 :     }
     128              :   }
     129              :   // Kept for do_conf(), which reads the individual settings.
     130            0 :   m_trb_conf = mdal->get_configuration();
     131              : 
     132            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
     133            0 : }
     134              : /// Publishes one opmon::TRBInfo and one opmon::TRBErrors sample built from the module's counters.
     135              : void
     136            0 : TRBModule::generate_opmon_data()
     137              : {
     138              :   // Counters read with load() keep their value; counters read with exchange(0) are reset by this call.
     139            0 :   opmon::TRBInfo i;
     140              : 
     141              :   // status metrics
     142            0 :   i.set_pending_trigger_decisions(m_trigger_decisions_counter.load());
     143            0 :   i.set_fragments_in_the_book(m_fragment_counter.load());
     144            0 :   i.set_pending_fragments(m_pending_fragment_counter.load());
     145              : 
     146              :   // operation metrics
     147            0 :   i.set_received_trigger_decisions(m_received_trigger_decisions.exchange(0));
     148            0 :   i.set_generated_trigger_records(m_generated_trigger_records.exchange(0));
     149            0 :   i.set_generated_data_requests(m_generated_data_requests.exchange(0));
     150            0 :   i.set_received_fragments(m_received_fragments.exchange(0));
     151            0 :   i.set_data_waiting_time(m_data_waiting_time.exchange(0));
     152            0 :   i.set_data_request_width(m_data_request_width.exchange(0));
     153            0 :   i.set_trigger_decision_width(m_trigger_decision_width.exchange(0));
     154            0 :   i.set_received_trmon_requests(m_trmon_request_counter.exchange(0));
     155            0 :   i.set_sent_trmon(m_trmon_sent_counter.exchange(0));
     156              : 
     157            0 :   i.set_td_processing_us(m_td_processing_us.exchange(0));
     158            0 :   i.set_fragment_processing_us(m_fragment_processing_us.exchange(0));
     159              : 
     160            0 :   publish(std::move(i));
     161              : 
     162            0 :   opmon::TRBErrors err;
     163              :   // error counters: read with load(), so not reset here (do_start() stores 0 into each of them)
     164            0 :   err.set_timed_out_trigger_records(m_timed_out_trigger_records.load());
     165            0 :   err.set_abandoned_trigger_records(m_abandoned_trigger_records.load());
     166            0 :   err.set_unexpected_fragments(m_unexpected_fragments.load());
     167            0 :   err.set_unexpected_trigger_decisions(m_unexpected_trigger_decisions.load());
     168            0 :   err.set_lost_fragments(m_lost_fragments.load());
     169            0 :   err.set_invalid_requests(m_invalid_requests.load());
     170            0 :   err.set_duplicated_trigger_ids(m_duplicated_trigger_ids.load());
     171              : 
     172            0 :   publish(std::move(err));
     173            0 : }
     174              : /// Handler of the "conf" command: copies timeouts, limits and the source ID out of m_trb_conf.
     175              : void
     176            0 : TRBModule::do_conf(const CommandData_t&)
     177              : {
     178            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_conf() method";
     179              :   // m_trb_conf is assigned in init(); the command payload itself is not used.
     180            0 :   m_trigger_timeout = std::chrono::milliseconds(m_trb_conf->get_trigger_record_timeout_ms());
     181              : 
     182            0 :   m_tr_queue_timeout = std::chrono::milliseconds(m_trb_conf->get_tr_queue_timeout());
     183            0 :   m_dreq_queue_timeout = std::chrono::milliseconds(m_trb_conf->get_request_queue_timeout());
     184              : 
     185            0 :   TLOG() << get_name() << ": timeouts (ms): TR = " << m_tr_queue_timeout.count()
     186            0 :          << ", DReq = " << m_dreq_queue_timeout.count();
     187            0 :   m_max_sequence_length = m_trb_conf->get_max_sequence_length_ticks();
     188            0 :   TLOG() << get_name() << ": Max time window is " << m_max_sequence_length;
     189              :   // Identity of this builder: subsystem kTRBuilder plus the configured id.
     190            0 :   m_this_trb_source_id.subsystem = daqdataformats::SourceID::Subsystem::kTRBuilder;
     191            0 :   m_this_trb_source_id.id = m_trb_conf->get_source_id();
     192              : 
     193            0 :   m_max_open_trigger_records = m_trb_conf->get_maximum_open_trigger_records();
     194              : 
     195            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_conf() method";
     196            0 : }
     197              : /// Handler of the "scrap" command: only emits log messages, no member is modified.
     198              : void
     199            0 : TRBModule::do_scrap(const CommandData_t& /*args*/)
     200              : {
     201            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_scrap() method";
     202              : 
     203            0 :   TLOG() << get_name() << " successfully scrapped";
     204            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_scrap() method";
     205            0 : }
     206              : /// Handler of the "start" command: resets the books and counters, stores the run number, installs the callbacks.
     207              : void
     208            0 : TRBModule::do_start(const CommandData_t& args)
     209              : {
     210            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
     211              :   // @param args command payload; its "run" entry supplies the run number (read further down)
     212              :   // clean books from possible previous memory
     213            0 :   m_trigger_records.clear();
     214            0 :   m_trigger_decisions_counter.store(0);
     215            0 :   m_unexpected_trigger_decisions.store(0);
     216            0 :   m_pending_fragment_counter.store(0);
     217            0 :   m_generated_trigger_records.store(0);
     218            0 :   m_fragment_counter.store(0);
     219            0 :   m_timed_out_trigger_records.store(0);
     220            0 :   m_abandoned_trigger_records.store(0);
     221            0 :   m_unexpected_fragments.store(0);
     222            0 :   m_lost_fragments.store(0);
     223            0 :   m_invalid_requests.store(0);
     224            0 :   m_duplicated_trigger_ids.store(0);
     225              : 
     226              :   // 19-Dec-2024, KAB: check that DataRequest senders are ready to send. This is done so
     227              :   // that the IOManager infrastructure fetches the necessary connection details from
     228              :   // the ConnectivityService at 'start' time, instead of the first time that the sender
     229              :   // is used to send a message.  This avoids delays in the sending of the first request in
     230              :   // the first data-taking run in a DAQ session. Such delays can lead to undesirable
     231              :   // system behavior like trigger inhibits.
     232            0 :   {
     233            0 :     std::unique_lock<std::mutex> lk(m_map_sourceid_connections_mutex);
     234            0 :     for (const auto& sid_sender : m_map_sourceid_connections) {
     235            0 :       std::shared_ptr<data_req_sender_t> sender = sid_sender.second;
     236            0 :       if (sender != nullptr) {
     237            0 :         bool is_ready = sender->is_ready_for_sending(std::chrono::milliseconds(100));
     238            0 :         TLOG_DEBUG(0) << "The DataRequest sender for " << sid_sender.first << " " << (is_ready ? "is" : "is not")
     239            0 :                       << " ready.";
     240              :       }
     241            0 :     }
     242            0 :   }
     243              :   // The readiness result above is only logged; start proceeds either way.
     244            0 :   m_run_number.reset(new const daqdataformats::run_number_t(args.at("run").get<daqdataformats::run_number_t>()));
     245            0 :   m_stop_requested = false;
     246              : 
     247              :   // Register the callback to receive monitoring requests
     248            0 :   if (m_mon_receiver) {
     249            0 :     m_mon_requests.clear();
     250            0 :     m_mon_receiver->add_callback(std::bind(&TRBModule::tr_requested, this, std::placeholders::_1));
     251              :   }
     252              :   // Fragments and TriggerDecisions are handled by the two callbacks registered below.
     253            0 :   m_fragment_input->add_callback(std::bind(&TRBModule::fragments_callback, this, std::placeholders::_1));
     254            0 :   m_trigger_decision_input->add_callback(std::bind(&TRBModule::trigger_decision_callback, this, std::placeholders::_1));
     255              : 
     256            0 :   TLOG() << get_name() << " successfully started";
     257            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
     258            0 : }
     259              : /// Handler of the "stop" command: removes the receive callbacks, then flushes the trigger records still open.
     260              : void
     261            0 : TRBModule::do_stop(const CommandData_t& /*args*/)
     262              : {
     263            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
     264              :   // Unregister the monitoring requests callback
     265              : 
     266            0 :   if (m_mon_receiver) {
     267            0 :     m_mon_receiver->remove_callback();
     268              :   }
     269              : 
     270            0 :   m_trigger_decision_input->remove_callback();
     271            0 :   m_fragment_input->remove_callback();
     272              :   // Raised before the flush so that extract_trigger_record() tags incomplete records with "at Stop time".
     273            0 :   m_stop_requested = true;
     274              : 
     275            0 :   flush_trigger_records();
     276              : 
     277            0 :   TLOG() << get_name() << " successfully stopped";
     278            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
     279            0 : }
     280              : /// Callback for TRMonRequest messages: counts the request and queues it if it targets the current run.
     281              : void
     282            0 : TRBModule::tr_requested(const dfmessages::TRMonRequest& req)
     283              : {
     284            0 :   ++m_trmon_request_counter;
     285              :   // The counter above also includes the requests discarded by the run-number check below.
     286              :   // Ignore requests that don't belong to the ongoing run
     287            0 :   if (req.run_number != *m_run_number)
     288            0 :     return;
     289              : 
     290              :   // Add requests to pending requests
     291              :   // To be done: choose a concurrent container implementation.
     292            0 :   const std::lock_guard<std::mutex> lock(m_mon_mutex);
     293            0 :   m_mon_requests.push_back(req);
     294            0 : }
     295              : /// Sends every trigger record still in the book downstream (called from do_stop()) and logs the time taken.
     296              : void
     297            0 : TRBModule::flush_trigger_records()
     298              : {
     299            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Starting draining phase ";
     300            0 :   std::chrono::steady_clock::time_point t1 = std::chrono::steady_clock::now();
     301              : 
     302              :   // //-------------------------------------------------
     303              :   // // Here we drain what has been left from the running condition
     304              :   // //--------------------------------------------------
     305              :   // NOTE(review): m_trigger_records is read here without m_trigger_records_mutex; do_stop() removes the callbacks first.
     306              :   // Snapshot the IDs first, so the book is not iterated while records are being sent.
     307            0 :   std::vector<TriggerId> triggers;
     308            0 :   for (const auto& entry : m_trigger_records) {
     309            0 :     triggers.push_back(entry.first);
     310              :   }
     311              : 
     312              :   // create the trigger record and send it
     313            0 :   for (const auto& t : triggers) {
     314            0 :     send_trigger_record(t);
     315              :   }
     316              : 
     317            0 :   std::chrono::steady_clock::time_point t2 = std::chrono::steady_clock::now();
     318              : 
     319            0 :   std::chrono::duration<double> time_span = std::chrono::duration_cast<std::chrono::duration<double>>(t2 - t1);
     320              :   // The summary names this method (it used to refer to a do_work() method that does not exist here).
     321            0 :   std::ostringstream oss_summ;
     322            0 :   oss_summ << ": Exiting flush_trigger_records(), " << m_trigger_records.size() << " remaining Trigger Records"
     323            0 :            << std::endl
     324            0 :            << "Draining took : " << time_span.count() << " s";
     325            0 :   TLOG() << ProgressUpdate(ERS_HERE, get_name(), oss_summ.str());
     326              : 
     327            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting flush_trigger_records() method";
     328            0 : } // NOLINT(readability/fn_size)
     329              : /// Callback for incoming fragments: books the fragment, collects completed trigger records and sends them.
     330              : void
     331            0 : TRBModule::fragments_callback(std::unique_ptr<daqdataformats::Fragment>& temp_fragment)
     332              : {
     333              :   // @param temp_fragment received fragment; it is moved into the matching TriggerRecord when it was requested
     334            0 :   auto start_time = std::chrono::steady_clock::now();
     335              : 
     336            0 :   TLOG_DEBUG(TLVL_FRAGMENT_RECEIVE) << get_name() << " Received fragment for trigger/sequence_number "
     337            0 :                                     << temp_fragment->get_trigger_number() << "."
     338            0 :                                     << temp_fragment->get_sequence_number() << " from "
     339            0 :                                     << temp_fragment->get_element_id();
     340              : 
     341            0 :   TriggerId temp_id(*temp_fragment);
     342            0 :   std::vector<TriggerId> complete;
     343            0 :   bool requested = false;
     344              : 
     345            0 :   { // Begin mutex block
     346            0 :     std::lock_guard<std::mutex> lk(m_trigger_records_mutex);
     347            0 :     auto it = m_trigger_records.find(temp_id);
     348              : 
     349            0 :     if (it != m_trigger_records.end()) {
     350              : 
     351              :       // check if the fragment has a Source Id that was desired
     352            0 :       daqdataformats::TriggerRecordHeader& header = it->second.second->get_header_ref();
     353              : 
     354            0 :       for (size_t i = 0; i < header.get_num_requested_components(); ++i) {
     355              : 
     356            0 :         const daqdataformats::ComponentRequest& request = header[i];
     357            0 :         if (request.component == temp_fragment->get_element_id()) {
     358              :           requested = true;
     359              :           break;
     360              :         }
     361              : 
     362              :       } // request loop
     363              : 
     364              :     } // if there is a corresponding trigger ID entry in the book
     365              :     // requested can only be true when `it` is valid, so dereferencing it below is safe.
     366            0 :     if (requested) {
     367            0 :       it->second.second->add_fragment(std::move(temp_fragment));
     368            0 :       ++m_fragment_counter;
     369            0 :       --m_pending_fragment_counter;
     370              :     } else {
     371            0 :       ers::error(UnexpectedFragment(
     372            0 :         ERS_HERE, temp_id, temp_fragment->get_fragment_type_code(), temp_fragment->get_element_id()));
     373            0 :       ++m_unexpected_fragments;
     374              :     }
     375              : 
     376              :     // Only do bookkeeping periodically
     377            0 :     if (std::chrono::duration_cast<std::chrono::milliseconds>(clock_type::now() - m_last_bookkeeping).count() > 1000) {
     378              :       //-------------------------------------------------
     379              :       // Scan the whole book for complete trigger records
     380              :       // (timeouts are not evaluated in this loop)
     381              :       //--------------------------------------------------
     382            0 :       TLOG_DEBUG(TLVL_BOOKKEEPING) << "Bookeeping status: " << m_trigger_records.size()
     383            0 :                                    << " trigger records in progress ";
     384            0 :       m_last_bookkeeping = clock_type::now(); // once per pass, also when the book is empty
     385            0 :       for (const auto& tr : m_trigger_records) {
     386              : 
     387            0 :         auto comp_size = tr.second.second->get_fragments_ref().size();
     388            0 :         auto requ_size = tr.second.second->get_header_ref().get_num_requested_components();
     389            0 :         std::ostringstream message;
     390            0 :         message << tr.first << " with " << comp_size << '/' << requ_size << " components";
     391              : 
     392            0 :         if (comp_size == requ_size) {
     393              : 
     394            0 :           message << ": complete";
     395            0 :           complete.push_back(tr.first);
     396              :         }
     397              : 
     398            0 :         TLOG_DEBUG(TLVL_BOOKKEEPING) << message.str();
     399              : 
     400              : 
     401            0 :       } // loop over TRs to check if they are complete
     402            0 :     } else if (requested && it != m_trigger_records.end()) {
     403              :       // Between bookkeeping passes only the record that just received a fragment is checked.
     404            0 :       auto comp_size = it->second.second->get_fragments_ref().size();
     405            0 :       auto requ_size = it->second.second->get_header_ref().get_num_requested_components();
     406              : 
     407            0 :       if (comp_size == requ_size) {
     408            0 :         complete.push_back(temp_id);
     409              :       }
     410              :     }
     411            0 :   } // End mutex block
     412              :   //------------------------------------------------
     413              :   // Create TriggerRecords and send them
     414              :   //-----------------------------------------------
     415              : 
     416            0 :   for (const auto& id : complete) {
     417              : 
     418            0 :     send_trigger_record(id);
     419              : 
     420              :   } // loop over completed trigger ids
     421              : 
     422            0 :   check_stale_requests();
     423              : 
     424            0 :   ++m_received_fragments;
     425              : 
     426            0 :   auto end_time = std::chrono::steady_clock::now();
     427            0 :   m_fragment_processing_us += std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count();
     428            0 : }
     429              : /// Callback for TriggerDecision messages: rejects other runs, else calls create_trigger_records_and_dispatch().
     430              : void
     431            0 : TRBModule::trigger_decision_callback(dfmessages::TriggerDecision& td)
     432              : {
     433              :   // Start of the interval accumulated into m_td_processing_us at the end of this method.
     434            0 :   auto start_time = std::chrono::steady_clock::now();
     435              : 
     436            0 :   if (td.run_number != *m_run_number) {
     437            0 :     ers::error(UnexpectedTriggerDecision(ERS_HERE, td.trigger_number, td.run_number, *m_run_number));
     438            0 :     ++m_unexpected_trigger_decisions;
     439            0 :     return;
     440              :   }
     441              :   // NOTE(review): the early return above skips the m_td_processing_us accounting at the end of this method.
     442            0 :   ++m_received_trigger_decisions;
     443              : 
     444            0 :   create_trigger_records_and_dispatch(td);
     445              :   //check_stale_requests();
     446              : 
     447            0 :   auto end_time = std::chrono::steady_clock::now();
     448            0 :   m_td_processing_us += std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count();
     449              : }
     450              : /// Removes the entry for @p id from the book and returns its TriggerRecord, or nullptr if there is no such entry.
     451              : TRBModule::trigger_record_ptr_t
     452            0 : TRBModule::extract_trigger_record(const TriggerId& id)
     453              : {
     454            0 :   std::unique_lock<std::mutex> lk(m_trigger_records_mutex);
     455            0 :   auto it = m_trigger_records.extract(id);
     456            0 :   m_open_trigger_record_cv.notify_one();
     457            0 :   lk.unlock();
     458              :   // The notify above happens even when nothing was extracted; the lock is released before the bookkeeping below.
     459            0 :   if (it.empty())
     460            0 :     return nullptr;
     461              : 
     462            0 :   trigger_record_ptr_t temp = std::move(it.mapped().second);
     463              :   // it.mapped().first is the time point stored with the entry; the elapsed time is added to m_data_waiting_time.
     464            0 :   auto time = clock_type::now();
     465            0 :   auto duration = time - it.mapped().first;
     466              : 
     467            0 :   m_data_waiting_time += std::chrono::duration_cast<duration_type>(duration).count();
     468              : 
     469            0 :   --m_trigger_decisions_counter;
     470            0 :   m_fragment_counter -= temp->get_fragments_ref().size();
     471              :   // NOTE(review): presumably an unsigned difference - assumes a record never holds more fragments than requested.
     472            0 :   auto missing_fragments = temp->get_header_ref().get_num_requested_components() - temp->get_fragments_ref().size();
     473              : 
     474            0 :   if (missing_fragments > 0) {
     475              :     // Incomplete record: count the gap as lost, drop it from the pending count and flag the header.
     476            0 :     m_lost_fragments += missing_fragments;
     477            0 :     m_pending_fragment_counter -= missing_fragments;
     478            0 :     temp->get_header_ref().set_error_bit(TriggerRecordErrorBits::kIncomplete, true);
     479              : 
     480            0 :     TLOG() << get_name() << " sending incomplete TriggerRecord downstream "
     481            0 :            << (m_stop_requested.load() ? "at Stop time " : "") << "(trigger/run_number=" << id << ", "
     482            0 :            << temp->get_fragments_ref().size() << " of " << temp->get_header_ref().get_num_requested_components()
     483            0 :            << " fragments included)";
     484              :   }
     485              : 
     486            0 :   return temp;
     487            0 : }
     488              : 
     489              : unsigned int
     490            0 : TRBModule::create_trigger_records_and_dispatch(const dfmessages::TriggerDecision& td)
     491              : {
     492              : 
     493            0 :   unsigned int new_tr_counter = 0;
     494              : 
     495              :   // check the whole time window
     496            0 :   daqdataformats::timestamp_t begin = std::numeric_limits<daqdataformats::timestamp_t>::max();
     497            0 :   daqdataformats::timestamp_t end = 0;
     498              : 
     499            0 :   for (const auto& component : td.components) {
     500            0 :     if (component.window_begin < begin)
     501            0 :       begin = component.window_begin;
     502            0 :     if (component.window_end > end)
     503            0 :       end = component.window_end;
     504              :   }
     505              : 
     506            0 :   daqdataformats::timestamp_diff_t tot_width = end - begin;
     507            0 :   daqdataformats::sequence_number_t max_sequence_number =
     508            0 :     (m_max_sequence_length > 0 && tot_width > 0) ? ((tot_width - 1) / m_max_sequence_length) : 0;
     509              : 
     510            0 :   TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": trig_number " << td.trigger_number << ": run_number " << td.run_number
     511            0 :                               << ": trig_timestamp " << td.trigger_timestamp << " will have " << max_sequence_number + 1
     512            0 :                               << " sequences";
     513              : 
     514            0 :   m_trigger_decision_width += tot_width;
     515              : 
     516              :   // create the trigger records
     517            0 :   for (daqdataformats::sequence_number_t sequence = 0; sequence <= max_sequence_number; ++sequence) {
     518              : 
     519            0 :     daqdataformats::timestamp_t slice_begin = begin + sequence * m_max_sequence_length;
     520            0 :     daqdataformats::timestamp_t slice_end =
     521            0 :       m_max_sequence_length > 0 ? std::min(slice_begin + m_max_sequence_length, end) : end;
     522              : 
     523            0 :     TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": trig_number " << td.trigger_number << ", sequence " << sequence
     524            0 :                                 << " ts=" << slice_begin << ":" << slice_end << " (TR " << begin << ":" << end << ")";
     525              : 
     526              :     // create the components cropped in time
     527            0 :     decltype(td.components) slice_components;
     528            0 :     for (const auto& component : td.components) {
     529              : 
     530            0 :       if (component.window_begin > slice_end)
     531            0 :         continue;
     532            0 :       if (component.window_end < slice_begin)
     533            0 :         continue;
     534              : 
     535            0 :       daqdataformats::timestamp_t new_begin = std::max(slice_begin, component.window_begin);
     536            0 :       daqdataformats::timestamp_t new_end = std::min(slice_end, component.window_end);
     537              : 
     538            0 :       daqdataformats::ComponentRequest temp(component.component, new_begin, new_end);
     539            0 :       slice_components.push_back(temp);
     540              : 
     541            0 :       m_data_request_width += new_end - new_begin;
     542              : 
     543              :     } // loop over component in trigger decision
     544              : 
     545              :     // Please note that the system could generate empty sequences.
     546              :     // The code keeps them.
     547              : 
     548              :     // create the book entry
     549            0 :     TriggerId slice_id(td, sequence);
     550            0 :     {
     551            0 :       std::unique_lock<std::mutex> lk(m_trigger_records_mutex);
     552            0 :       m_open_trigger_record_cv.wait(lk, [&] { return m_trigger_records.size() < m_max_open_trigger_records; });
     553            0 :       auto it = m_trigger_records.find(slice_id);
     554            0 :       if (it != m_trigger_records.end()) {
     555            0 :         ers::error(DuplicatedTriggerDecision(ERS_HERE, slice_id));
     556            0 :         ++m_duplicated_trigger_ids;
     557            0 :         continue;
     558              :       }
     559              : 
     560              :       // create trigger record for the slice
     561            0 :       auto& entry = m_trigger_records[slice_id] = std::make_pair(clock_type::now(), trigger_record_ptr_t());
     562            0 :       trigger_record_ptr_t& trp = entry.second;
     563            0 :       trp.reset(new daqdataformats::TriggerRecord(slice_components));
     564            0 :       daqdataformats::TriggerRecord& tr = *trp;
     565              : 
     566            0 :       tr.get_header_ref().set_trigger_number(td.trigger_number);
     567            0 :       tr.get_header_ref().set_sequence_number(sequence);
     568            0 :       tr.get_header_ref().set_max_sequence_number(max_sequence_number);
     569            0 :       tr.get_header_ref().set_run_number(td.run_number);
     570            0 :       tr.get_header_ref().set_trigger_timestamp(td.trigger_timestamp);
     571            0 :       tr.get_header_ref().set_trigger_type(td.trigger_type);
     572            0 :       tr.get_header_ref().set_element_id(m_this_trb_source_id);
     573              : 
     574            0 :       m_trigger_decisions_counter++;
     575            0 :       m_pending_fragment_counter += slice_components.size();
     576            0 :       ++new_tr_counter;
     577            0 :     }
     578              : 
     579              :     // create and send the requests
     580            0 :     TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": Trigger Decision components: " << td.components.size()
     581            0 :                                 << ", slice components: " << slice_components.size();
     582            0 :     for (const auto& component : slice_components) {
     583              : 
     584            0 :       dfmessages::DataRequest dataReq;
     585            0 :       dataReq.trigger_number = td.trigger_number;
     586            0 :       dataReq.sequence_number = sequence;
     587            0 :       dataReq.run_number = td.run_number;
     588            0 :       dataReq.trigger_timestamp = td.trigger_timestamp;
     589            0 :       dataReq.readout_type = td.readout_type;
     590            0 :       dataReq.request_information = component;
     591            0 :       dataReq.data_destination = m_reply_connection;
     592            0 :       TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": TR " << slice_id << ": trig_timestamp "
     593            0 :                                   << dataReq.trigger_timestamp << ": SourceID " << component.component << ": window ["
     594            0 :                                   << dataReq.request_information.window_begin << ", "
     595            0 :                                   << dataReq.request_information.window_end << ']';
     596              : 
     597            0 :       dispatch_data_requests(std::move(dataReq), component.component);
     598              : 
     599            0 :     } // loop loop over component in the slice
     600              : 
     601            0 :   } // sequence loop
     602              : 
     603            0 :   return new_tr_counter;
     604              : }
     605              : 
     606              : bool
     607            0 : TRBModule::dispatch_data_requests(dfmessages::DataRequest dr, const daqdataformats::SourceID& sid)
     608              : // Sends the DataRequest dr through the sender registered for sid; returns true only if a send succeeded.
     609              : {
     610              :   // A missing or null sender is reported with ers::error, counted in m_invalid_requests, and yields false.
     611              :   // look up the sender registered for sid in m_map_sourceid_connections, holding its mutex
     612            0 :   std::unique_lock<std::mutex> lk(m_map_sourceid_connections_mutex);
     613            0 :   std::shared_ptr<data_req_sender_t> sender = nullptr;
     614            0 :   auto it_req = m_map_sourceid_connections.find(sid);
     615            0 :   if (it_req == m_map_sourceid_connections.end() || it_req->second == nullptr) {
     616              : 
     617              :     // no usable sender for this SourceID: report the error, count it, and give up on this request
     618            0 :     ers::error(
     619            0 :       dunedaq::dfmodules::DRSenderLookupFailed(ERS_HERE, sid, dr.run_number, dr.trigger_number, dr.sequence_number));
     620            0 :     ++m_invalid_requests;
     621            0 :     return false; // lk goes out of scope, is destroyed
     622              :   } else {
     623              :     // copy the shared_ptr out of the map so the sender can be used after the lock is released
     624            0 :     sender = it_req->second;
     625              :   }
     626            0 :   lk.unlock();
     627              : 
     628            0 :   if (sender == nullptr) {
     629              :     // NOTE(review): sender was already checked against nullptr above, so this branch looks unreachable
     630            0 :     ers::error(
     631            0 :       dunedaq::dfmodules::DRSenderLookupFailed(ERS_HERE, sid, dr.run_number, dr.trigger_number, dr.sequence_number));
     632            0 :     ++m_invalid_requests;
     633            0 :     return false;
     634              :   }
     635              : 
     636              :   bool wasSentSuccessfully = false;
     637            0 :   do {
     638            0 :     TLOG_DEBUG(TLVL_DISPATCH_DATAREQ) << get_name() << ": Pushing the DataRequest from trigger/sequence number "
     639            0 :                                       << dr.trigger_number << "." << dr.sequence_number
     640            0 :                                       << " onto connection :" << sender->get_name();
     641              :     // NOTE(review): dr goes through std::move on every attempt; a retry assumes a failed send() left it intact - confirm
     642              :     // try to send with timeout m_dreq_queue_timeout; a failure is reported as a warning and the loop retries
     643            0 :     try {
     644            0 :       sender->send(std::move(dr), m_dreq_queue_timeout);
     645            0 :       wasSentSuccessfully = true;
     646            0 :       ++m_generated_data_requests;
     647            0 :     } catch (const ers::Issue& excpt) {
     648            0 :       std::ostringstream oss_warn;
     649            0 :       oss_warn << "Send to connection \"" << sender->get_name() << "\" failed";
     650            0 :       ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt));
     651            0 :     }
     652            0 :   } while (!wasSentSuccessfully && !m_stop_requested.load());
     653              : 
     654              :   return wasSentSuccessfully;
     655            0 : }
     656              : 
     657              : bool
     658            0 : TRBModule::send_trigger_record(const TriggerId& id)
     659              : {
     660              :   // Extracts the record for id, offers copies to pending monitoring requests, then sends it to m_trigger_record_output.
     661            0 :   trigger_record_ptr_t temp_record(extract_trigger_record(id));
     662              : 
     663              :   // Send to monitoring, if needed
     664              :   // (m_mon_requests is walked while holding m_mon_mutex; served requests are erased from it)
     665            0 :   if (m_mon_receiver) {
     666            0 :     const std::lock_guard<std::mutex> lock(m_mon_mutex);
     667            0 :     auto it = m_mon_requests.begin();
     668            0 :     std::set<std::string> sent_destinations;
     669              : 
     670            0 :     while (it != m_mon_requests.end()) {
     671              :       // Only send the TR to each monitoring destination once; further requests for it stay queued
     672            0 :       if (sent_destinations.count(it->data_destination)) {
     673            0 :         ++it;
     674            0 :         continue;
     675              :       }
     676              : 
     677              :       // send the TR to monitoring if its trigger type matches the request's trigger_type_mask
     678            0 :       if ((it->trigger_type_mask & temp_record->get_header_data().trigger_type) != 0) {
     679            0 :         auto iom = iomanager::IOManager::get();
     680              :         bool wasSentSuccessfully = false;
     681            0 :         do {
     682            0 :           try {
     683              :             // HACK: serialize + deserialize (MsgPack) to obtain a copy of the trigger record for monitoring
     684            0 :             auto trigger_record_bytes =
     685            0 :               serialization::serialize(temp_record, serialization::SerializationType::kMsgPack);
     686            0 :             trigger_record_ptr_t record_copy = serialization::deserialize<trigger_record_ptr_t>(trigger_record_bytes);
     687            0 :             iom->get_sender<trigger_record_ptr_t>(it->data_destination)->send(std::move(record_copy), m_tr_queue_timeout);
     688            0 :             ++m_trmon_sent_counter;
     689            0 :             wasSentSuccessfully = true;
     690            0 :           } catch (const ers::Issue& excpt) {
     691            0 :             std::ostringstream oss_warn;
     692            0 :             oss_warn << "Sending TR to connection \"" << it->data_destination << "\" failed";
     693            0 :             ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt));
     694            0 :           }
     695            0 :         } while (!m_stop_requested.load() && !wasSentSuccessfully);
     696            0 :         sent_destinations.insert(it->data_destination);
     697            0 :         it = m_mon_requests.erase(it);
     698            0 :       } else {
     699            0 :         ++it;
     700              :       }
     701              :     }
     702            0 :   } // if m_mon_receiver
     703              :   // send the record itself to the output, retrying until it succeeds or a stop is requested
     704              :   bool wasSentSuccessfully = false;
     705            0 :   do {
     706            0 :     try {
     707            0 :       m_trigger_record_output->send(std::move(temp_record), m_tr_queue_timeout);
     708            0 :       wasSentSuccessfully = true;
     709            0 :       ++m_generated_trigger_records;
     710            0 :     } catch (const ers::Issue& excpt) {
     711            0 :       ers::warning(excpt);
     712            0 :     }
     713            0 :   } while (!m_stop_requested.load() && !wasSentSuccessfully); // push while loop
     714              :   // NOTE(review): temp_record went through std::move above; using it below assumes a failed send() left it intact - confirm
     715            0 :   if (!wasSentSuccessfully) {
     716            0 :     ++m_abandoned_trigger_records;
     717            0 :     m_lost_fragments += temp_record->get_fragments_ref().size();
     718            0 :     ers::error(dunedaq::dfmodules::AbandonedTriggerDecision(ERS_HERE, id));
     719              :   }
     720              :   // true only if the record was handed to m_trigger_record_output
     721            0 :   return wasSentSuccessfully;
     722            0 : }
     723              : 
     724              : bool
     725            0 : TRBModule::check_stale_requests()
     726              : {
     727              :   // Sends out every open trigger record older than m_trigger_timeout; returns true if at least one was found.
     728            0 :   bool book_updates = false;
     729              : 
     730              :   // -----------------------------------------------
     731              :   // optionally send out stale (timed-out) trigger records; skipped unless m_trigger_timeout is positive
     732              :   // -----------------------------------------------
     733              : 
     734            0 :   if (m_trigger_timeout.count() > 0) {
     735              : 
     736            0 :     std::vector<TriggerId> stale_triggers;
     737            0 :     {
     738            0 :       std::lock_guard<std::mutex> lk(m_trigger_records_mutex);
     739            0 :       for (auto it = m_trigger_records.begin(); it != m_trigger_records.end(); ++it) {
     740              : 
     741            0 :         daqdataformats::TriggerRecord& tr = *it->second.second;
     742              :         // it->second.first is the time point stored with the entry; tr_time is how long ago that was
     743            0 :         auto tr_time = clock_type::now() - it->second.first;
     744              : 
     745            0 :         if (tr_time > m_trigger_timeout) {
     746              : 
     747            0 :           ers::error(TimedOutTriggerDecision(ERS_HERE, it->first, tr.get_header_ref().get_trigger_timestamp()));
     748              : 
     749              :           // mark trigger record for sending; the actual send happens after the lock is released
     750            0 :           stale_triggers.push_back(it->first);
     751            0 :           ++m_timed_out_trigger_records;
     752              : 
     753            0 :           book_updates = true;
     754              :         }
     755              : 
     756              :       } // trigger record loop
     757            0 :     }
     758              :     // send each stale trigger record, outside the scope of the m_trigger_records_mutex lock
     759            0 :     for (const auto& t : stale_triggers) {
     760            0 :       send_trigger_record(t);
     761              :     }
     762              :     // NOTE: the return value of send_trigger_record() is ignored; book_updates only reflects that timeouts were found
     763            0 :   } //  m_trigger_timeout > 0
     764              : 
     765            0 :   return book_updates;
     766              : }
     767              : 
     768              : } // namespace dfmodules
     769              : } // namespace dunedaq
     770              : 
     771            0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::dfmodules::TRBModule) // module-definition macro for TRBModule (macro is defined outside this chunk)
        

Generated by: LCOV version 2.0-1