LCOV - code coverage report
Current view: top level - trigger/plugins - MLTModule.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 265 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 56 0

            Line data    Source code
       1              : /**
       2              :  * @file MLTModule.cpp MLTModule class
       3              :  * implementation
       4              :  *
       5              :  * This is part of the DUNE DAQ Software Suite, copyright 2020.
       6              :  * Licensing/copyright details are in the COPYING file that you should have
       7              :  * received with this code.
       8              :  */
       9              : 
      10              : /**
      11              :  * TODO: get_group_links
      12              :  * TODO: get_mandatory_links
      13              :  */
      14              : 
      15              : #include "MLTModule.hpp"
      16              : 
      17              : #include "trigger/Issues.hpp"
      18              : #include "trigger/LivetimeCounter.hpp"
      19              : 
      20              : #include "daqdataformats/ComponentRequest.hpp"
      21              : #include "dfmessages/TriggerDecision.hpp"
      22              : #include "dfmessages/TriggerInhibit.hpp"
      23              : #include "dfmessages/Types.hpp"
      24              : #include "iomanager/IOManager.hpp"
      25              : #include "logging/Logging.hpp"
      26              : #include "trgdataformats/Types.hpp"
      27              : 
      28              : namespace dunedaq {
      29              : namespace trigger {
      30              : 
       31            0 : MLTModule::MLTModule(const std::string& name)
       32              :   : DAQModule(name)
       33            0 :   , m_last_trigger_number(1)
       34            0 :   , m_run_number(0)
       35              : {
                           // Constructor: the initialiser list starts trigger numbering at 1 and the
                           // run number at 0. The body only binds each command name to the member
                           // function that handles it; no other state is set up here.
       36              :   // clang-format off
       37            0 :   register_command("conf",   &MLTModule::do_configure);
       38            0 :   register_command("start",  &MLTModule::do_start);
       39            0 :   register_command("stop",   &MLTModule::do_stop);
       40            0 :   register_command("disable_triggers",  &MLTModule::do_pause);
       41            0 :   register_command("enable_triggers", &MLTModule::do_resume);
       42            0 :   register_command("scrap",  &MLTModule::do_scrap);
       43              :   // clang-format on
       44            0 : }
      45              : 
       46              : std::map<std::string, int>
       47            0 : MLTModule::decode_geoid(uint64_t _geoid_int)
       48              : {
                           // Unpacks a 64-bit geo-ID into its four 16-bit fields and returns them in
                           // a map keyed by field name: "stream_id", "slot_id", "crate_id" and
                           // "detector_id". Every field is masked with 0xFFFF, so each returned
                           // value lies in [0, 65535].
       49              : 
       50            0 :   std::map<std::string, int> geoid;
       51              : 
       52              :   // Extract stream_id (stored in the top 16 bits)
       53            0 :   geoid["stream_id"] = (_geoid_int >> 48) & 0xFFFF;
       54              : 
       55              :   // Extract slot_id (stored in the next 16 bits)
       56            0 :   geoid["slot_id"] = (_geoid_int >> 32) & 0xFFFF;
       57              : 
       58              :   // Extract crate_id (stored in the next 16 bits)
       59            0 :   geoid["crate_id"] = (_geoid_int >> 16) & 0xFFFF;
       60              : 
       61              :   // Extract det_id (stored in the lowest 16 bits)
       62            0 :   geoid["detector_id"] = _geoid_int & 0xFFFF;
       63              : 
       64            0 :   return geoid;
       65            0 : }
      66              : 
       67              : void
       68            0 : MLTModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
       69              : {
                           // Caches two handles obtained from the configuration manager: the
                           // appmodel::MLTModule object looked up under this module's name, and the
                           // session. Both are only stored here; do_configure() is what reads them.
       70            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
       71              : 
       72            0 :   m_mtrg = mcfg->get_dal<appmodel::MLTModule>(get_name());
       73              :   // Get the session to access the detector configuration
       74            0 :   m_session = mcfg->get_session();
       75              : 
       76            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
       77            0 : }
      78              : 
       79              : void
       80            0 : MLTModule::do_configure(const CommandData_t& /*obj*/)
       81              : {
                           // Handler for the "conf" command. In order it:
                           //   1. picks the TriggerDecision / TriggerInhibit receivers and the
                           //      TriggerDecision sender out of the configured connections,
                           //   2. builds the SourceID -> subdetector map from the session's
                           //      SourceID/geo-ID map,
                           //   3. builds the optional per-subdetector readout-window map,
                           //   4. latches the latency-monitoring switch and marks the module
                           //      configured.
                           // Throws MLTConfigurationProblem if one SourceID maps to more than one
                           // subdetector, or if a readout-window entry names an unknown or
                           // duplicated subdetector. The command payload is unused.
       82            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering conf() method";
       83              : 
       84              :   // Get the inputs
                           // NOTE(review): if no connection carries a matching data type the
                           // corresponding member is left untouched here, while do_start()
                           // dereferences both receivers without a check - confirm the
                           // configuration schema guarantees both inputs exist.
       85            0 :   for (auto con : m_mtrg->get_inputs()) {
       86            0 :     if (con->get_data_type() == datatype_to_string<dfmessages::TriggerDecision>()) {
       87            0 :         m_decision_input = get_iom_receiver<dfmessages::TriggerDecision>(con->UID());
       88            0 :     } else if (con->get_data_type() == datatype_to_string<dfmessages::TriggerInhibit>()) {
       89            0 :       m_inhibit_input = get_iom_receiver<dfmessages::TriggerInhibit>(con->UID());
       90              :     }
       91              :   }
       92              : 
       93              :   // Get the outputs
       94            0 :   for (auto con : m_mtrg->get_outputs()) {
       95            0 :     if (con->get_data_type() == datatype_to_string<dfmessages::TriggerDecision>())
       96            0 :       m_decision_output = get_iom_sender<dfmessages::TriggerDecision>(con->UID());
       97              :   }
       98              : 
       99            0 :   hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t geoidmap =
      100            0 :     hdf5libs::HDF5SourceIDHandler::make_source_id_geo_id_map(m_session);
      101              :   // Fill the SourceID -- Subdetector map
      102            0 :   for (auto const& [sourceid, geoids] : geoidmap) {
      103            0 :     TLOG() << "SourceID: " << sourceid;
      104            0 :     for (auto const& geoid : geoids) {
      105            0 :       std::map<std::string, int> gidmap = decode_geoid(geoid);
      106            0 :       SubdetectorID detid = static_cast<SubdetectorID>(gidmap["detector_id"]);
                               // Every geo-ID behind one SourceID must decode to the same
                               // subdetector; a conflicting one aborts configuration.
      107            0 :       if (m_srcid_detid_map.contains(sourceid) && !(m_srcid_detid_map[sourceid] == detid)) {
      108            0 :         throw MLTConfigurationProblem(
      109            0 :           ERS_HERE, get_name(), "Multiple subdetector types for one SourceID not supported in trigger system!");
      110              :       }
      111            0 :       m_srcid_detid_map[sourceid] = detid;
      112            0 :     }
                             // NOTE(review): if `geoids` is empty nothing was stored for this
                             // SourceID, so the operator[] below would presumably default-insert
                             // an entry as a side effect of logging - confirm against the map type.
      113            0 :     TLOG() << " * Subdetector type: " << m_srcid_detid_map[sourceid];
      114              :   }
      115              : 
                           // Optional overrides: one (time_before, time_after) pair per subdetector,
                           // applied to outgoing requests in trigger_decisions_callback().
      116            0 :   for (auto subdet_readout_window : m_mtrg->get_configuration()->get_subdetector_readout_map()) {
      117            0 :     std::string subdetector_name = subdet_readout_window->get_subdetector();
      118            0 :     SubdetectorID detid = dunedaq::detdataformats::DetID::string_to_subdetector(subdetector_name);
      119            0 :     if (detid == detdataformats::DetID::Subdetector::kUnknown) {
      120            0 :       throw MLTConfigurationProblem(
      121            0 :         ERS_HERE, get_name(), "Unknown Subdetector supplied to MLT subdetector-readout window map");
      122              :     }
      123              : 
      124            0 :     if (m_subdetector_readout_window_map.count(detid)) {
      125            0 :       throw MLTConfigurationProblem(
      126            0 :         ERS_HERE,
      127            0 :         get_name(),
      128            0 :         "Supplied more than one of the same Subdetector name to MLT subdetector-readout window map");
      129              :     }
      130              : 
      131            0 :     m_subdetector_readout_window_map[detid] =
      132            0 :       std::make_pair(subdet_readout_window->get_time_before(), subdet_readout_window->get_time_after());
      133              : 
      134            0 :     TLOG() << "[MLT] Custom readout map for subdetector: " << detid
      135            0 :            << " time_start: " << subdet_readout_window->get_time_before()
      136            0 :            << " time_after: " << subdet_readout_window->get_time_after();
      137            0 :   }
      138              : 
      139              :   // Latency related
      140            0 :   m_latency_monitoring.store(m_mtrg->get_configuration()->get_latency_monitoring());
      141              : 
      142              :   // Now do the configuration: dummy for now
      143            0 :   m_configured_flag.store(true);
      144              : 
      145            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting conf() method";
      146            0 : }
     147              : 
      148              : void
      149            0 : MLTModule::do_scrap(const CommandData_t& /*obj*/)
      150              : {
                           // Handler for the "scrap" command: undoes do_configure() by dropping the
                           // three connection handles and emptying the two configuration maps, and
                           // also clears the per-type trigger counters. The payload is unused.
                           // NOTE(review): m_configured_flag and m_latency_monitoring, both set in
                           // do_configure(), are not reset here - confirm that is intended.
      151            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering scrap() method";
      152              : 
      153            0 :   m_decision_input.reset();
      154            0 :   m_decision_output.reset();
      155            0 :   m_inhibit_input.reset();
      156              : 
      157            0 :   m_srcid_detid_map.clear();
      158            0 :   m_subdetector_readout_window_map.clear();
      159            0 :   m_trigger_counters.clear();
      160              : 
      161            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting scrap() method";
      162            0 : }
     163              : 
      164              : void
      165            0 : MLTModule::generate_opmon_data()
      166              : {
                           // Publishes up to three groups of monitoring records:
                           //   - one ModuleLevelTriggerInfo with the run-wide TD counters and the
                           //     livetime figures,
                           //   - one TriggerDecisionInfo per entry in m_trigger_counters,
                           //   - two latency records, only while latency monitoring is enabled and
                           //     the module is running.
                           // The run-wide counters are read with load() and keep their values; the
                           // per-type counters are read with exchange(0), so each publication
                           // reports the count since the previous one.
      167            0 :   opmon::ModuleLevelTriggerInfo info;
      168              : 
      169            0 :   info.set_td_msg_received_count(m_td_msg_received_count.load());
      170            0 :   info.set_td_sent_count(m_td_sent_count.load());
      171            0 :   info.set_td_inhibited_count(m_td_inhibited_count.load());
      172            0 :   info.set_td_paused_count(m_td_paused_count.load());
      173            0 :   info.set_td_queue_timeout_expired_err_count(m_td_queue_timeout_expired_err_count.load());
      174            0 :   info.set_td_total_count(m_td_total_count.load());
      175              : 
                           // While a LivetimeCounter exists (between do_start() and do_stop()) the
                           // live values are queried; otherwise the snapshot that do_stop() stored
                           // in m_lc_kLive / m_lc_kPaused / m_lc_kDead is reported.
                           // NOTE(review): m_lc_started is tested and m_livetime_counter is then
                           // dereferenced without a lock; if this runs on a different thread from
                           // do_stop() (not visible here) the pointer could be reset in between.
      176            0 :   if (m_lc_started) {
      177            0 :     info.set_lc_klive(m_livetime_counter->get_time(LivetimeCounter::State::kLive));
      178            0 :     info.set_lc_kpaused(m_livetime_counter->get_time(LivetimeCounter::State::kPaused));
      179            0 :     info.set_lc_kdead(m_livetime_counter->get_time(LivetimeCounter::State::kDead));
      180              :   } else {
      181            0 :     info.set_lc_klive(m_lc_kLive);
      182            0 :     info.set_lc_kpaused(m_lc_kPaused);
      183            0 :     info.set_lc_kdead(m_lc_kDead);
      184              :   }
      185              : 
      186            0 :   this->publish(std::move(info));
      187              : 
      188              :   // per TC type
                           // The guard is declared at function scope, so m_trigger_mutex stays
                           // locked through the latency section until the function returns.
      189            0 :   std::lock_guard<std::mutex> guard(m_trigger_mutex);
      190            0 :   for (auto& [type, counts] : m_trigger_counters) {
      191            0 :     auto name = dunedaq::trgdataformats::get_trigger_candidate_type_names()[type];
      192            0 :     opmon::TriggerDecisionInfo td_info;
      193            0 :     td_info.set_received(counts.received.exchange(0));
      194            0 :     td_info.set_sent(counts.sent.exchange(0));
      195            0 :     td_info.set_failed_send(counts.failed_send.exchange(0));
      196            0 :     td_info.set_paused(counts.paused.exchange(0));
      197            0 :     td_info.set_inhibited(counts.inhibited.exchange(0));
                             // The type name is attached as a "type" label on the published record.
      198            0 :     this->publish(std::move(td_info), { { "type", name } });
      199            0 :   }
      200              : 
      201              :   // latency
      202            0 :   if (m_latency_monitoring.load() && m_running_flag.load()) {
      203              :     // TC in, TD out
      204            0 :     opmon::TriggerLatency lat_info;
      205            0 :     lat_info.set_latency_in(m_latency_instance.get_latency_in());
      206            0 :     lat_info.set_latency_out(m_latency_instance.get_latency_out());
      207            0 :     this->publish(std::move(lat_info));
      208              : 
      209              :     // vs readout window requests
      210            0 :     opmon::ModuleLevelTriggerRequestLatency lat_request_info;
      211            0 :     lat_request_info.set_latency_window_start(m_latency_requests_instance.get_latency_in());
      212            0 :     lat_request_info.set_latency_window_end(m_latency_requests_instance.get_latency_out());
      213            0 :     this->publish(std::move(lat_request_info));
      214            0 :   }
      215            0 : }
     216              : 
      217              : void
      218            0 : MLTModule::do_start(const CommandData_t& startobj)
      219              : {
                           // Handler for the "start" command. Reads the run number from the "run"
                           // field of the payload (falling back to 0), zeroes all run-wide counters,
                           // and begins the run with triggers paused: m_paused is true and the new
                           // LivetimeCounter starts in kPaused, so nothing is forwarded until
                           // do_resume() runs. The two input callbacks are registered last, after
                           // all state they touch has been initialised.
      220            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering start() method";
      221              : 
      222            0 :   m_run_number = startobj.value<dunedaq::daqdataformats::run_number_t>("run", 0);
      223              :   // We get here at start of run, so reset the trigger number
      224            0 :   m_last_trigger_number = 1;
      225              : 
      226              :   // OpMon.
      227            0 :   m_td_msg_received_count.store(0);
      228            0 :   m_td_sent_count.store(0);
      229            0 :   m_td_total_count.store(0);
      230              :   // OpMon DFO
      231            0 :   m_td_inhibited_count.store(0);
      232            0 :   m_td_paused_count.store(0);
      233            0 :   m_td_queue_timeout_expired_err_count.store(0);
      234              :   // OpMon Livetime counter
      235            0 :   m_lc_kLive.store(0);
      236            0 :   m_lc_kPaused.store(0);
      237            0 :   m_lc_kDead.store(0);
      238              : 
      239            0 :   m_paused.store(true);
      240            0 :   m_running_flag.store(true);
      241            0 :   m_dfo_is_busy.store(false);
      242              : 
      243            0 :   m_livetime_counter.reset(new LivetimeCounter(LivetimeCounter::State::kPaused));
      244            0 :   m_lc_started = true;
      245              : 
                           // NOTE(review): both receivers are dereferenced unchecked; they are only
                           // assigned in do_configure() when a connection of the matching data type
                           // exists - confirm that is always the case.
      246            0 :   m_inhibit_input->add_callback(std::bind(&MLTModule::dfo_busy_callback, this, std::placeholders::_1));
      247            0 :   m_decision_input->add_callback(std::bind(&MLTModule::trigger_decisions_callback, this, std::placeholders::_1));
      248              : 
      249            0 :   ers::info(TriggerStartOfRun(ERS_HERE, m_run_number));
      250              : 
      251            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting start() method";
      252            0 : }
     253              : 
      254              : void
      255            0 : MLTModule::do_stop(const CommandData_t& /*stopobj*/)
      256              : {
                           // Handler for the "stop" command. Clears the running flag, unregisters
                           // both input callbacks, snapshots the three livetime figures so they can
                           // still be reported after the counter is gone, destroys the counter, and
                           // logs the end-of-run summary. The payload is unused.
      257            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering stop() method";
      258              : 
      259            0 :   m_running_flag.store(false);
      260            0 :   m_decision_input->remove_callback();
      261            0 :   m_inhibit_input->remove_callback();
      262              :   // m_send_trigger_decisions_thread.join();
                           // Each figure is read once into its *_count member and then copied into
                           // the m_lc_k* member that generate_opmon_data() and print_opmon_stats()
                           // read once m_lc_started is false.
      263            0 :   m_lc_kLive_count = m_livetime_counter->get_time(LivetimeCounter::State::kLive);
      264            0 :   m_lc_kPaused_count = m_livetime_counter->get_time(LivetimeCounter::State::kPaused);
      265            0 :   m_lc_kDead_count = m_livetime_counter->get_time(LivetimeCounter::State::kDead);
      266            0 :   m_lc_kLive = m_lc_kLive_count;
      267            0 :   m_lc_kPaused = m_lc_kPaused_count;
      268            0 :   m_lc_kDead = m_lc_kDead_count;
      269              : 
                           // Dead and paused times are queried again here rather than reusing the
                           // snapshot above, so the sum may differ slightly from
                           // m_lc_kDead + m_lc_kPaused if the counter is still advancing.
      270            0 :   m_lc_deadtime = m_livetime_counter->get_time(LivetimeCounter::State::kDead) +
      271            0 :                   m_livetime_counter->get_time(LivetimeCounter::State::kPaused);
      272              : 
      273            0 :   TLOG(3) << "LivetimeCounter - total deadtime+paused: " << m_lc_deadtime << std::endl;
                           // NOTE(review): the pointer is reset before m_lc_started is cleared, so
                           // there is a short interval in which m_lc_started is still true while
                           // m_livetime_counter is already empty.
      274            0 :   m_livetime_counter.reset(); // Calls LivetimeCounter dtor?
      275            0 :   m_lc_started = false;
      276              : 
      277            0 :   print_opmon_stats();
      278              : 
      279            0 :   ers::info(TriggerEndOfRun(ERS_HERE, m_run_number));
      280              : 
      281            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting stop() method";
      282            0 : }
     283              : 
      284              : void
      285            0 : MLTModule::do_pause(const CommandData_t& /*pauseobj*/)
      286              : {
                           // Handler for the "disable_triggers" command: sets the paused flag, so
                           // trigger_decisions_callback() counts incoming decisions as paused
                           // instead of sending them, and switches the livetime counter to kPaused.
                           // The payload is unused.
                           // NOTE(review): m_livetime_counter only holds an object between
                           // do_start() and do_stop(); if this command can arrive outside a run
                           // (not visible here) the dereference below would be on an empty pointer.
      287            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering pause() method";
      288              : 
      289            0 :   m_paused.store(true);
      290            0 :   m_livetime_counter->set_state(LivetimeCounter::State::kPaused);
      291            0 :   TLOG() << "******* Triggers PAUSED! in run " << m_run_number << " *********";
      292            0 :   ers::info(TriggerPaused(ERS_HERE));
                           // Debug log of the wall-clock time of the pause, in microseconds since
                           // the system_clock epoch.
      293            0 :   TLOG_DEBUG(5) << "TS End: "
      294            0 :                 << std::chrono::duration_cast<std::chrono::microseconds>(
      295            0 :                      std::chrono::system_clock::now().time_since_epoch())
      296            0 :                      .count();
      297              : 
      298            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting pause() method";
      299            0 : }
     300              : 
      301              : void
      302            0 : MLTModule::do_resume(const CommandData_t& /*resumeobj*/)
      303              : {
                           // Handler for the "enable_triggers" command: switches the livetime
                           // counter to kLive and then clears the paused flag, after which
                           // trigger_decisions_callback() forwards decisions again (unless the DFO
                           // is busy). The payload is unused.
                           // NOTE(review): m_livetime_counter is dereferenced unchecked and
                           // m_lc_started is set to true unconditionally; if this command can
                           // arrive outside a run (not visible here) both would be wrong.
      304            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering resume() method";
      305              : 
      306            0 :   ers::info(TriggerActive(ERS_HERE));
      307            0 :   TLOG() << "******* Triggers RESUMED! in run " << m_run_number << " *********";
      308            0 :   m_livetime_counter->set_state(LivetimeCounter::State::kLive);
      309            0 :   m_lc_started = true;
      310            0 :   m_paused.store(false);
                           // Debug log of the wall-clock time of the resume, in microseconds since
                           // the system_clock epoch.
      311            0 :   TLOG_DEBUG(5) << "TS Start: "
      312            0 :                 << std::chrono::duration_cast<std::chrono::microseconds>(
      313            0 :                      std::chrono::system_clock::now().time_since_epoch())
      314            0 :                      .count();
      315              : 
      316            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting resume() method";
      317            0 : }
     318              : 
      319              : void
      320            0 : MLTModule::trigger_decisions_callback(dfmessages::TriggerDecision& decision)
      321              : {
                           // Callback registered on the TriggerDecision input in do_start(). For
                           // each incoming decision it:
                           //   1. bumps the received counters (run-wide and per trigger type),
                           //   2. stamps the decision with the current run number and the next
                           //      trigger number,
                           //   3. rewrites readout windows for components whose subdetector has a
                           //      custom window configured,
                           //   4. then either sends it (not paused and DFO not busy), counts it as
                           //      paused, or counts it as inhibited.
                           // The trigger number only advances after a successful send, so dropped
                           // or failed decisions do not consume a number.
      322            0 :   m_td_msg_received_count++;
      323            0 :   if (m_latency_monitoring.load())
      324            0 :     m_latency_instance.update_latency_in(decision.trigger_timestamp);
      325              : 
                           // unpack_types() and get_trigger_counter() are defined outside this
                           // file; presumably the former splits the trigger_type value into its
                           // individual types - verify against their definitions.
      326            0 :   auto trigger_types = unpack_types(decision.trigger_type);
      327            0 :   for (const auto t : trigger_types) {
      328            0 :     ++get_trigger_counter(t).received;
      329              :   }
      330              : 
                           // Copies taken before the decision is moved into send(); used by the
                           // paused / inhibited log messages below.
      331            0 :   auto ts = decision.trigger_timestamp;
      332            0 :   auto tt = decision.trigger_type;
      333            0 :   decision.run_number = m_run_number;
      334            0 :   decision.trigger_number = m_last_trigger_number;
      335              : 
      336              :   // Overwrite the component's readout window if we have custom
      337              :   // subdetector--readout window map
      338            0 :   for (const auto& [subdetectorid, window] : m_subdetector_readout_window_map) {
      339            0 :     for (auto& request : decision.components) {
      340            0 :       if (request.component.subsystem != daqdataformats::SourceID::Subsystem::kDetectorReadout) {
      341            0 :         continue;
      342              :       }
      343              : 
                               // NOTE(review): operator[] on m_srcid_detid_map presumably inserts a
                               // default entry for a SourceID that was not seen at configure time,
                               // mutating the map from this callback - confirm against the map type.
      344            0 :       if (subdetectorid != m_srcid_detid_map[request.component]) {
      345            0 :         continue;
      346              :       }
      347              : 
                               // window.first is the configured time_before, window.second the
                               // configured time_after (see do_configure()).
      348            0 :       request.window_begin = decision.trigger_timestamp - window.first;
      349            0 :       request.window_end = decision.trigger_timestamp + window.second;
      350              :     }
      351              :   }
      352              : 
      353            0 :   TLOG_DEBUG(2) << "Received decision with timestamp " << decision.trigger_timestamp;
      354              : 
      355            0 :   if ((!m_paused.load() && !m_dfo_is_busy.load())) {
                             // NOTE(review): components.front() is used here and in the latency
                             // block below without an emptiness check - assumes every decision
                             // carries at least one component request; confirm with the sender.
      356            0 :     TLOG_DEBUG(1) << "Sending a decision with triggernumber " << decision.trigger_number << " timestamp "
      357            0 :                   << decision.trigger_timestamp << " start " << decision.components.front().window_begin << " end "
      358            0 :                   << decision.components.front().window_end << " number of links " << decision.components.size();
      359              : 
      360              :     // readout window latency update
      361              :     // TODO: The latency will be different for different components, since they might have different readout windows
      362            0 :     if (m_latency_monitoring.load()) {
      363            0 :       m_latency_requests_instance.update_latency_in(decision.components.front().window_begin);
      364            0 :       m_latency_requests_instance.update_latency_out(decision.components.front().window_end);
      365              :     }
      366              : 
      367            0 :     try {
                               // The send is given a 1 ms timeout; a failure surfaces as an
                               // ers::Issue and is handled by the catch below.
      368            0 :       m_decision_output->send(std::move(decision), std::chrono::milliseconds(1));
      369            0 :       m_td_sent_count++;
      370              : 
      371            0 :       for (const auto t : trigger_types) {
      372            0 :         ++get_trigger_counter(t).sent;
      373              :       }
      374              : 
      375            0 :       m_last_trigger_number++;
      376              :       // add_td(pending_td);
      377            0 :     } catch (const ers::Issue& e) {
      378            0 :       ers::error(e);
                               // m_last_trigger_number has not been incremented on this path, so it
                               // still equals the number assigned to the decision that failed.
      379            0 :       TLOG_DEBUG(1) << "The network is misbehaving: TD send failed for " << m_last_trigger_number;
      380            0 :       m_td_queue_timeout_expired_err_count++;
      381              : 
      382            0 :       for (const auto t : trigger_types) {
      383            0 :         ++get_trigger_counter(t).failed_send;
      384              :       }
      385            0 :     }
      386              : 
      387            0 :   } else if (m_paused.load()) {
                             // Paused takes precedence over DFO-busy: a decision arriving while
                             // both are true is counted as paused, not inhibited.
      388            0 :     ++m_td_paused_count;
      389            0 :     for (const auto t : trigger_types) {
      390            0 :       ++get_trigger_counter(t).paused;
      391              :     }
      392              : 
      393            0 :     TLOG_DEBUG(1) << "Triggers are paused. Not sending a TriggerDecision for TD with timestamp and type " << ts << "/"
      394            0 :                   << tt;
      395              :   } else {
      396            0 :     ers::warning(TriggerInhibited(ERS_HERE, m_run_number));
      397            0 :     TLOG_DEBUG(1) << "The DFO is busy. Not sending a TriggerDecision with timestamp and type " << ts << "/" << tt;
      398            0 :     m_td_inhibited_count++;
      399            0 :     for (const auto t : trigger_types) {
      400            0 :       ++get_trigger_counter(t).inhibited;
      401              :     }
      402              :   }
                           // NOTE(review): on the send path `decision` was passed through std::move
                           // above, so this reads a possibly moved-from object; the local copy `ts`
                           // holds the same timestamp and looks like the safer source - confirm
                           // against send()'s signature.
      403            0 :   if (m_latency_monitoring.load())
      404            0 :     m_latency_instance.update_latency_out(decision.trigger_timestamp);
      405            0 :   m_td_total_count++;
      406            0 : }
     407              : 
      408              : void
      409            0 : MLTModule::dfo_busy_callback(dfmessages::TriggerInhibit& inhibit)
      410              : {
                           // Callback registered on the TriggerInhibit input in do_start(). Inhibit
                           // messages whose run number differs from the current run are logged and
                           // otherwise ignored. For the current run the busy flag is copied into
                           // m_dfo_is_busy and the livetime counter is moved to kDead (busy) or
                           // kLive (not busy).
      411            0 :   TLOG_DEBUG(17) << "Received inhibit message with busy status " << inhibit.busy << " and run number "
      412            0 :                  << inhibit.run_number;
      413            0 :   if (inhibit.run_number == m_run_number) {
      414            0 :     TLOG_DEBUG(18) << "Changing our flag for the DFO busy state from " << m_dfo_is_busy.load() << " to "
      415            0 :                    << inhibit.busy;
      416            0 :     m_dfo_is_busy = inhibit.busy;
                             // NOTE(review): the state is chosen from inhibit.busy alone; m_paused
                             // is not consulted, so a "not busy" message received while triggers
                             // are paused looks like it would switch the counter to kLive - confirm
                             // whether that is intended.
      417            0 :     LivetimeCounter::State state = (inhibit.busy) ? LivetimeCounter::State::kDead : LivetimeCounter::State::kLive;
      418            0 :     m_livetime_counter->set_state(state);
      419              :   }
      420            0 : }
     421              : 
      422              : void
      423            0 : MLTModule::print_opmon_stats()
      424              : {
                           // Logs an end-of-run summary: the six run-wide TD counters followed by
                           // the three livetime figures. Called from do_stop() after the livetime
                           // snapshot has been stored, so the m_lc_k* values printed here are the
                           // final ones for the run. Nothing is reset or modified.
      425            0 :   TLOG() << "MLT opmon counters summary:";
      426            0 :   TLOG() << "------------------------------";
      427            0 :   TLOG() << "Received TD messages: \t\t" << m_td_msg_received_count;
      428            0 :   TLOG() << "Sent TDs: \t\t\t" << m_td_sent_count;
      429            0 :   TLOG() << "Inhibited TDs: \t\t" << m_td_inhibited_count;
      430            0 :   TLOG() << "Paused TDs: \t\t\t" << m_td_paused_count;
      431            0 :   TLOG() << "Queue timeout TDs: \t\t" << m_td_queue_timeout_expired_err_count;
      432            0 :   TLOG() << "Total TDs: \t\t\t" << m_td_total_count;
      433            0 :   TLOG() << "------------------------------";
      434            0 :   TLOG() << "Livetime::Live: \t" << m_lc_kLive;
      435            0 :   TLOG() << "Livetime::Paused: \t" << m_lc_kPaused;
      436            0 :   TLOG() << "Livetime::Dead: \t" << m_lc_kDead;
                           // Log statement with no payload; presumably emits an empty line to
                           // terminate the summary.
      437            0 :   TLOG();
      438            0 : }
     439              : 
     440              : } // namespace trigger
     441              : } // namespace dunedaq
     442              : 
      443            0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::trigger::MLTModule)
                         // The macro above is invoked at global scope, outside the dunedaq::trigger
                         // namespace, with the fully qualified class name. Presumably it registers
                         // MLTModule with the framework's plugin loader - verify against the macro's
                         // definition, which is not in this file.
     444              : 
     445              : // Local Variables:
     446              : // c-basic-offset: 2
     447              : // End:
        

Generated by: LCOV version 2.0-1