LCOV - code coverage report
Current view: top level - dfmodules/plugins - TRBModule.hpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 26 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 52 0

            Line data    Source code
       1              : /**
       2              :  * @file TRBModule.hpp
       3              :  *
       4              :  * This is part of the DUNE DAQ Software Suite, copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
       9              : #ifndef DFMODULES_PLUGINS_TRIGGERRECORDBUILDER_HPP_
      10              : #define DFMODULES_PLUGINS_TRIGGERRECORDBUILDER_HPP_
      11              : 
      12              : #include "appmodel/TRBConf.hpp"
      13              : #include "daqdataformats/Fragment.hpp"
      14              : #include "daqdataformats/SourceID.hpp"
      15              : #include "daqdataformats/TriggerRecord.hpp"
      16              : #include "daqdataformats/Types.hpp"
      17              : #include "appmodel/ReadoutApplication.hpp"
      18              : #include "appmodel/SmartDaqApplication.hpp"
      19              : #include "dfmessages/DataRequest.hpp"
      20              : #include "dfmessages/TRMonRequest.hpp"
      21              : #include "dfmessages/TriggerDecision.hpp"
      22              : #include "dfmessages/Types.hpp"
      23              : 
      24              : #include "appfwk/DAQModule.hpp"
      25              : #include "utilities/WorkerThread.hpp"
      26              : #include "iomanager/Sender.hpp"
      27              : #include "iomanager/Receiver.hpp"
      28              : #include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG<<issue wont work.
      29              : 
      30              : #include "dfmodules/opmon/TRBModule.pb.h"
      31              : 
      32              : #include <chrono>
      33              : #include <list>
      34              : #include <map>
      35              : #include <memory>
      36              : #include <mutex>
      37              : #include <string>
      38              : #include <tuple>
      39              : #include <utility>
      40              : #include <vector>
      41              : 
      42              : namespace dunedaq {
      43              : 
      44              : namespace dfmodules {
      45              : 
      46              : /**
      47              :  * @brief TriggerId is a little class that defines a unique identifier for a
      48              :  * trigger decision/record It also provides an operator < to be used by map to
      49              :  * optimise bookkeeping
      50              :  */
      51              : struct TriggerId
      52              : {
      53              : 
      54              :   TriggerId() = default;
      55              : 
      56            0 :   explicit TriggerId(const dfmessages::TriggerDecision& td,
      57              :                      daqdataformats::sequence_number_t s = daqdataformats::TypeDefaults::s_invalid_sequence_number)
      58            0 :     : trigger_number(td.trigger_number)
      59            0 :     , sequence_number(s)
      60            0 :     , run_number(td.run_number)
      61              :   {
      62            0 :     ;
      63            0 :   }
      64            0 :   explicit TriggerId(daqdataformats::Fragment& f)
      65            0 :     : trigger_number(f.get_trigger_number())
      66            0 :     , sequence_number(f.get_sequence_number())
      67            0 :     , run_number(f.get_run_number())
      68              :   {
      69            0 :     ;
      70            0 :   }
      71              : 
      72              :   daqdataformats::trigger_number_t trigger_number;
      73              :   daqdataformats::sequence_number_t sequence_number;
      74              :   daqdataformats::run_number_t run_number;
      75              : 
      76            0 :   bool operator<(const TriggerId& other) const noexcept
      77              :   {
      78            0 :     return std::tuple(trigger_number, sequence_number, run_number) <
      79            0 :            std::tuple(other.trigger_number, other.sequence_number, other.run_number);
      80              :   }
      81              : 
      82            0 :   friend std::ostream& operator<<(std::ostream& out, const TriggerId& id) noexcept
      83              :   {
      84            0 :     out << id.trigger_number << '-' << id.sequence_number << '/' << id.run_number;
      85            0 :     return out;
      86              :   }
      87              : 
      88            0 :   friend TraceStreamer& operator<<(TraceStreamer& out, const TriggerId& id) noexcept
      89              :   {
      90            0 :     return out << id.trigger_number << '.' << id.sequence_number << "/" << id.run_number;
      91              :   }
      92              : 
      93              :   friend std::istream& operator>>(std::istream& in, TriggerId& id)
      94              :   {
      95              :     char t1, t2;
      96              :     in >> id.trigger_number >> t1 >> id.sequence_number >> t2 >> id.run_number;
      97              :     return in;
      98              :   }
      99              : };
     100              : 
     101              : } // namespace dfmodules
     102              : 
     103              : /**
     104              :  * @brief Unexpected trigger decision
     105              :  */
     106            0 : ERS_DECLARE_ISSUE(dfmodules,                 ///< Namespace
     107              :                   UnexpectedTriggerDecision, ///< Issue class name
     108              :                   "Unexpected Trigger Decisions: " << trigger << '/' << decision_run << " while in run " << current_run,
     109              :                   ((daqdataformats::trigger_number_t)trigger)  ///< Message parameters
     110              :                   ((daqdataformats::run_number_t)decision_run) ///< Message parameters
     111              :                   ((daqdataformats::run_number_t)current_run)  ///< Message parameters
     112              : )
     113              : 
     114              : /**
     115              :  * @brief Timed out Trigger Decision
     116              :  */
     117            0 : ERS_DECLARE_ISSUE(dfmodules,               ///< Namespace
     118              :                   TimedOutTriggerDecision, ///< Issue class name
     119              :                   "trigger id: " << trigger_id << " generate at: " << trigger_timestamp << " timed out", ///< Message
     120              :                   ((dfmodules::TriggerId)trigger_id)               ///< Message parameters
     121              :                   ((daqdataformats::timestamp_t)trigger_timestamp) ///< Message parameters
     122              : )
     123              : 
     124              : /**
     125              :  * @brief Unexpected fragment
     126              :  */
     127            0 : ERS_DECLARE_ISSUE(dfmodules,          ///< Namespace
     128              :                   UnexpectedFragment, ///< Issue class name
     129              :                   "Unexpected Fragment for triggerID " << trigger_id << ", type " << fragment_type << ", " << source_id,
     130              :                   ((dfmodules::TriggerId)trigger_id)               ///< Message parameters
     131              :                   ((daqdataformats::fragment_type_t)fragment_type) ///< Message parameters
     132              :                   ((daqdataformats::SourceID)source_id)                  ///< Message parameters
     133              : )
     134              : 
     135              : /**
     136              :  * @brief Duplicate trigger decision
     137              :  */
     138            0 : ERS_DECLARE_ISSUE(dfmodules,                 ///< Namespace
     139              :                   DuplicatedTriggerDecision, ///< Issue class name
     140              :                   "Duplicated trigger ID " << trigger_id,
     141              :                   ((dfmodules::TriggerId)trigger_id) ///< Message parameters
     142              : )
     143              : 
     144              : /**
     145              :  * @brief Abandoned TR
     146              :  */
     147            0 : ERS_DECLARE_ISSUE(dfmodules,                ///< Namespace
     148              :                   AbandonedTriggerDecision, ///< Issue class name
     149              :                   "trigger ID " << trigger_id << " could not be sent to writing and it's lost",
     150              :                   ((dfmodules::TriggerId)trigger_id) ///< Message parameters
     151              : )
     152              : 
     153              : /**
     154              :  * @brief Missing connection ID
     155              :  */
     156            0 : ERS_DECLARE_ISSUE(dfmodules,           ///< Namespace
     157              :                   MissingConnectionID, ///< Issue class name
     158              :                   "No connection ID was found for connection name \"" << conn_name
     159              :                   << "\" in the conn_ref list that was provided at 'init' time.",
     160              :                   ((std::string)conn_name)                   ///< Message parameters
     161              : )
     162              : 
     163              : namespace dfmodules {
     164              : 
     165              : /**
     166              :  * @brief TRBModule is the Module that collects Trigger
     167              :  TriggersDecisions, sends the corresponding data requests and collects Fragment
     168              :  to form a complete Trigger Record. The TR then sent out possibly to a write
     169              :  module
     170              : */
     171              : class TRBModule : public dunedaq::appfwk::DAQModule
     172              : {
     173              : public:
     174              :   /**
     175              :    * @brief TRBModule Constructor
     176              :    * @param name Instance name for this TRBModule instance
     177              :    */
     178              :   explicit TRBModule(const std::string& name);
     179              : 
     180              :   TRBModule(const TRBModule&) = delete; ///< TRBModule is not copy-constructible
     181              :   TRBModule& operator=(const TRBModule&) =
     182              :     delete;                                                         ///< TRBModule is not copy-assignable
     183              :   TRBModule(TRBModule&&) = delete;            ///< TRBModule is not move-constructible
     184              :   TRBModule& operator=(TRBModule&&) = delete; ///< TRBModule is not move-assignable
     185              : 
     186              :   void init(std::shared_ptr<appfwk::ConfigurationManager> mcfg) override;
     187              : 
     188              :   void generate_opmon_data() override;
     189              : 
     190              : protected:
     191              :   using trigger_decision_receiver_t = iomanager::ReceiverConcept<dfmessages::TriggerDecision>;
     192              :   using data_req_sender_t = iomanager::SenderConcept<dfmessages::DataRequest>;
     193              :   using fragment_receiver_t = iomanager::ReceiverConcept<std::unique_ptr<daqdataformats::Fragment>>;
     194              : 
     195              :   using trigger_record_ptr_t = std::unique_ptr<daqdataformats::TriggerRecord>;
     196              :   using trigger_record_sender_t = iomanager::SenderConcept<trigger_record_ptr_t>;
     197              : 
     198              :   void trigger_decision_callback(dfmessages::TriggerDecision& td);
     199              :   void fragments_callback(std::unique_ptr<daqdataformats::Fragment>& frag);
     200              : 
     201              :   trigger_record_ptr_t extract_trigger_record(const TriggerId&);
     202              :   // build_trigger_record will allocate memory and then orphan it to the caller
     203              :   // via the returned pointer Plese note that the method will destroy the memory
     204              :   // saved in the bookkeeping map
     205              : 
     206              :   unsigned int create_trigger_records_and_dispatch(const dfmessages::TriggerDecision&);
     207              : 
     208              :   bool dispatch_data_requests(dfmessages::DataRequest,
     209              :                               const daqdataformats::SourceID&);
     210              : 
     211              :   bool send_trigger_record(const TriggerId&);
     212              :   // this creates a trigger record and send it
     213              : 
     214              :   bool check_stale_requests();
     215              :   // it returns true when there are changes in the book = a TR timed out
     216              : 
     217              :   void flush_trigger_records();
     218              : 
     219              : private:
     220              :   // Commands
     221              :   void do_conf(const CommandData_t&);
     222              :   void do_scrap(const CommandData_t&);
     223              :   void do_start(const CommandData_t&);
     224              :   void do_stop(const CommandData_t&);
     225              : 
     226              :   // Monitoring callback
     227              :   void tr_requested(const dfmessages::TRMonRequest &);
     228              : 
     229              :   // Threading
     230              :   std::atomic<bool> m_stop_requested;
     231              : 
     232              :   // Configuration
     233              :   const appmodel::TRBConf* m_trb_conf;
     234              :   std::chrono::milliseconds m_tr_queue_timeout;
     235              :   std::chrono::milliseconds m_dreq_queue_timeout;
     236              :   std::string m_reply_connection;
     237              :   size_t m_max_open_trigger_records;
     238              :   daqdataformats::SourceID m_this_trb_source_id;
     239              : 
     240              :   // Input Connections
     241              :   std::shared_ptr<trigger_decision_receiver_t> m_trigger_decision_input;
     242              :   std::shared_ptr<fragment_receiver_t> m_fragment_input;
     243              : 
     244              :   // Output connections
     245              :   std::shared_ptr<trigger_record_sender_t> m_trigger_record_output;
     246              :   mutable std::mutex m_map_sourceid_connections_mutex;
     247              :   std::map<daqdataformats::SourceID, std::shared_ptr<data_req_sender_t>> m_map_sourceid_connections; ///< Mappinng between SourceID and connections
     248              : 
     249              :   // bookeeping
     250              :   using clock_type = std::chrono::steady_clock;
     251              :   std::mutex m_trigger_records_mutex;
     252              :   clock_type::time_point m_last_bookkeeping{};
     253              :   std::map<TriggerId, std::pair<clock_type::time_point, trigger_record_ptr_t>> m_trigger_records;
     254              :   std::condition_variable m_open_trigger_record_cv;
     255              : 
     256              :   // Data request properties
     257              :   daqdataformats::timestamp_diff_t m_max_sequence_length;
     258              : 
     259              :   // Run information
     260              :   std::unique_ptr<const daqdataformats::run_number_t> m_run_number = nullptr;
     261              : 
     262              :   // Monitoring related variables
     263              :   std::mutex m_mon_mutex;
     264              :   std::shared_ptr<iomanager::ReceiverConcept<dfmessages::TRMonRequest>> m_mon_receiver;
     265              :   std::list<dfmessages::TRMonRequest> m_mon_requests;
     266              : 
     267              :   // book related metrics
     268              :   using metric_counter_type = uint64_t; // decltype(triggerrecordbuilderinfo::Info::pending_trigger_decisions);
     269              :   mutable std::atomic<metric_counter_type> m_trigger_decisions_counter = { 0 }; // currently
     270              :   mutable std::atomic<metric_counter_type> m_fragment_counter = { 0 };          // currently
     271              :   mutable std::atomic<metric_counter_type> m_pending_fragment_counter = { 0 };  // currently
     272              : 
     273              :   mutable std::atomic<metric_counter_type> m_timed_out_trigger_records = { 0 };    // in the run
     274              :   mutable std::atomic<metric_counter_type> m_unexpected_fragments = { 0 };         // in the run
     275              :   mutable std::atomic<metric_counter_type> m_unexpected_trigger_decisions = { 0 }; // in the run
     276              :   mutable std::atomic<metric_counter_type> m_lost_fragments = { 0 };               // in the run
     277              :   mutable std::atomic<metric_counter_type> m_invalid_requests = { 0 };             // in the run
     278              :   mutable std::atomic<metric_counter_type> m_duplicated_trigger_ids = { 0 };       // in the run
     279              :   mutable std::atomic<metric_counter_type> m_abandoned_trigger_records = { 0 };    // in the run
     280              : 
     281              :   mutable std::atomic<metric_counter_type> m_received_trigger_decisions = { 0 }; // in between calls
     282              :   mutable std::atomic<metric_counter_type> m_received_fragments = { 0 };         // in between calls
     283              :   mutable std::atomic<metric_counter_type> m_generated_trigger_records = { 0 };  // in between calls
     284              :   mutable std::atomic<metric_counter_type> m_generated_data_requests = { 0 };    // in between calls
     285              :   mutable std::atomic<metric_counter_type> m_data_waiting_time = { 0 };          // in between calls
     286              :   mutable std::atomic<metric_counter_type> m_trigger_decision_width = { 0 };     // in between calls
     287              :   mutable std::atomic<metric_counter_type> m_data_request_width = { 0 };         // in between calls
     288              :   mutable std::atomic<metric_counter_type> m_td_processing_us = { 0 };           // in between calls
     289              :   mutable std::atomic<metric_counter_type> m_fragment_processing_us = { 0 };     // in between calls
     290              : 
     291              :   
     292              :   mutable std::atomic<metric_counter_type> m_trmon_request_counter = { 0 };
     293              :   mutable std::atomic<metric_counter_type> m_trmon_sent_counter = { 0 };
     294              : 
     295              :   // time thresholds
     296              :   using duration_type = std::chrono::microseconds;
     297              :   duration_type m_old_trigger_threshold;
     298              :   duration_type m_trigger_timeout;
     299              : };
     300              : } // namespace dfmodules
     301              : } // namespace dunedaq
     302              : 
     303              : #endif // DFMODULES_PLUGINS_TRIGGERRECORDBUILDER_HPP_
        

Generated by: LCOV version 2.0-1