LCOV - code coverage report
Current view: top level - dfmodules/plugins - TRBModule.hpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 27 0
Test Date: 2026-02-16 10:18:04 Functions: 0.0 % 60 0

            Line data    Source code
       1              : /**
       2              :  * @file TRBModule.hpp
       3              :  *
       4              :  * This is part of the DUNE DAQ Software Suite, copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
       9              : #ifndef DFMODULES_PLUGINS_TRIGGERRECORDBUILDER_HPP_
      10              : #define DFMODULES_PLUGINS_TRIGGERRECORDBUILDER_HPP_
      11              : 
      12              : #include "appmodel/TRBConf.hpp"
      13              : #include "daqdataformats/Fragment.hpp"
      14              : #include "daqdataformats/SourceID.hpp"
      15              : #include "daqdataformats/TriggerRecord.hpp"
      16              : #include "daqdataformats/Types.hpp"
      17              : #include "appmodel/ReadoutApplication.hpp"
      18              : #include "appmodel/SmartDaqApplication.hpp"
      19              : #include "dfmessages/DataRequest.hpp"
      20              : #include "dfmessages/TRMonRequest.hpp"
      21              : #include "dfmessages/TriggerDecision.hpp"
      22              : #include "dfmessages/Types.hpp"
      23              : 
      24              : #include "appfwk/DAQModule.hpp"
      25              : #include "utilities/WorkerThread.hpp"
      26              : #include "iomanager/Sender.hpp"
      27              : #include "iomanager/Receiver.hpp"
      28              : #include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG<<issue wont work.
      29              : 
      30              : #include "dfmodules/opmon/TRBModule.pb.h"
      31              : 
      32              : #include <chrono>
      33              : #include <list>
      34              : #include <map>
      35              : #include <memory>
      36              : #include <mutex>
      37              : #include <string>
      38              : #include <tuple>
      39              : #include <utility>
      40              : #include <vector>
      41              : 
      42              : namespace dunedaq {
      43              : 
      44              : namespace dfmodules {
      45              : 
      46              : /**
      47              :  * @brief TriggerId identifies a trigger decision/record by its trigger number,
      48              :  * sequence number and run number. It also provides an operator < so that it
      49              :  * can be used as the key of an ordered map for bookkeeping.
      50              :  */
      51              : struct TriggerId
      52              : {
      53              : 
      54              :   TriggerId() = default; // NOTE(review): no member initializers; if the three members are plain integers, a default-initialized TriggerId holds indeterminate values -- confirm
      55              : 
      56            0 :   explicit TriggerId(const dfmessages::TriggerDecision& td, // trigger and run number are copied from the decision
      57              :                      daqdataformats::sequence_number_t s = daqdataformats::TypeDefaults::s_invalid_sequence_number) // sequence number is supplied by the caller
      58            0 :     : trigger_number(td.trigger_number)
      59            0 :     , sequence_number(s)
      60            0 :     , run_number(td.run_number)
      61              :   {
      62            0 :     ;
      63            0 :   }
      64            0 :   explicit TriggerId(daqdataformats::Fragment& f) // all three fields are read through the fragment's getters
      65            0 :     : trigger_number(f.get_trigger_number())
      66            0 :     , sequence_number(f.get_sequence_number())
      67            0 :     , run_number(f.get_run_number())
      68              :   {
      69            0 :     ;
      70            0 :   }
      71              : 
      72              :   daqdataformats::trigger_number_t trigger_number;
      73              :   daqdataformats::sequence_number_t sequence_number;
      74              :   daqdataformats::run_number_t run_number;
      75              : 
      76            0 :   bool operator<(const TriggerId& other) const noexcept // lexicographic order: trigger number, then sequence number, then run number
      77              :   {
      78            0 :     return std::tuple(trigger_number, sequence_number, run_number) <
      79            0 :            std::tuple(other.trigger_number, other.sequence_number, other.run_number);
      80              :   }
      81              : 
      82            0 :   friend std::ostream& operator<<(std::ostream& out, const TriggerId& id) noexcept // prints "<trigger>-<sequence>/<run>"
      83              :   {
      84            0 :     out << id.trigger_number << '-' << id.sequence_number << '/' << id.run_number;
      85            0 :     return out;
      86              :   }
      87              : 
      88            0 :   friend TraceStreamer& operator<<(TraceStreamer& out, const TriggerId& id) noexcept // same fields, but '.' instead of '-' between trigger and sequence
      89              :   {
      90            0 :     return out << id.trigger_number << '.' << id.sequence_number << "/" << id.run_number;
      91              :   }
      92              : 
      93              :   friend std::istream& operator>>(std::istream& in, TriggerId& id) // reads trigger, one char, sequence, one char, run; the two separator chars are not checked
      94              :   {
      95              :     char t1, t2;
      96              :     in >> id.trigger_number >> t1 >> id.sequence_number >> t2 >> id.run_number;
      97              :     return in;
      98              :   }
      99              : };
     100              : 
     101              : } // namespace dfmodules
     102              : 
     103              : /**
     104              :  * @brief Unexpected trigger decision; the message reports the decision's trigger/run numbers and the current run
     105              :  */
     106            0 : ERS_DECLARE_ISSUE(dfmodules,                 ///< Namespace
     107              :                   UnexpectedTriggerDecision, ///< Issue class name
     108              :                   "Unexpected Trigger Decisions: " << trigger << '/' << decision_run << " while in run " << current_run, ///< Message
     109              :                   ((daqdataformats::trigger_number_t)trigger)  ///< Trigger number printed in the message
     110              :                   ((daqdataformats::run_number_t)decision_run) ///< Run number printed after the trigger number
     111              :                   ((daqdataformats::run_number_t)current_run)  ///< Run number printed after "while in run"
     112              : )
     113              : 
     114              : /**
     115              :  * @brief Timed out Trigger Decision. NOTE(review): the message reads "generate at", presumably meant "generated at"
     116              :  */
     117            0 : ERS_DECLARE_ISSUE(dfmodules,               ///< Namespace
     118              :                   TimedOutTriggerDecision, ///< Issue class name
     119              :                   "trigger id: " << trigger_id << " generate at: " << trigger_timestamp << " timed out", ///< Message
     120              :                   ((dfmodules::TriggerId)trigger_id)               ///< Trigger id printed in the message
     121              :                   ((daqdataformats::timestamp_t)trigger_timestamp) ///< Timestamp printed after "generate at:"
     122              : )
     123              : 
     124              : /**
     125              :  * @brief Unexpected fragment; the message reports its trigger id, fragment type and source id
     126              :  */
     127            0 : ERS_DECLARE_ISSUE(dfmodules,          ///< Namespace
     128              :                   UnexpectedFragment, ///< Issue class name
     129              :                   "Unexpected Fragment for triggerID " << trigger_id << ", type " << fragment_type << ", " << source_id, ///< Message
     130              :                   ((dfmodules::TriggerId)trigger_id)               ///< Trigger id printed in the message
     131              :                   ((daqdataformats::fragment_type_t)fragment_type) ///< Fragment type printed after "type"
     132              :                   ((daqdataformats::SourceID)source_id)                  ///< Source id printed last in the message
     133              : )
     134              : 
     135              : /**
     136              :  * @brief Duplicated trigger decision; the message reports the duplicated trigger id
     137              :  */
     138            0 : ERS_DECLARE_ISSUE(dfmodules,                 ///< Namespace
     139              :                   DuplicatedTriggerDecision, ///< Issue class name
     140              :                   "Duplicated trigger ID " << trigger_id, ///< Message
     141              :                   ((dfmodules::TriggerId)trigger_id) ///< Trigger id printed in the message
     142              : )
     143              : 
     144              : /**
     145              :  * @brief Abandoned trigger record: it could not be sent to writing and is lost
     146              :  */
     147            0 : ERS_DECLARE_ISSUE(dfmodules,                ///< Namespace
     148              :                   AbandonedTriggerDecision, ///< Issue class name
     149              :                   "trigger ID " << trigger_id << " could not be sent to writing and it's lost", ///< Message
     150              :                   ((dfmodules::TriggerId)trigger_id) ///< Trigger id printed in the message
     151              : )
     152              : 
     153              : /**
     154              :  * @brief Incomplete trigger record sent downstream; the message reports how many of the requested fragments it contains
     155              :  */
     156            0 : ERS_DECLARE_ISSUE(dfmodules,                ///< Namespace
     157              :                   IncompleteTriggerRecord , ///< Issue class name
     158              :                   "sending incomplete TriggerRecord downstream " << optional_stop_time_phrase << " (trigger/run_number=" << id << ", " << num_frags_present << " of " << num_components_requested << " fragments included)", ///< Message
     159              :                   ((std::string)optional_stop_time_phrase)((dfmodules::TriggerId)id)((int)num_frags_present)((int)num_components_requested) ///< Message parameters: free-text phrase, trigger id, fragments present, components requested
     160              : )
     161              : 
     162              : /**
     163              :  * @brief Missing connection ID for a connection name in the conn_ref list given at 'init' time
     164              :  */
     165            0 : ERS_DECLARE_ISSUE(dfmodules,           ///< Namespace
     166              :                   MissingConnectionID, ///< Issue class name
     167              :                   "No connection ID was found for connection name \"" << conn_name
     168              :                   << "\" in the conn_ref list that was provided at 'init' time.", ///< Message
     169              :                   ((std::string)conn_name)                   ///< Connection name printed in the message
     170              : )
     171              : 
     172              : namespace dfmodules {
     173              : 
     174              : /**
     175              :  * @brief TRBModule is the module that collects
     176              :  TriggerDecisions, sends the corresponding data requests and collects Fragments
     177              :  to form a complete Trigger Record. The TR is then sent out, possibly to a write
     178              :  module
     179              : */
     180              : class TRBModule : public dunedaq::appfwk::DAQModule
     181              : {
     182              : public:
     183              :   /**
     184              :    * @brief TRBModule Constructor
     185              :    * @param name Instance name for this TRBModule instance
     186              :    */
     187              :   explicit TRBModule(const std::string& name);
     188              : 
     189              :   TRBModule(const TRBModule&) = delete; ///< TRBModule is not copy-constructible
     190              :   TRBModule& operator=(const TRBModule&) =
     191              :     delete;                                                         ///< TRBModule is not copy-assignable
     192              :   TRBModule(TRBModule&&) = delete;            ///< TRBModule is not move-constructible
     193              :   TRBModule& operator=(TRBModule&&) = delete; ///< TRBModule is not move-assignable
     194              : 
     195              :   void init(std::shared_ptr<appfwk::ConfigurationManager> mcfg) override;
     196              : 
     197              :   void generate_opmon_data() override;
     198              : 
     199              : protected:
     200              :   using trigger_decision_receiver_t = iomanager::ReceiverConcept<dfmessages::TriggerDecision>;
     201              :   using data_req_sender_t = iomanager::SenderConcept<dfmessages::DataRequest>;
     202              :   using fragment_receiver_t = iomanager::ReceiverConcept<std::unique_ptr<daqdataformats::Fragment>>;
     203              : 
     204              :   using trigger_record_ptr_t = std::unique_ptr<daqdataformats::TriggerRecord>;
     205              :   using trigger_record_sender_t = iomanager::SenderConcept<trigger_record_ptr_t>;
     206              : 
     207              :   void trigger_decision_callback(dfmessages::TriggerDecision& td);
     208              :   void fragments_callback(std::unique_ptr<daqdataformats::Fragment>& frag);
     209              : 
     210              :   trigger_record_ptr_t extract_trigger_record(const TriggerId&);
     211              :   // NOTE(review): written for "build_trigger_record", presumably extract_trigger_record above: it will allocate
     212              :   // memory and then orphan it to the caller via the returned pointer. Please note that the method will destroy
     213              :   // the memory saved in the bookkeeping map
     214              : 
     215              :   unsigned int create_trigger_records_and_dispatch(const dfmessages::TriggerDecision&);
     216              : 
     217              :   bool dispatch_data_requests(dfmessages::DataRequest,
     218              :                               const daqdataformats::SourceID&);
     219              : 
     220              :   bool send_trigger_record(const TriggerId&);
     221              :   // this creates a trigger record and sends it
     222              : 
     223              :   bool check_stale_requests();
     224              :   // it returns true when there are changes in the book, i.e. a TR timed out
     225              : 
     226              :   void flush_trigger_records();
     227              : 
     228              : private:
     229              :   // Commands
     230              :   void do_conf(const CommandData_t&);
     231              :   void do_scrap(const CommandData_t&);
     232              :   void do_start(const CommandData_t&);
     233              :   void do_stop(const CommandData_t&);
     234              : 
     235              :   // Monitoring callback
     236              :   void tr_requested(const dfmessages::TRMonRequest &);
     237              : 
     238              :   // Threading
     239              :   std::atomic<bool> m_stop_requested; // NOTE(review): <atomic> is not among the includes visible in this header; presumably pulled in transitively -- confirm
     240              : 
     241              :   // Configuration
     242              :   const appmodel::TRBConf* m_trb_conf;
     243              :   std::chrono::milliseconds m_tr_queue_timeout;
     244              :   std::chrono::milliseconds m_dreq_queue_timeout;
     245              :   std::string m_reply_connection;
     246              :   size_t m_max_open_trigger_records;
     247              :   daqdataformats::SourceID m_this_trb_source_id;
     248              : 
     249              :   // Input Connections
     250              :   std::shared_ptr<trigger_decision_receiver_t> m_trigger_decision_input;
     251              :   std::shared_ptr<fragment_receiver_t> m_fragment_input;
     252              : 
     253              :   // Output connections
     254              :   std::shared_ptr<trigger_record_sender_t> m_trigger_record_output;
     255              :   mutable std::mutex m_map_sourceid_connections_mutex;
     256              :   std::map<daqdataformats::SourceID, std::shared_ptr<data_req_sender_t>> m_map_sourceid_connections; ///< Mapping between SourceID and connections
     257              : 
     258              :   // bookkeeping
     259              :   using clock_type = std::chrono::steady_clock;
     260              :   std::mutex m_trigger_records_mutex;
     261              :   clock_type::time_point m_last_bookkeeping{};
     262              :   std::map<TriggerId, std::pair<clock_type::time_point, trigger_record_ptr_t>> m_trigger_records; ///< Trigger records keyed by TriggerId, each paired with a steady_clock time point
     263              :   std::condition_variable m_open_trigger_record_cv; // NOTE(review): <condition_variable> is not among the includes visible in this header -- confirm
     264              : 
     265              :   // Data request properties
     266              :   daqdataformats::timestamp_diff_t m_max_sequence_length;
     267              : 
     268              :   // Run information
     269              :   std::unique_ptr<const daqdataformats::run_number_t> m_run_number = nullptr;
     270              : 
     271              :   // Monitoring related variables
     272              :   std::mutex m_mon_mutex;
     273              :   std::shared_ptr<iomanager::ReceiverConcept<dfmessages::TRMonRequest>> m_mon_receiver;
     274              :   std::list<dfmessages::TRMonRequest> m_mon_requests;
     275              : 
     276              :   // book related metrics
     277              :   using metric_counter_type = uint64_t; // decltype(triggerrecordbuilderinfo::Info::pending_trigger_decisions);
     278              :   mutable std::atomic<metric_counter_type> m_trigger_decisions_counter = { 0 }; // currently
     279              :   mutable std::atomic<metric_counter_type> m_fragment_counter = { 0 };          // currently
     280              :   mutable std::atomic<metric_counter_type> m_pending_fragment_counter = { 0 };  // currently
     281              : 
     282              :   mutable std::atomic<metric_counter_type> m_timed_out_trigger_records = { 0 };    // in the run
     283              :   mutable std::atomic<metric_counter_type> m_unexpected_fragments = { 0 };         // in the run
     284              :   mutable std::atomic<metric_counter_type> m_unexpected_trigger_decisions = { 0 }; // in the run
     285              :   mutable std::atomic<metric_counter_type> m_lost_fragments = { 0 };               // in the run
     286              :   mutable std::atomic<metric_counter_type> m_invalid_requests = { 0 };             // in the run
     287              :   mutable std::atomic<metric_counter_type> m_duplicated_trigger_ids = { 0 };       // in the run
     288              :   mutable std::atomic<metric_counter_type> m_abandoned_trigger_records = { 0 };    // in the run
     289              : 
     290              :   mutable std::atomic<metric_counter_type> m_received_trigger_decisions = { 0 }; // in between calls
     291              :   mutable std::atomic<metric_counter_type> m_received_fragments = { 0 };         // in between calls
     292              :   mutable std::atomic<metric_counter_type> m_generated_trigger_records = { 0 };  // in between calls
     293              :   mutable std::atomic<metric_counter_type> m_generated_data_requests = { 0 };    // in between calls
     294              :   mutable std::atomic<metric_counter_type> m_data_waiting_time = { 0 };          // in between calls
     295              :   mutable std::atomic<metric_counter_type> m_trigger_decision_width = { 0 };     // in between calls
     296              :   mutable std::atomic<metric_counter_type> m_data_request_width = { 0 };         // in between calls
     297              :   mutable std::atomic<metric_counter_type> m_td_processing_us = { 0 };           // in between calls
     298              :   mutable std::atomic<metric_counter_type> m_fragment_processing_us = { 0 };     // in between calls
     299              : 
     300              :   
     301              :   mutable std::atomic<metric_counter_type> m_trmon_request_counter = { 0 };
     302              :   mutable std::atomic<metric_counter_type> m_trmon_sent_counter = { 0 };
     303              : 
     304              :   // time thresholds
     305              :   using duration_type = std::chrono::microseconds;
     306              :   duration_type m_old_trigger_threshold;
     307              :   duration_type m_trigger_timeout;
     308              : };
     309              : } // namespace dfmodules
     310              : } // namespace dunedaq
     311              : 
     312              : #endif // DFMODULES_PLUGINS_TRIGGERRECORDBUILDER_HPP_
        

Generated by: LCOV version 2.0-1