LCOV - code coverage report
Current view: top level - dfmodules/plugins - DFOModule.hpp (source / functions) Coverage Total Hit
Test: code.result Lines: 92.3 % 13 12
Test Date: 2025-12-21 13:07:08 Functions: 100.0 % 2 2

            Line data    Source code
       1              : /**
       2              :  * @file DFOModule.hpp
       3              :  *
       4              :  * This is part of the DUNE DAQ Software Suite, copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
       9              : #ifndef DFMODULES_PLUGINS_DATAFLOWORCHESTRATOR_HPP_
      10              : #define DFMODULES_PLUGINS_DATAFLOWORCHESTRATOR_HPP_
      11              : 
      12              : #include "dfmodules/TriggerRecordBuilderData.hpp"
      13              : 
      14              : #include "appmodel/DFOConf.hpp"
      15              : 
      16              : #include "daqdataformats/TriggerRecord.hpp"
      17              : #include "dfmessages/DataRequest.hpp"
      18              : #include "dfmessages/TriggerDecision.hpp"
      19              : #include "dfmessages/TriggerDecisionToken.hpp"
      20              : #include "dfmessages/TriggerInhibit.hpp"
      21              : #include "trgdataformats/TriggerCandidateData.hpp"
      22              : 
      23              : #include "iomanager/Sender.hpp"
      24              : 
      25              : #include "appfwk/DAQModule.hpp"
      26              : #include "logging/Logging.hpp" // NOTE: if issues are declared before including logging/Logging.hpp, TLOG_DEBUG << issue won't work.
      27              : 
      28              : #include <map>
      29              : #include <memory>
      30              : #include <string>
      31              : #include <utility>
      32              : #include <vector>
      33              : #include <mutex>
      34              : 
      35              : namespace dunedaq {
      36              : 
      37              : // ERS issue types declared in the dfmodules namespace. Disable coverage checking LCOV_EXCL_START
      38              : ERS_DECLARE_ISSUE(dfmodules,
      39              :                   TRBModuleAppUpdate, // free-form message tagged with a TRB connection name
      40              :                   "TRBModule app " << connection_name << ": " << message,
      41              :                   ((std::string)connection_name)((std::string)message))
      42              : ERS_DECLARE_ISSUE(dfmodules,
      43              :                   UnknownTokenSource, // a token named a source connection that is not recognised
      44              :                   "Token from unknown source: " << connection_name,
      45              :                   ((std::string)connection_name))
      46              : ERS_DECLARE_ISSUE(dfmodules,
      47              :                   DFOModuleRunNumberMismatch, // received run number differs from run_number
      48              :                   "DFOModule encountered run number mismatch: recvd ("
      49              :                     << received_run_number << ") != " << run_number << " from " << src_app << " for trigger_number "
      50              :                     << trig_num,
      51              :                   ((uint32_t)received_run_number)((uint32_t)run_number)((std::string)src_app)(
      52              :                     (uint32_t)trig_num)) // NOLINT(build/unsigned)
      53              : ERS_DECLARE_ISSUE(dfmodules,
      54              :                   IncompleteTriggerDecision, // a trigger decision did not complete within the timeout
      55              :                   "TriggerDecision " << trigger_number << " didn't complete within timeout in run " << run_number,
      56              :                   ((uint32_t)trigger_number)((uint32_t)run_number)) // NOLINT(build/unsigned)
      57              : ERS_DECLARE_ISSUE(dfmodules,
      58              :                   UnableToAssign, // a trigger decision could not be assigned
      59              :                   "TriggerDecision " << trigger_number << " could not be assigned",
      60              :                   ((uint32_t)trigger_number)) // NOLINT(build/unsigned)
      61              : ERS_DECLARE_ISSUE(dfmodules,
      62              :                   AssignedToBusyApp, // a trigger decision went to a DF app already holding used_slots TDs
      63              :                   "TriggerDecision " << trigger_number << " was assigned to DF app " << app << " that was busy with "
      64              :                                      << used_slots << " TDs",
      65              :                   ((uint32_t)trigger_number)((std::string)app)((size_t)used_slots)) // NOLINT(build/unsigned)
      66              : // Re-enable coverage checking LCOV_EXCL_STOP
      67              : 
      68              : namespace dfmodules {
      69              : 
      70              : /**
      71              :  * @brief DFOModule distributes triggers according to the
      72              :  * availability of the DF apps in the system
      73              :  */
      74              : class DFOModule : public dunedaq::appfwk::DAQModule
      75              : {
      76              : public:
      77              :   /**
      78              :    * @brief DFOModule Constructor
      79              :    * @param name Instance name for this DFOModule instance
      80              :    */
      81              :   explicit DFOModule(const std::string& name);
      82              : 
      83              :   DFOModule(const DFOModule&) = delete; ///< DFOModule is not copy-constructible
      84              :   DFOModule& operator=(const DFOModule&) =
      85              :     delete;                                                         ///< DFOModule is not copy-assignable
      86              :   DFOModule(DFOModule&&) = delete;            ///< DFOModule is not move-constructible
      87              :   DFOModule& operator=(DFOModule&&) = delete; ///< DFOModule is not move-assignable
      88              : 
      89              :   void init(std::shared_ptr<appfwk::ConfigurationManager> mcfg) override;
      90              : 
      91              : protected:
      92              :   virtual std::shared_ptr<AssignedTriggerDecision> find_slot(const dfmessages::TriggerDecision& decision);
      93              :   // find_slot operates on a round-robin logic
      94              : 
      95              :   using trbd_ptr_t = std::shared_ptr<TriggerRecordBuilderData>;
      96              :   using data_structure_t = std::map<std::string, trbd_ptr_t>; // keyed by string; values are shared TRB bookkeeping objects
      97              :   data_structure_t m_dataflow_availability;
      98              :   data_structure_t::iterator m_last_assignement_it; // iterator into m_dataflow_availability
      99              :   std::function<void(nlohmann::json&)> m_metadata_function;
     100              : 
     101              : private:
     102              :   // Commands
     103              :   void do_conf(const CommandData_t&);
     104              :   void do_start(const CommandData_t&);
     105              :   void do_stop(const CommandData_t&);
     106              :   void do_scrap(const CommandData_t&);
     107              : 
     108              :   void generate_opmon_data() override;
     109              : 
     110              :   virtual void receive_trigger_complete_token(const dfmessages::TriggerDecisionToken&);
     111              :   void receive_trigger_decision(const dfmessages::TriggerDecision&);
     112              :   virtual bool is_busy() const;
     113              :   bool is_empty() const;
     114              :   size_t used_slots() const;
     115              :   void notify_trigger_if_needed() const;
     116              :   bool dispatch(const std::shared_ptr<AssignedTriggerDecision>& assignment);
     117              :   virtual void assign_trigger_decision(const std::shared_ptr<AssignedTriggerDecision>& assignment);
     118              : 
     119              :   // Configuration
     120              :   const appmodel::DFOConf* m_dfo_conf{ nullptr }; // null until a configuration object is assigned
     121              :   std::chrono::milliseconds m_queue_timeout;
     122              :   std::chrono::microseconds m_stop_timeout;
     123              :   dunedaq::daqdataformats::run_number_t m_run_number;
     124              : 
     125              :   // Connections
     126              :   std::shared_ptr<iomanager::SenderConcept<dfmessages::TriggerInhibit>> m_busy_sender;
     127              :   std::string m_token_connection;
     128              :   std::string m_td_connection;
     129              :   size_t m_td_send_retries{ 0 }; // zero-initialised so the value is never indeterminate
     130              :   size_t m_busy_threshold{ 0 };
     131              :   size_t m_free_threshold{ 0 };
     132              :   std::vector<std::string> m_trb_conn_ids;
     133              : 
     134              :   // Coordination
     135              :   std::atomic<bool> m_running_status{ false };
     136              :   mutable std::atomic<bool> m_last_notified_busy{ false };
     137              :   std::chrono::steady_clock::time_point m_last_token_received;
     138              :   std::chrono::steady_clock::time_point m_last_td_received;
     139              :   mutable std::mutex m_notify_trigger_mutex;
     140              : 
     141              :   // Pair of atomic counters (received / completed) stored per trigger type in m_trigger_counters
     142              :   struct TriggerData {
     143              :     std::atomic<uint64_t> received{0};
     144              :     std::atomic<uint64_t> completed{0};
     145              :   };
     146              :   static std::set<trgdataformats::TriggerCandidateData::Type> // one Type per set bit of t
     147            7 :   unpack_types( decltype(dfmessages::TriggerDecision::trigger_type) t) {
     148            7 :     std::set<trgdataformats::TriggerCandidateData::Type> results;
     149            7 :     if (t == dfmessages::TypeDefaults::s_invalid_trigger_type) // the invalid sentinel yields an empty set
     150              :       return results;
     151            7 :     const std::bitset<64> bits(t); // treat the trigger type as a 64-bit mask
     152          455 :     for( size_t i = 0; i < bits.size(); ++i ) {
     153          448 :       if ( bits[i] ) results.insert(static_cast<trgdataformats::TriggerCandidateData::Type>(i)); // bit index i is used as the enumerator value
     154              :     }
     155              :     return results;
     156            0 :   }
     157              :   
     158              :   // Statistics
     159              :   std::atomic<uint64_t> m_received_tokens{ 0 };      // NOLINT (build/unsigned)
     160              :   std::atomic<uint64_t> m_sent_decisions{ 0 };       // NOLINT (build/unsigned)
     161              :   std::atomic<uint64_t> m_received_decisions{ 0 };   // NOLINT (build/unsigned)
     162              :   std::atomic<uint64_t> m_waiting_for_decision{ 0 }; // NOLINT (build/unsigned)
     163              :   std::atomic<uint64_t> m_deciding_destination{ 0 }; // NOLINT (build/unsigned)
     164              :   std::atomic<uint64_t> m_forwarding_decision{ 0 };  // NOLINT (build/unsigned)
     165              :   std::atomic<uint64_t> m_waiting_for_token{ 0 };    // NOLINT (build/unsigned)
     166              :   std::atomic<uint64_t> m_processing_token{ 0 };     // NOLINT (build/unsigned)
     167              :   std::map<dunedaq::trgdataformats::TriggerCandidateData::Type, TriggerData> m_trigger_counters;
     168              :   std::mutex m_trigger_counters_mutex;  // used to safely handle the map above
     169            7 :   TriggerData & get_trigger_counter(trgdataformats::TriggerCandidateData::Type type) {
     170              :     // Returns the counters for `type`, default-inserting an entry on first use.
     171              :     // The lock is taken before any map access: an unlocked find() racing with a
     172              :     // concurrent insertion would be a data race on the std::map.
     173            2 :     std::lock_guard<std::mutex> guard(m_trigger_counters_mutex);
     174            2 :     return m_trigger_counters[type]; // reference stays valid: std::map insertions do not invalidate references
     175            2 :   }
     176              :   
     177              : };
     178              : } // namespace dfmodules
     179              : } // namespace dunedaq
     180              : 
     181              : #endif // DFMODULES_PLUGINS_DATAFLOWORCHESTRATOR_HPP_
        

Generated by: LCOV version 2.0-1