LCOV - code coverage report
Current view: top level - dfmodules/plugins - DFOModule.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 89.9 % 308 277
Test Date: 2026-02-16 10:18:04 Functions: 90.9 % 44 40

            Line data    Source code
       1              : /**
       2              :  * @file DFOModule.cpp DFOModule class implementation
       3              :  *
       4              :  * This is part of the DUNE DAQ Software Suite, copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
       9              : #include "DFOModule.hpp"
      10              : #include "dfmodules/CommonIssues.hpp"
      11              : 
      12              : #include "dfmodules/opmon/DFOModule.pb.h"
      13              : 
      14              : #include "appmodel/DFOModule.hpp"
      15              : #include "confmodel/Connection.hpp"
      16              : #include "iomanager/IOManager.hpp"
      17              : #include "logging/Logging.hpp"
      18              : 
      19              : #include <chrono>
      20              : #include <cstdlib>
      21              : #include <future>
      22              : #include <limits>
      23              : #include <list>
      24              : #include <map>
      25              : #include <memory>
      26              : #include <string>
      27              : #include <thread>
      28              : #include <utility>
      29              : #include <vector>
      30              : 
      31              : /**
      32              :  * @brief Name used by TRACE TLOG calls from this source file
      33              :  */
      34              : #define TRACE_NAME "DFOModule" // NOLINT
      35              : enum
      36              : {
      37              :   TLVL_ENTER_EXIT_METHODS = 5,
      38              :   TLVL_CONFIG = 7,
      39              :   TLVL_WORK_STEPS = 10,
      40              :   TLVL_TRIGDEC_RECEIVED = 21,
      41              :   TLVL_NOTIFY_TRIGGER = 22,
      42              :   TLVL_DISPATCH_TO_TRB = 23,
      43              :   TLVL_TDTOKEN_RECEIVED = 24
      44              : };
      45              : 
      46              : namespace dunedaq::dfmodules {
      47              : 
      48            5 : DFOModule::DFOModule(const std::string& name)
      49              :   : dunedaq::appfwk::DAQModule(name)
      50            5 :   , m_queue_timeout(100)
      51           10 :   , m_run_number(0)
      52              : {
      53            5 :   register_command("conf", &DFOModule::do_conf);
      54            5 :   register_command("start", &DFOModule::do_start);
      55            5 :   register_command("drain_dataflow", &DFOModule::do_stop);
      56            5 :   register_command("scrap", &DFOModule::do_scrap);
      57            5 : }
      58              : 
      59              : void
      60            4 : DFOModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
      61              : {
      62            4 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
      63              : 
      64            4 :   auto mdal = mcfg->get_dal<appmodel::DFOModule>(get_name());
      65            4 :   if (!mdal) {
      66            0 :     throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve configuration object");
      67              :   }
      68            4 :   auto iom = iomanager::IOManager::get();
      69              : 
      70           12 :   for (auto con : mdal->get_inputs()) {
      71            8 :     if (con->get_data_type() == datatype_to_string<dfmessages::TriggerDecisionToken>()) {
      72            4 :       m_token_connection = con->UID();
      73              :     }
      74            8 :     if (con->get_data_type() == datatype_to_string<dfmessages::TriggerDecision>()) {
      75            4 :       m_td_connection = con->UID();
      76              :     }
      77              :   }
      78           12 :   for (auto con : mdal->get_outputs()) {
      79            8 :     if (con->get_data_type() == datatype_to_string<dfmessages::TriggerInhibit>()) {
      80            4 :       m_busy_sender = iom->get_sender<dfmessages::TriggerInhibit>(con->UID());
      81              :     }
      82            8 :     if (con->get_data_type() == datatype_to_string<dfmessages::TriggerDecision>()) {
      83            4 :       m_trb_conn_ids.push_back(con->UID());
      84              :     }
      85              :   }
      86              : 
      87            4 :   if (m_token_connection == "") {
      88            0 :     throw appfwk::MissingConnection(
      89            0 :       ERS_HERE, get_name(), datatype_to_string<dfmessages::TriggerDecisionToken>(), "input");
      90              :   }
      91            4 :   if (m_td_connection == "") {
      92            0 :     throw appfwk::MissingConnection(ERS_HERE, get_name(), datatype_to_string<dfmessages::TriggerDecision>(), "input");
      93              :   }
      94            4 :   if (m_busy_sender == nullptr) {
      95            0 :     throw appfwk::MissingConnection(ERS_HERE, get_name(), datatype_to_string<dfmessages::TriggerInhibit>(), "output");
      96              :   
      97              :   }
      98              : 
      99            4 :   m_dfo_conf = mdal->get_configuration();
     100              :   // these are just tests to check if the connections are ok
     101            4 :   iom->get_receiver<dfmessages::TriggerDecisionToken>(m_token_connection);
     102            4 :   iom->get_receiver<dfmessages::TriggerDecision>(m_td_connection);
     103              : 
     104            4 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
     105            4 : }
     106              : 
     107              : void
     108            3 : DFOModule::do_conf(const CommandData_t&)
     109              : {
     110            3 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_conf() method";
     111              : 
     112            3 :   m_queue_timeout = std::chrono::milliseconds(m_dfo_conf->get_general_queue_timeout_ms());
     113            3 :   m_stop_timeout = std::chrono::milliseconds(m_dfo_conf->get_stop_timeout_ms());
     114            3 :   m_busy_threshold = m_dfo_conf->get_busy_threshold();
     115            3 :   m_free_threshold = m_dfo_conf->get_free_threshold();
     116              : 
     117            3 :   m_td_send_retries = m_dfo_conf->get_td_send_retries();
     118              : 
     119            3 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_conf() method, there are "
     120            3 :                                       << m_dataflow_availability.size() << " TRB apps defined";
     121            3 : }
     122              : 
     123              : void
     124            3 : DFOModule::do_start(const CommandData_t& payload)
     125              : {
     126            3 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
     127              : 
     128            3 :   m_received_tokens = 0;
     129            3 :   m_run_number = payload.value<dunedaq::daqdataformats::run_number_t>("run", 0);
     130              : 
     131            3 :   m_running_status.store(true);
     132            3 :   m_last_notified_busy.store(false);
     133            3 :   m_last_assignement_it = m_dataflow_availability.end();
     134              : 
     135            3 :   m_last_token_received = m_last_td_received = std::chrono::steady_clock::now();
     136              : 
     137              :   // 19-Dec-2024, KAB: check that TriggerDecision senders are ready to send. This is done
     138              :   // so that the IOManager infrastructure fetches the necessary connection details from
     139              :   // the ConnectivityService at 'start' time, instead of the first time that the sender
     140              :   // is used to send a message.  This avoids delays in the sending of the first TD in
     141              :   // the first data-taking run in a DAQ session. Such delays can lead to undesirable
     142              :   // system behavior like trigger inhibits.
     143            3 :   auto iom = iomanager::IOManager::get();
     144            3 :   if (m_busy_sender != nullptr) {
     145            3 :     bool is_ready = m_busy_sender->is_ready_for_sending(std::chrono::milliseconds(100));
     146            3 :     TLOG_DEBUG(0) << "The sender for TriggerInhibit messages " << (is_ready ? "is" : "is not") << " ready.";
     147              :   }
     148            6 :   for (auto trb_conn : m_trb_conn_ids) {
     149            3 :     auto sender = iom->get_sender<dfmessages::TriggerDecision>(trb_conn);
     150            3 :     if (sender != nullptr) {
     151            3 :       bool is_ready = sender->is_ready_for_sending(std::chrono::milliseconds(100));
     152            3 :       TLOG_DEBUG(0) << "The TriggerDecision sender for " << trb_conn << " " << (is_ready ? "is" : "is not") << " ready.";
     153              :     }
     154            3 :   }
     155            3 :   iom->add_callback<dfmessages::TriggerDecisionToken>(
     156            3 :     m_token_connection, std::bind(&DFOModule::receive_trigger_complete_token, this, std::placeholders::_1));
     157              : 
     158            3 :   iom->add_callback<dfmessages::TriggerDecision>(
     159            3 :     m_td_connection, std::bind(&DFOModule::receive_trigger_decision, this, std::placeholders::_1));
     160              : 
     161            3 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
     162            3 : }
     163              : 
     164              : void
     165            3 : DFOModule::do_stop(const CommandData_t& /*args*/)
     166              : {
     167            3 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
     168              : 
     169            3 :   m_running_status.store(false);
     170              : 
     171            3 :   auto iom = iomanager::IOManager::get();
     172            3 :   iom->remove_callback<dfmessages::TriggerDecision>(m_td_connection);
     173              : 
     174            3 :   const int wait_steps = 20;
     175            3 :   auto step_timeout = m_stop_timeout / wait_steps;
     176            3 :   int step_counter = 0;
     177            3 :   while (!is_empty() && step_counter < wait_steps) {
     178            0 :     TLOG() << get_name() << ": stop delayed while waiting for " << used_slots() << " TDs to completed";
     179            0 :     std::this_thread::sleep_for(step_timeout);
     180            0 :     ++step_counter;
     181              :   }
     182              : 
     183            3 :   iom->remove_callback<dfmessages::TriggerDecisionToken>(m_token_connection);
     184              : 
     185            3 :   std::list<std::shared_ptr<AssignedTriggerDecision>> remnants;
     186            5 :   for (auto& app : m_dataflow_availability) {
     187            2 :     auto temp = app.second->flush();
     188            2 :     for (auto& td : temp) {
     189            0 :       remnants.push_back(td);
     190              :     }
     191            2 :   }
     192              : 
     193            3 :   for (auto& r : remnants) {
     194            0 :     ers::error(IncompleteTriggerDecision(ERS_HERE, r->decision.trigger_number, m_run_number));
     195              :   }
     196              : 
     197            3 :   std::lock_guard<std::mutex> guard(m_trigger_counters_mutex);
     198            3 :   m_trigger_counters.clear();
     199              :   
     200            6 :   TLOG() << get_name() << " successfully stopped";
     201            3 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
     202            3 : }
     203              : 
     204              : void
     205            3 : DFOModule::do_scrap(const CommandData_t& /*args*/)
     206              : {
     207            3 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_scrap() method";
     208              : 
     209            3 :   m_dataflow_availability.clear();
     210              : 
     211            6 :   TLOG() << get_name() << " successfully scrapped";
     212            3 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_scrap() method";
     213            3 : }
     214              : 
     215              : void
     216            5 : DFOModule::receive_trigger_decision(const dfmessages::TriggerDecision& decision)
     217              : {
     218            5 :   TLOG_DEBUG(TLVL_TRIGDEC_RECEIVED) << get_name() << " Received TriggerDecision for trigger_number "
     219            0 :                                     << decision.trigger_number << " and run " << decision.run_number
     220            5 :                                     << " (current run is " << m_run_number << ")";
     221            5 :   if (decision.run_number != m_run_number) {
     222            1 :     ers::error(DFOModuleRunNumberMismatch(
     223            2 :       ERS_HERE, decision.run_number, m_run_number, "MLT", decision.trigger_number));
     224            1 :     return;
     225              :   }
     226              : 
     227            4 :   auto decision_received = std::chrono::steady_clock::now();
     228            4 :   ++m_received_decisions;
     229            4 :   auto trigger_types = unpack_types(decision.trigger_type);
     230            8 :   for ( const auto t : trigger_types ) {
     231            4 :     ++get_trigger_counter(t).received;
     232              :   }
     233              :   
     234            4 :   std::chrono::steady_clock::time_point decision_assigned;
     235            4 :   do {
     236              : 
     237            4 :     auto assignment = find_slot(decision);
     238              : 
     239            4 :     if (assignment == nullptr) { // this can happen if all application are in error state
     240            0 :       ers::error(UnableToAssign(ERS_HERE, decision.trigger_number));
     241            0 :       usleep(500);
     242            0 :       notify_trigger_if_needed();
     243            0 :       continue;
     244              :     }
     245              : 
     246            4 :     TLOG_DEBUG(TLVL_TRIGDEC_RECEIVED) << get_name() << " Slot found for trigger_number " << decision.trigger_number
     247            0 :                                       << " on connection " << assignment->connection_name
     248            4 :                                       << ", number of used slots is " << used_slots();
     249            4 :     decision_assigned = std::chrono::steady_clock::now();
     250            4 :     auto dispatch_successful = dispatch(assignment);
     251              : 
     252            4 :     if (dispatch_successful) {
     253            3 :       assign_trigger_decision(assignment);
     254            3 :       TLOG_DEBUG(TLVL_TRIGDEC_RECEIVED) << get_name() << " Assigned trigger_number " << decision.trigger_number
     255            3 :                                         << " to connection " << assignment->connection_name;
     256            3 :       break;
     257              :     } else {
     258            2 :       ers::error(
     259            2 :         TRBModuleAppUpdate(ERS_HERE, assignment->connection_name, "Could not send Trigger Decision"));
     260            1 :       m_dataflow_availability[assignment->connection_name]->set_in_error(true);
     261              :     }
     262              : 
     263            5 :   } while (m_running_status.load());
     264              : 
     265            4 :   notify_trigger_if_needed();
     266              : 
     267            4 :   m_waiting_for_decision +=
     268            4 :     std::chrono::duration_cast<std::chrono::microseconds>(decision_received - m_last_td_received).count();
     269            4 :   m_last_td_received = std::chrono::steady_clock::now();
     270            4 :   m_deciding_destination +=
     271            4 :     std::chrono::duration_cast<std::chrono::microseconds>(decision_assigned - decision_received).count();
     272            4 :   m_forwarding_decision +=
     273            4 :     std::chrono::duration_cast<std::chrono::microseconds>(m_last_td_received - decision_assigned).count();
     274            4 : }
     275              : 
     276              : std::shared_ptr<AssignedTriggerDecision>
     277            4 : DFOModule::find_slot(const dfmessages::TriggerDecision& decision)
     278              : {
     279              : 
     280              :   // this find_slot assings the decision with a round-robin logic
     281              :   // across all the available applications.
     282              :   // Applications in error are skipped.
     283              :   // we only probe the applications once.
     284              :   // if they are all unavailable the assignment is set to
     285              :   // the application with the lowest used slots
     286              :   // returning a nullptr will be considered as an error
     287              :   // from the upper level code
     288              : 
     289            4 :   std::shared_ptr<AssignedTriggerDecision> output = nullptr;
     290            4 :   auto minimum_occupied = m_dataflow_availability.end();
     291            4 :   size_t minimum = std::numeric_limits<size_t>::max();
     292            4 :   unsigned int counter = 0;
     293              : 
     294            4 :   auto candidate_it = m_last_assignement_it;
     295            4 :   if (candidate_it == m_dataflow_availability.end())
     296            2 :     candidate_it = m_dataflow_availability.begin();
     297              : 
     298            8 :   while (output == nullptr && counter < m_dataflow_availability.size()) {
     299              : 
     300            4 :     ++counter;
     301            4 :     ++candidate_it;
     302            4 :     if (candidate_it == m_dataflow_availability.end())
     303            4 :       candidate_it = m_dataflow_availability.begin();
     304              : 
     305              :     // get rid of the applications in error state
     306            4 :     if (candidate_it->second->is_in_error()) {
     307            0 :       continue;
     308              :     }
     309              : 
     310              :     // monitor
     311            4 :     auto slots = candidate_it->second->used_slots();
     312            4 :     if (slots < minimum) {
     313            4 :       minimum = slots;
     314            4 :       minimum_occupied = candidate_it;
     315              :     }
     316              : 
     317            4 :     if (candidate_it->second->is_busy())
     318            1 :       continue;
     319              : 
     320            3 :     output = candidate_it->second->make_assignment(decision);
     321            3 :     m_last_assignement_it = candidate_it;
     322              :   }
     323              : 
     324            4 :   if (!output) {
     325              :     // in this case all applications were busy
     326              :     // so we assign the decision to that with the lowest
     327              :     // number of assignments
     328            1 :     if (minimum_occupied != m_dataflow_availability.end()) {
     329            1 :       output = minimum_occupied->second->make_assignment(decision);
     330            1 :       m_last_assignement_it = minimum_occupied;
     331            1 :       ers::warning(AssignedToBusyApp(ERS_HERE, decision.trigger_number, minimum_occupied->first, minimum));
     332              :     }
     333              :   }
     334              : 
     335            4 :   if (output != nullptr) {
     336            4 :     TLOG_DEBUG(TLVL_WORK_STEPS) << "Assigned TriggerDecision with trigger number " << decision.trigger_number
     337            4 :                                 << " to TRB at connection " << output->connection_name;
     338              :   }
     339            4 :   return output;
     340            0 : }
     341              : 
     342              : void
     343            6 : DFOModule::generate_opmon_data() 
     344              : {
     345              : 
     346            6 :   opmon::DFOInfo info;
     347            6 :   info.set_tokens_received( m_received_tokens.exchange(0) );
     348            6 :   info.set_decisions_sent(m_sent_decisions.exchange(0));
     349            6 :   info.set_decisions_received(m_received_decisions.exchange(0));
     350            6 :   info.set_waiting_for_decision(m_waiting_for_decision.exchange(0));
     351            6 :   info.set_deciding_destination(m_deciding_destination.exchange(0));
     352            6 :   info.set_forwarding_decision(m_forwarding_decision.exchange(0));
     353            6 :   info.set_waiting_for_token(m_waiting_for_token.exchange(0));
     354            6 :   info.set_processing_token(m_processing_token.exchange(0));
     355            6 :   publish( std::move(info) );
     356              : 
     357            6 :   std::lock_guard<std::mutex>     guard(m_trigger_counters_mutex);
     358            9 :   for ( auto & [type, counts] : m_trigger_counters ) {
     359            3 :     opmon::TriggerInfo ti;
     360            3 :     ti.set_received(counts.received.exchange(0));
     361            3 :     ti.set_completed(counts.completed.exchange(0));
     362            3 :     auto name = dunedaq::trgdataformats::get_trigger_candidate_type_names()[type];
     363            6 :     publish( std::move(ti), {{"type", name}} );
     364            3 :    }
     365            9 : }
     366              : 
     367              : void
     368            9 : DFOModule::receive_trigger_complete_token(const dfmessages::TriggerDecisionToken& token)
     369              : {
     370            9 :   if (token.run_number == 0 && token.trigger_number == 0) {
     371            2 :     if (m_dataflow_availability.count(token.decision_destination) == 0) {
     372            2 :       TLOG_DEBUG(TLVL_CONFIG) << "Creating dataflow availability struct for uid " << token.decision_destination;
     373            2 :       auto entry = m_dataflow_availability[token.decision_destination] =
     374            4 :         std::make_shared<TriggerRecordBuilderData>(token.decision_destination, m_busy_threshold, m_free_threshold);
     375            2 :       register_node(token.decision_destination, entry);
     376            2 :     } else {
     377            0 :       TLOG() << TRBModuleAppUpdate(ERS_HERE, token.decision_destination, "Has reconnected");
     378            0 :       auto app_it = m_dataflow_availability.find(token.decision_destination);
     379            0 :       app_it->second->set_in_error(false);
     380              :     }
     381            5 :     return;
     382              :   }
     383              : 
     384            7 :   TLOG_DEBUG(TLVL_TDTOKEN_RECEIVED) << get_name() << " Received TriggerDecisionToken for trigger_number "
     385            0 :                                     << token.trigger_number << " and run " << token.run_number
     386            7 :                                     << " (current run is " << m_run_number << ")";
     387              :   // add a check to see if the application data found
     388            7 :   if (token.run_number != m_run_number) {
     389            2 :     std::ostringstream oss_source;
     390            2 :     oss_source << "TRB at connection " << token.decision_destination;
     391            2 :     ers::error(DFOModuleRunNumberMismatch(
     392            4 :       ERS_HERE, token.run_number, m_run_number, oss_source.str(), token.trigger_number));
     393            2 :     return;
     394            2 :   }
     395              : 
     396            5 :   auto app_it = m_dataflow_availability.find(token.decision_destination);
     397              :   // check if application data exists;
     398            5 :   if (app_it == m_dataflow_availability.end()) {
     399            1 :     ers::error(UnknownTokenSource(ERS_HERE, token.decision_destination));
     400            1 :     return;
     401              :   }
     402              : 
     403            4 :   ++m_received_tokens;
     404            4 :   auto callback_start = std::chrono::steady_clock::now();
     405              : 
     406            4 :   try {
     407            4 :     auto dec_ptr = app_it->second->complete_assignment(token.trigger_number, m_metadata_function);
     408            3 :     auto trigger_types = unpack_types(dec_ptr->decision.trigger_type);
     409            6 :     for ( const auto t : trigger_types ) ++ get_trigger_counter(t).completed;
     410            4 :   } catch (AssignedTriggerDecisionNotFound const& err) {
     411            1 :     ers::error(err);
     412            1 :   }
     413              : 
     414            4 :   if (app_it->second->is_in_error()) {
     415            0 :     TLOG() << TRBModuleAppUpdate(ERS_HERE, token.decision_destination, "Has reconnected");
     416            0 :     app_it->second->set_in_error(false);
     417              :   }
     418              : 
     419            4 :   notify_trigger_if_needed();
     420              : 
     421            4 :   m_waiting_for_token +=
     422            4 :     std::chrono::duration_cast<std::chrono::microseconds>(callback_start - m_last_token_received).count();
     423            4 :   m_last_token_received = std::chrono::steady_clock::now();
     424            4 :   m_processing_token +=
     425            4 :     std::chrono::duration_cast<std::chrono::microseconds>(m_last_token_received - callback_start).count();
     426              : }
     427              : 
     428              : bool
     429            8 : DFOModule::is_busy() const
     430              : {
     431           12 :   for (auto& dfapp : m_dataflow_availability) {
     432            8 :     if (!dfapp.second->is_busy())
     433            4 :       return false;
     434              :   }
     435            4 :   return true;
     436              : }
     437              : 
     438              : bool
     439            3 : DFOModule::is_empty() const
     440              : {
     441            5 :   for (auto& dfapp : m_dataflow_availability) {
     442            2 :     if (dfapp.second->used_slots() != 0)
     443            0 :       return false;
     444              :   }
     445            3 :   return true;
     446              : }
     447              : 
     448              : size_t
     449            0 : DFOModule::used_slots() const
     450              : {
     451            0 :   size_t total = 0;
     452            0 :   for (auto& dfapp : m_dataflow_availability) {
     453            0 :     total += dfapp.second->used_slots();
     454              :   }
     455            0 :   return total;
     456              : }
     457              : 
     458              : void
     459            8 : DFOModule::notify_trigger_if_needed() const
     460              : {
     461              :   // 19-Dec-2024, KAB, ELF, MaR: combined the is_busy() and notify_trigger() calls in
     462              :   // a single method (notify_trigger_if_needed), and protected the contents of the new
     463              :   // method with a mutex, to avoid a race condition in which a given is_busy() result
     464              :   // is determined, but by the time that the value is sent to the MLT, the busy state
     465              :   // has changed.
     466            8 :   std::lock_guard<std::mutex> guard(m_notify_trigger_mutex);
     467              : 
     468            8 :   bool busy = is_busy();
     469            8 :   if (busy == m_last_notified_busy.load())
     470            5 :     return;
     471              : 
     472              :   bool wasSentSuccessfully = false;
     473              : 
     474            3 :   do {
     475            3 :     try {
     476            3 :       dfmessages::TriggerInhibit message{ busy, m_run_number };
     477            3 :       m_busy_sender->send(std::move(message), m_queue_timeout);
     478            2 :       wasSentSuccessfully = true;
     479            2 :       TLOG_DEBUG(TLVL_NOTIFY_TRIGGER) << get_name() << " Sent BUSY status " << busy << " to trigger in run "
     480            2 :                                       << m_run_number;
     481            1 :     } catch (const ers::Issue& excpt) {
     482            1 :       std::ostringstream oss_warn;
     483            1 :       oss_warn << "Send with sender \"" << m_busy_sender->get_name() << "\" failed";
     484            1 :       ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt));
     485            1 :     }
     486              : 
     487            5 :   } while (!wasSentSuccessfully && m_running_status.load());
     488              : 
     489            3 :   m_last_notified_busy.store(busy);
     490            8 : }
     491              : 
     492              : bool
     493            4 : DFOModule::dispatch(const std::shared_ptr<AssignedTriggerDecision>& assignment)
     494              : {
     495              : 
     496            4 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering dispatch() method. assignment->connection_name: "
     497            4 :                                       << assignment->connection_name;
     498              : 
     499            4 :   bool wasSentSuccessfully = false;
     500            4 :   int retries = m_td_send_retries;
     501            4 :   auto iom = iomanager::IOManager::get();
     502            4 :   do {
     503              : 
     504            4 :     try {
     505            4 :       auto decision_copy = dfmessages::TriggerDecision(assignment->decision);
     506            8 :       iom->get_sender<dfmessages::TriggerDecision>(assignment->connection_name)
     507            4 :         ->send(std::move(decision_copy), m_queue_timeout);
     508            3 :       wasSentSuccessfully = true;
     509            3 :       ++m_sent_decisions;
     510            3 :       TLOG_DEBUG(TLVL_DISPATCH_TO_TRB) << get_name() << " Sent TriggerDecision for trigger_number "
     511            0 :                                        << decision_copy.trigger_number << " to TRB at connection "
     512            3 :                                        << assignment->connection_name << " for run number " << decision_copy.run_number;
     513            5 :     } catch (const ers::Issue& excpt) {
     514            1 :       std::ostringstream oss_warn;
     515            1 :       oss_warn << "Send to connection \"" << assignment->connection_name << "\" failed";
     516            1 :       ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt));
     517            1 :     }
     518              : 
     519            4 :     retries--;
     520              : 
     521            4 :   } while (!wasSentSuccessfully && m_running_status.load() && retries > 0);
     522              : 
     523            4 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting dispatch() method";
     524            4 :   return wasSentSuccessfully;
     525            4 : }
     526              : 
     527              : void
     528            3 : DFOModule::assign_trigger_decision(const std::shared_ptr<AssignedTriggerDecision>& assignment)
     529              : {
     530            3 :   m_dataflow_availability[assignment->connection_name]->add_assignment(assignment);
     531            3 : }
     532              : 
     533              : } // namespace dunedaq::dfmodules
     534              : 
     535            5 : DEFINE_DUNE_DAQ_MODULE(dunedaq::dfmodules::DFOModule)
        

Generated by: LCOV version 2.0-1