Line data Source code
1 : /**
2 : * @file FragmentAggregatorModule.hpp Module to dispatch data requests within an application, aggregate and send fragments
3 : * using the IOMManager
4 : *
5 : * This is part of the DUNE DAQ , copyright 2020.
6 : * Licensing/copyright details are in the COPYING file that you should have
7 : * received with this code.
8 : */
9 :
10 : #ifndef DFMODULES_PLUGINS_FRAGMENTAGGREGATOR_HPP_
11 : #define DFMODULES_PLUGINS_FRAGMENTAGGREGATOR_HPP_
12 :
13 : #include "daqdataformats/Fragment.hpp"
14 : #include "daqdataformats/SourceID.hpp"
15 : #include "dfmessages/DataRequest.hpp"
16 :
17 : #include "appfwk/DAQModule.hpp"
18 : #include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG<<issue wont work.
19 :
20 : #include "iomanager/Receiver.hpp"
21 : #include "iomanager/Sender.hpp"
22 :
23 : #include <atomic>
24 : #include <map>
25 : #include <mutex>
26 : #include <string>
27 : #include <tuple>
28 :
29 : namespace dunedaq {
30 : /**
31 : * @brief Unknown TRB
32 : */
33 0 : ERS_DECLARE_ISSUE(dfmodules, ///< Namespace
34 : UnknownFragmentDestination, ///< Issue class name
35 : "Could not find a valid destination for sending Fragment with trigger number: "
36 : << trg_num << " sequence number: " << seq_num << " from DLH " << src, ///< Message
37 : ((uint64_t)trg_num) ///< Message parameters
38 : ((uint16_t)seq_num) ///< Message parameters
39 : ((daqdataformats::SourceID)src) ///< Message parameters
40 : )
41 :
42 :
43 0 : ERS_DECLARE_ISSUE(dfmodules, ///< Namespace
44 : AbandonedFragment, ///< Issue class name
45 : "Fragment from " << source << " for trigger " << trigger << '-' << sequence << " of run " << run << " was dropped",
46 : ((daqdataformats::run_number_t)run)
47 : ((daqdataformats::trigger_number_t)trigger)
48 : ((daqdataformats::sequence_number_t)sequence)
49 : ((daqdataformats::SourceID)source)
50 : )
51 :
52 : namespace dfmodules {
53 :
54 : class FragmentAggregatorModule : public dunedaq::appfwk::DAQModule
55 : {
56 : public:
57 : explicit FragmentAggregatorModule(const std::string& name);
58 :
59 : FragmentAggregatorModule(const FragmentAggregatorModule&) = delete;
60 : FragmentAggregatorModule& operator=(const FragmentAggregatorModule&) = delete;
61 : FragmentAggregatorModule(FragmentAggregatorModule&&) = delete;
62 : FragmentAggregatorModule& operator=(FragmentAggregatorModule&&) = delete;
63 :
64 : void init(std::shared_ptr<appfwk::ConfigurationManager> mcfg) override;
65 : void generate_opmon_data() override;
66 :
67 : private:
68 : // Commands
69 : void do_start(const CommandData_t& obj);
70 : void do_stop(const CommandData_t& obj);
71 :
72 : void process_data_request(dfmessages::DataRequest&);
73 : void process_fragment(std::unique_ptr<daqdataformats::Fragment>&);
74 :
75 : // Input and Output Connection names
76 : std::string m_data_req_input;
77 : std::string m_fragment_input;
78 : std::map<int, std::string> m_producer_conn_ids;
79 : std::vector<std::string> m_trb_conn_ids;
80 :
81 : std::chrono::milliseconds m_fragment_send_timeout;
82 :
83 : // Opmon
84 : uint64_t get_current_time_us();
85 : uint64_t m_timestamp_before_dr;
86 : uint64_t m_timestamp_before_frag;
87 : using metric_counter_type = uint64_t;
88 : std::atomic<metric_counter_type> m_data_requests_received{ 0 };
89 : std::atomic<metric_counter_type> m_data_requests_processed{ 0 };
90 : std::atomic<metric_counter_type> m_data_requests_failed{ 0 };
91 : std::atomic<metric_counter_type> m_fragments_received{ 0 };
92 : std::atomic<metric_counter_type> m_fragments_processed{ 0 };
93 : std::atomic<metric_counter_type> m_fragments_failed{ 0 };
94 : std::atomic<metric_counter_type> m_fragments_empty{ 0 };
95 : std::atomic<metric_counter_type> m_fragments_incomplete{ 0 };
96 : std::atomic<metric_counter_type> m_fragments_invalid{ 0 };
97 : std::atomic<metric_counter_type> m_fragments_time_average_us{ 0 };
98 : std::atomic<metric_counter_type> m_fragments_time_min_us{ 0 };
99 : std::atomic<metric_counter_type> m_fragments_time_max_us{ 0 };
100 : std::atomic<metric_counter_type> m_data_requests_time_average_us{ 0 };
101 : std::atomic<metric_counter_type> m_data_requests_time_min_us{ 0 };
102 : std::atomic<metric_counter_type> m_data_requests_time_max_us{ 0 };
103 :
104 : // TRB tracking
105 : std::map<std::tuple<dfmessages::trigger_number_t, dfmessages::sequence_number_t, daqdataformats::SourceID>,
106 : std::string>
107 : m_data_req_map;
108 : std::mutex m_mutex;
109 : };
110 : } // namespace dfmodules
111 : } // namespace dunedaq
112 :
113 : #endif // DFMODULES_PLUGINS_FRAGMENTAGGREGATOR_HPP_
|