LCOV - code coverage report
Current view: top level - dfmodules/plugins - FragmentAggregatorModule.hpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 2 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 16 0

            Line data    Source code
       1              : /**
       2              :  * @file FragmentAggregatorModule.hpp Module to dispatch data requests within an application, aggregate and send fragments
       3              :  * using the IOMManager
       4              :  *
       5              :  * This is part of the DUNE DAQ , copyright 2020.
       6              :  * Licensing/copyright details are in the COPYING file that you should have
       7              :  * received with this code.
       8              :  */
       9              : 
      10              : #ifndef DFMODULES_PLUGINS_FRAGMENTAGGREGATOR_HPP_
      11              : #define DFMODULES_PLUGINS_FRAGMENTAGGREGATOR_HPP_
      12              : 
      13              : #include "daqdataformats/Fragment.hpp"
      14              : #include "daqdataformats/SourceID.hpp"
      15              : #include "dfmessages/DataRequest.hpp"
      16              : 
      17              : #include "appfwk/DAQModule.hpp"
      18              : #include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG<<issue wont work.
      19              : 
      20              : #include "iomanager/Receiver.hpp"
      21              : #include "iomanager/Sender.hpp"
      22              : 
      23              : #include <atomic>
      24              : #include <map>
      25              : #include <mutex>
      26              : #include <string>
      27              : #include <tuple>
      28              : 
      29              : namespace dunedaq {
      30              : /**
      31              :  * @brief Unknown TRB
      32              :  */
      33            0 : ERS_DECLARE_ISSUE(dfmodules,                  ///< Namespace
      34              :                   UnknownFragmentDestination, ///< Issue class name
      35              :                   "Could not find a valid destination for sending Fragment with trigger number: "
      36              :                     << trg_num << " sequence number: " << seq_num << " from DLH " << src, ///< Message
      37              :                   ((uint64_t)trg_num)                                                     ///< Message parameters
      38              :                   ((uint16_t)seq_num)                                                     ///< Message parameters
      39              :                   ((daqdataformats::SourceID)src)                                         ///< Message parameters
      40              : )
      41              : 
      42              : 
      43            0 : ERS_DECLARE_ISSUE(dfmodules,                ///< Namespace
      44              :                   AbandonedFragment,        ///< Issue class name
      45              :                   "Fragment from " <<  source << " for trigger " << trigger << '-' << sequence << " of run " << run << " was dropped",
      46              :                   ((daqdataformats::run_number_t)run)
      47              :                   ((daqdataformats::trigger_number_t)trigger)
      48              :                   ((daqdataformats::sequence_number_t)sequence)              
      49              :                   ((daqdataformats::SourceID)source)
      50              : )
      51              : 
      52              : namespace dfmodules {
      53              : 
      54              : class FragmentAggregatorModule : public dunedaq::appfwk::DAQModule
      55              : {
      56              : public:
      57              :   explicit FragmentAggregatorModule(const std::string& name);
      58              : 
      59              :   FragmentAggregatorModule(const FragmentAggregatorModule&) = delete;
      60              :   FragmentAggregatorModule& operator=(const FragmentAggregatorModule&) = delete;
      61              :   FragmentAggregatorModule(FragmentAggregatorModule&&) = delete;
      62              :   FragmentAggregatorModule& operator=(FragmentAggregatorModule&&) = delete;
      63              : 
      64              :   void init(std::shared_ptr<appfwk::ConfigurationManager> mcfg) override;
      65              :   void generate_opmon_data() override;
      66              : 
      67              : private:
      68              :   // Commands
      69              :   void do_start(const CommandData_t& obj);
      70              :   void do_stop(const CommandData_t& obj);
      71              : 
      72              :   void process_data_request(dfmessages::DataRequest&);
      73              :   void process_fragment(std::unique_ptr<daqdataformats::Fragment>&);
      74              : 
      75              :   // Input and Output Connection names
      76              :   std::string m_data_req_input;
      77              :   std::string m_fragment_input;
      78              :   std::map<int, std::string> m_producer_conn_ids;
      79              :   std::vector<std::string> m_trb_conn_ids;
      80              : 
      81              :   std::chrono::milliseconds m_fragment_send_timeout;
      82              : 
      83              :   // Opmon
      84              :   uint64_t get_current_time_us();
      85              :   uint64_t m_timestamp_before_dr;
      86              :   uint64_t m_timestamp_before_frag;
      87              :   using metric_counter_type = uint64_t;
      88              :   std::atomic<metric_counter_type> m_data_requests_received{ 0 };
      89              :   std::atomic<metric_counter_type> m_data_requests_processed{ 0 };
      90              :   std::atomic<metric_counter_type> m_data_requests_failed{ 0 };
      91              :   std::atomic<metric_counter_type> m_fragments_received{ 0 };
      92              :   std::atomic<metric_counter_type> m_fragments_processed{ 0 };
      93              :   std::atomic<metric_counter_type> m_fragments_failed{ 0 };
      94              :   std::atomic<metric_counter_type> m_fragments_empty{ 0 };
      95              :   std::atomic<metric_counter_type> m_fragments_incomplete{ 0 };
      96              :   std::atomic<metric_counter_type> m_fragments_invalid{ 0 };
      97              :   std::atomic<metric_counter_type> m_fragments_time_average_us{ 0 };
      98              :   std::atomic<metric_counter_type> m_fragments_time_min_us{ 0 };
      99              :   std::atomic<metric_counter_type> m_fragments_time_max_us{ 0 };
     100              :   std::atomic<metric_counter_type> m_data_requests_time_average_us{ 0 };
     101              :   std::atomic<metric_counter_type> m_data_requests_time_min_us{ 0 };
     102              :   std::atomic<metric_counter_type> m_data_requests_time_max_us{ 0 };
     103              : 
     104              :   // TRB tracking
     105              :   std::map<std::tuple<dfmessages::trigger_number_t, dfmessages::sequence_number_t, daqdataformats::SourceID>,
     106              :            std::string>
     107              :     m_data_req_map;
     108              :   std::mutex m_mutex;
     109              : };
     110              : } // namespace dfmodules
     111              : } // namespace dunedaq
     112              : 
     113              : #endif // DFMODULES_PLUGINS_FRAGMENTAGGREGATOR_HPP_
        

Generated by: LCOV version 2.0-1