Line data Source code
1 : /**
2 : * @file TRBModule.hpp
3 : *
4 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #ifndef DFMODULES_PLUGINS_TRIGGERRECORDBUILDER_HPP_
10 : #define DFMODULES_PLUGINS_TRIGGERRECORDBUILDER_HPP_
11 :
12 : #include "appmodel/TRBConf.hpp"
13 : #include "daqdataformats/Fragment.hpp"
14 : #include "daqdataformats/SourceID.hpp"
15 : #include "daqdataformats/TriggerRecord.hpp"
16 : #include "daqdataformats/Types.hpp"
17 : #include "appmodel/ReadoutApplication.hpp"
18 : #include "appmodel/SmartDaqApplication.hpp"
19 : #include "dfmessages/DataRequest.hpp"
20 : #include "dfmessages/TRMonRequest.hpp"
21 : #include "dfmessages/TriggerDecision.hpp"
22 : #include "dfmessages/Types.hpp"
23 :
24 : #include "appfwk/DAQModule.hpp"
25 : #include "utilities/WorkerThread.hpp"
26 : #include "iomanager/Sender.hpp"
27 : #include "iomanager/Receiver.hpp"
28 : #include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG<<issue wont work.
29 :
30 : #include "dfmodules/opmon/TRBModule.pb.h"
31 :
32 : #include <chrono>
33 : #include <list>
34 : #include <map>
35 : #include <memory>
36 : #include <mutex>
37 : #include <string>
38 : #include <tuple>
39 : #include <utility>
40 : #include <vector>
41 :
42 : namespace dunedaq {
43 :
44 : namespace dfmodules {
45 :
46 : /**
47 : * @brief TriggerId identifies a trigger decision/record by its trigger number,
48 : * sequence number and run number. It also provides an operator< so that it can
49 : * be used as a std::map key, plus stream operators to print and parse it.
50 : */
51 : struct TriggerId
52 : {
53 : // NOTE(review): the data members below have no default member initialisers - confirm default-constructed ids are always assigned before being read
54 : TriggerId() = default;
55 : // Builds the id from a trigger decision; the sequence number is passed separately and defaults to the invalid value
56 0 : explicit TriggerId(const dfmessages::TriggerDecision& td,
57 : daqdataformats::sequence_number_t s = daqdataformats::TypeDefaults::s_invalid_sequence_number)
58 0 : : trigger_number(td.trigger_number)
59 0 : , sequence_number(s)
60 0 : , run_number(td.run_number)
61 : {
62 0 : ; // empty body: all the work is done in the initialiser list
63 0 : }
64 0 : explicit TriggerId(daqdataformats::Fragment& f) // copies the trigger, sequence and run numbers returned by the fragment's getters
65 0 : : trigger_number(f.get_trigger_number())
66 0 : , sequence_number(f.get_sequence_number())
67 0 : , run_number(f.get_run_number())
68 : {
69 0 : ; // empty body: all the work is done in the initialiser list
70 0 : }
71 : // Identifier fields; both explicit constructors above assign all three
72 : daqdataformats::trigger_number_t trigger_number;
73 : daqdataformats::sequence_number_t sequence_number;
74 : daqdataformats::run_number_t run_number;
75 : // Lexicographic ordering on (trigger_number, sequence_number, run_number)
76 0 : bool operator<(const TriggerId& other) const noexcept
77 : {
78 0 : return std::tuple(trigger_number, sequence_number, run_number) <
79 0 : std::tuple(other.trigger_number, other.sequence_number, other.run_number);
80 : }
81 : // Prints the id as <trigger_number>-<sequence_number>/<run_number>
82 0 : friend std::ostream& operator<<(std::ostream& out, const TriggerId& id) noexcept
83 : {
84 0 : out << id.trigger_number << '-' << id.sequence_number << '/' << id.run_number;
85 0 : return out;
86 : }
87 : // Same fields for a TraceStreamer. NOTE(review): the first separator here is '.' while the std::ostream overload uses '-' - confirm this is intended
88 0 : friend TraceStreamer& operator<<(TraceStreamer& out, const TriggerId& id) noexcept
89 : {
90 0 : return out << id.trigger_number << '.' << id.sequence_number << "/" << id.run_number;
91 : }
92 : // Parses trigger_number, one character, sequence_number, one character, run_number
93 : friend std::istream& operator>>(std::istream& in, TriggerId& id)
94 : {
95 : char t1, t2; // receive the two separator characters; their values are not checked
96 : in >> id.trigger_number >> t1 >> id.sequence_number >> t2 >> id.run_number;
97 : return in;
98 : }
99 : };
100 :
101 : } // namespace dfmodules
102 :
103 : /**
104 : * @brief Issue reporting an unexpected trigger decision: its trigger number, its run number and the current run number
105 : */
106 0 : ERS_DECLARE_ISSUE(dfmodules, ///< Namespace
107 : UnexpectedTriggerDecision, ///< Issue class name
108 : "Unexpected Trigger Decisions: " << trigger << '/' << decision_run << " while in run " << current_run, ///< Message
109 : ((daqdataformats::trigger_number_t)trigger) ///< Message parameter: trigger number printed before the '/'
110 : ((daqdataformats::run_number_t)decision_run) ///< Message parameter: run number printed after the '/'
111 : ((daqdataformats::run_number_t)current_run) ///< Message parameter: run number printed after "while in run"
112 : )
113 :
114 : /**
115 : * @brief Issue reporting that a trigger decision, given by its TriggerId and timestamp, timed out
116 : */
117 0 : ERS_DECLARE_ISSUE(dfmodules, ///< Namespace
118 : TimedOutTriggerDecision, ///< Issue class name
119 : "trigger id: " << trigger_id << " generate at: " << trigger_timestamp << " timed out", ///< Message
120 : ((dfmodules::TriggerId)trigger_id) ///< Message parameter: id of the trigger that timed out
121 : ((daqdataformats::timestamp_t)trigger_timestamp) ///< Message parameter: timestamp printed after "generate at:"
122 : )
123 :
124 : /**
125 : * @brief Issue reporting an unexpected fragment: the TriggerId it refers to, its fragment type and its SourceID
126 : */
127 0 : ERS_DECLARE_ISSUE(dfmodules, ///< Namespace
128 : UnexpectedFragment, ///< Issue class name
129 : "Unexpected Fragment for triggerID " << trigger_id << ", type " << fragment_type << ", " << source_id, ///< Message
130 : ((dfmodules::TriggerId)trigger_id) ///< Message parameter: trigger id the fragment refers to
131 : ((daqdataformats::fragment_type_t)fragment_type) ///< Message parameter: type of the fragment
132 : ((daqdataformats::SourceID)source_id) ///< Message parameter: SourceID printed last in the message
133 : )
134 :
135 : /**
136 : * @brief Issue reporting a duplicated trigger ID
137 : */
138 0 : ERS_DECLARE_ISSUE(dfmodules, ///< Namespace
139 : DuplicatedTriggerDecision, ///< Issue class name
140 : "Duplicated trigger ID " << trigger_id, ///< Message
141 : ((dfmodules::TriggerId)trigger_id) ///< Message parameter: the duplicated trigger id
142 : )
143 :
144 : /**
145 : * @brief Issue reporting a trigger ID that could not be sent to writing and is therefore lost
146 : */
147 0 : ERS_DECLARE_ISSUE(dfmodules, ///< Namespace
148 : AbandonedTriggerDecision, ///< Issue class name
149 : "trigger ID " << trigger_id << " could not be sent to writing and it's lost", ///< Message
150 : ((dfmodules::TriggerId)trigger_id) ///< Message parameter: id of the lost trigger
151 : )
152 :
153 : /**
154 : * @brief Issue reporting that no connection ID was found for a given connection name
155 : */
156 0 : ERS_DECLARE_ISSUE(dfmodules, ///< Namespace
157 : MissingConnectionID, ///< Issue class name
158 : "No connection ID was found for connection name \"" << conn_name ///< Message (continues on the next line)
159 : << "\" in the conn_ref list that was provided at 'init' time.",
160 : ((std::string)conn_name) ///< Message parameter: the connection name that was looked up
161 : )
162 :
163 : namespace dfmodules {
164 :
165 : /**
166 : * @brief TRBModule is the DAQModule that collects TriggerDecisions, sends the
167 : * corresponding data requests and collects Fragments to form a complete
168 : * Trigger Record. The Trigger Record is then sent out, possibly to a write
169 : * module.
170 : */
171 : class TRBModule : public dunedaq::appfwk::DAQModule
172 : {
173 : public:
174 : /**
175 : * @brief TRBModule Constructor
176 : * @param name Instance name for this TRBModule instance
177 : */
178 : explicit TRBModule(const std::string& name);
179 :
180 : TRBModule(const TRBModule&) = delete; ///< TRBModule is not copy-constructible
181 : TRBModule& operator=(const TRBModule&) =
182 : delete; ///< TRBModule is not copy-assignable
183 : TRBModule(TRBModule&&) = delete; ///< TRBModule is not move-constructible
184 : TRBModule& operator=(TRBModule&&) = delete; ///< TRBModule is not move-assignable
185 : // Overrides of the DAQModule interface
186 : void init(std::shared_ptr<appfwk::ConfigurationManager> mcfg) override;
187 :
188 : void generate_opmon_data() override;
189 :
190 : protected:
191 : using trigger_decision_receiver_t = iomanager::ReceiverConcept<dfmessages::TriggerDecision>;
192 : using data_req_sender_t = iomanager::SenderConcept<dfmessages::DataRequest>;
193 : using fragment_receiver_t = iomanager::ReceiverConcept<std::unique_ptr<daqdataformats::Fragment>>;
194 :
195 : using trigger_record_ptr_t = std::unique_ptr<daqdataformats::TriggerRecord>;
196 : using trigger_record_sender_t = iomanager::SenderConcept<trigger_record_ptr_t>;
197 : // Handlers for incoming trigger decisions and fragments (presumably registered as receiver callbacks - confirm in the implementation file)
198 : void trigger_decision_callback(dfmessages::TriggerDecision& td);
199 : void fragments_callback(std::unique_ptr<daqdataformats::Fragment>& frag);
200 :
201 : trigger_record_ptr_t extract_trigger_record(const TriggerId&);
202 : // NOTE(review): this note names build_trigger_record, which is not declared here; it presumably describes
203 : // extract_trigger_record above - confirm. It will allocate memory and then orphan it to the caller via the
204 : // returned pointer. Please note that the method will destroy the memory saved in the bookkeeping map
205 :
206 : unsigned int create_trigger_records_and_dispatch(const dfmessages::TriggerDecision&);
207 :
208 : bool dispatch_data_requests(dfmessages::DataRequest,
209 : const daqdataformats::SourceID&);
210 :
211 : bool send_trigger_record(const TriggerId&);
212 : // this creates a trigger record and sends it
213 :
214 : bool check_stale_requests();
215 : // it returns true when there are changes in the book, i.e. a trigger record timed out
216 :
217 : void flush_trigger_records();
218 :
219 : private:
220 : // Commands
221 : void do_conf(const CommandData_t&);
222 : void do_scrap(const CommandData_t&);
223 : void do_start(const CommandData_t&);
224 : void do_stop(const CommandData_t&);
225 :
226 : // Monitoring callback
227 : void tr_requested(const dfmessages::TRMonRequest &);
228 :
229 : // Threading
230 : std::atomic<bool> m_stop_requested; // NOTE(review): <atomic> is not among the standard headers this file includes directly
231 :
232 : // Configuration
233 : const appmodel::TRBConf* m_trb_conf;
234 : std::chrono::milliseconds m_tr_queue_timeout;
235 : std::chrono::milliseconds m_dreq_queue_timeout;
236 : std::string m_reply_connection;
237 : size_t m_max_open_trigger_records;
238 : daqdataformats::SourceID m_this_trb_source_id;
239 :
240 : // Input Connections
241 : std::shared_ptr<trigger_decision_receiver_t> m_trigger_decision_input;
242 : std::shared_ptr<fragment_receiver_t> m_fragment_input;
243 :
244 : // Output connections
245 : std::shared_ptr<trigger_record_sender_t> m_trigger_record_output;
246 : mutable std::mutex m_map_sourceid_connections_mutex;
247 : std::map<daqdataformats::SourceID, std::shared_ptr<data_req_sender_t>> m_map_sourceid_connections; ///< Mapping between SourceID and data request sender connections
248 :
249 : // bookkeeping
250 : using clock_type = std::chrono::steady_clock;
251 : std::mutex m_trigger_records_mutex;
252 : clock_type::time_point m_last_bookkeeping{};
253 : std::map<TriggerId, std::pair<clock_type::time_point, trigger_record_ptr_t>> m_trigger_records; // each entry pairs a time point with the owned trigger record
254 : std::condition_variable m_open_trigger_record_cv; // NOTE(review): <condition_variable> is not among the standard headers this file includes directly
255 :
256 : // Data request properties
257 : daqdataformats::timestamp_diff_t m_max_sequence_length;
258 :
259 : // Run information
260 : std::unique_ptr<const daqdataformats::run_number_t> m_run_number = nullptr;
261 :
262 : // Monitoring related variables
263 : std::mutex m_mon_mutex;
264 : std::shared_ptr<iomanager::ReceiverConcept<dfmessages::TRMonRequest>> m_mon_receiver;
265 : std::list<dfmessages::TRMonRequest> m_mon_requests;
266 :
267 : // book related metrics (NOTE(review): uint64_t is used below but <cstdint> is not included directly)
268 : using metric_counter_type = uint64_t; // decltype(triggerrecordbuilderinfo::Info::pending_trigger_decisions);
269 : mutable std::atomic<metric_counter_type> m_trigger_decisions_counter = { 0 }; // currently
270 : mutable std::atomic<metric_counter_type> m_fragment_counter = { 0 }; // currently
271 : mutable std::atomic<metric_counter_type> m_pending_fragment_counter = { 0 }; // currently
272 :
273 : mutable std::atomic<metric_counter_type> m_timed_out_trigger_records = { 0 }; // in the run
274 : mutable std::atomic<metric_counter_type> m_unexpected_fragments = { 0 }; // in the run
275 : mutable std::atomic<metric_counter_type> m_unexpected_trigger_decisions = { 0 }; // in the run
276 : mutable std::atomic<metric_counter_type> m_lost_fragments = { 0 }; // in the run
277 : mutable std::atomic<metric_counter_type> m_invalid_requests = { 0 }; // in the run
278 : mutable std::atomic<metric_counter_type> m_duplicated_trigger_ids = { 0 }; // in the run
279 : mutable std::atomic<metric_counter_type> m_abandoned_trigger_records = { 0 }; // in the run
280 :
281 : mutable std::atomic<metric_counter_type> m_received_trigger_decisions = { 0 }; // in between calls
282 : mutable std::atomic<metric_counter_type> m_received_fragments = { 0 }; // in between calls
283 : mutable std::atomic<metric_counter_type> m_generated_trigger_records = { 0 }; // in between calls
284 : mutable std::atomic<metric_counter_type> m_generated_data_requests = { 0 }; // in between calls
285 : mutable std::atomic<metric_counter_type> m_data_waiting_time = { 0 }; // in between calls
286 : mutable std::atomic<metric_counter_type> m_trigger_decision_width = { 0 }; // in between calls
287 : mutable std::atomic<metric_counter_type> m_data_request_width = { 0 }; // in between calls
288 : mutable std::atomic<metric_counter_type> m_td_processing_us = { 0 }; // in between calls
289 : mutable std::atomic<metric_counter_type> m_fragment_processing_us = { 0 }; // in between calls
290 :
291 : // Counters for trigger record monitoring requests
292 : mutable std::atomic<metric_counter_type> m_trmon_request_counter = { 0 };
293 : mutable std::atomic<metric_counter_type> m_trmon_sent_counter = { 0 };
294 :
295 : // time thresholds
296 : using duration_type = std::chrono::microseconds;
297 : duration_type m_old_trigger_threshold;
298 : duration_type m_trigger_timeout;
299 : };
300 : } // namespace dfmodules
301 : } // namespace dunedaq
302 :
303 : #endif // DFMODULES_PLUGINS_TRIGGERRECORDBUILDER_HPP_
|