Line data Source code
1 : /**
2 : * @file TRBModule.hpp
3 : *
4 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #ifndef DFMODULES_PLUGINS_TRIGGERRECORDBUILDER_HPP_
10 : #define DFMODULES_PLUGINS_TRIGGERRECORDBUILDER_HPP_
11 :
#include "appmodel/TRBConf.hpp"
#include "daqdataformats/Fragment.hpp"
#include "daqdataformats/SourceID.hpp"
#include "daqdataformats/TriggerRecord.hpp"
#include "daqdataformats/Types.hpp"
#include "appmodel/ReadoutApplication.hpp"
#include "appmodel/SmartDaqApplication.hpp"
#include "dfmessages/DataRequest.hpp"
#include "dfmessages/TRMonRequest.hpp"
#include "dfmessages/TriggerDecision.hpp"
#include "dfmessages/Types.hpp"

#include "appfwk/DAQModule.hpp"
#include "utilities/WorkerThread.hpp"
#include "iomanager/Sender.hpp"
#include "iomanager/Receiver.hpp"
#include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE including logging/Logging.hpp, TLOG_DEBUG << issue won't work.

#include "dfmodules/opmon/TRBModule.pb.h"

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <istream>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <tuple>
#include <utility>
#include <vector>
41 :
42 : namespace dunedaq {
43 :
44 : namespace dfmodules {
45 :
46 : /**
47 : * @brief TriggerId is a little class that defines a unique identifier for a
48 : * trigger decision/record It also provides an operator < to be used by map to
49 : * optimise bookkeeping
50 : */
51 : struct TriggerId
52 : {
53 :
54 : TriggerId() = default;
55 :
56 0 : explicit TriggerId(const dfmessages::TriggerDecision& td,
57 : daqdataformats::sequence_number_t s = daqdataformats::TypeDefaults::s_invalid_sequence_number)
58 0 : : trigger_number(td.trigger_number)
59 0 : , sequence_number(s)
60 0 : , run_number(td.run_number)
61 : {
62 0 : ;
63 0 : }
64 0 : explicit TriggerId(daqdataformats::Fragment& f)
65 0 : : trigger_number(f.get_trigger_number())
66 0 : , sequence_number(f.get_sequence_number())
67 0 : , run_number(f.get_run_number())
68 : {
69 0 : ;
70 0 : }
71 :
72 : daqdataformats::trigger_number_t trigger_number;
73 : daqdataformats::sequence_number_t sequence_number;
74 : daqdataformats::run_number_t run_number;
75 :
76 0 : bool operator<(const TriggerId& other) const noexcept
77 : {
78 0 : return std::tuple(trigger_number, sequence_number, run_number) <
79 0 : std::tuple(other.trigger_number, other.sequence_number, other.run_number);
80 : }
81 :
82 0 : friend std::ostream& operator<<(std::ostream& out, const TriggerId& id) noexcept
83 : {
84 0 : out << id.trigger_number << '-' << id.sequence_number << '/' << id.run_number;
85 0 : return out;
86 : }
87 :
88 0 : friend TraceStreamer& operator<<(TraceStreamer& out, const TriggerId& id) noexcept
89 : {
90 0 : return out << id.trigger_number << '.' << id.sequence_number << "/" << id.run_number;
91 : }
92 :
93 : friend std::istream& operator>>(std::istream& in, TriggerId& id)
94 : {
95 : char t1, t2;
96 : in >> id.trigger_number >> t1 >> id.sequence_number >> t2 >> id.run_number;
97 : return in;
98 : }
99 : };
100 :
101 : } // namespace dfmodules
102 :
103 : /**
104 : * @brief Unexpected trigger decision
105 : */
106 0 : ERS_DECLARE_ISSUE(dfmodules, ///< Namespace
107 : UnexpectedTriggerDecision, ///< Issue class name
108 : "Unexpected Trigger Decisions: " << trigger << '/' << decision_run << " while in run " << current_run,
109 : ((daqdataformats::trigger_number_t)trigger) ///< Message parameters
110 : ((daqdataformats::run_number_t)decision_run) ///< Message parameters
111 : ((daqdataformats::run_number_t)current_run) ///< Message parameters
112 : )
113 :
114 : /**
115 : * @brief Timed out Trigger Decision
116 : */
117 0 : ERS_DECLARE_ISSUE(dfmodules, ///< Namespace
118 : TimedOutTriggerDecision, ///< Issue class name
119 : "trigger id: " << trigger_id << " generate at: " << trigger_timestamp << " timed out", ///< Message
120 : ((dfmodules::TriggerId)trigger_id) ///< Message parameters
121 : ((daqdataformats::timestamp_t)trigger_timestamp) ///< Message parameters
122 : )
123 :
124 : /**
125 : * @brief Unexpected fragment
126 : */
127 0 : ERS_DECLARE_ISSUE(dfmodules, ///< Namespace
128 : UnexpectedFragment, ///< Issue class name
129 : "Unexpected Fragment for triggerID " << trigger_id << ", type " << fragment_type << ", " << source_id,
130 : ((dfmodules::TriggerId)trigger_id) ///< Message parameters
131 : ((daqdataformats::fragment_type_t)fragment_type) ///< Message parameters
132 : ((daqdataformats::SourceID)source_id) ///< Message parameters
133 : )
134 :
135 : /**
136 : * @brief Duplicate trigger decision
137 : */
138 0 : ERS_DECLARE_ISSUE(dfmodules, ///< Namespace
139 : DuplicatedTriggerDecision, ///< Issue class name
140 : "Duplicated trigger ID " << trigger_id,
141 : ((dfmodules::TriggerId)trigger_id) ///< Message parameters
142 : )
143 :
144 : /**
145 : * @brief Abandoned TR
146 : */
147 0 : ERS_DECLARE_ISSUE(dfmodules, ///< Namespace
148 : AbandonedTriggerDecision, ///< Issue class name
149 : "trigger ID " << trigger_id << " could not be sent to writing and it's lost",
150 : ((dfmodules::TriggerId)trigger_id) ///< Message parameters
151 : )
152 :
153 : /**
154 : * @brief Incomplete TR
155 : */
156 0 : ERS_DECLARE_ISSUE(dfmodules, ///< Namespace
157 : IncompleteTriggerRecord , ///< Issue class name
158 : "sending incomplete TriggerRecord downstream " << optional_stop_time_phrase << " (trigger/run_number=" << id << ", " << num_frags_present << " of " << num_components_requested << " fragments included)",
159 : ((std::string)optional_stop_time_phrase)((dfmodules::TriggerId)id)((int)num_frags_present)((int)num_components_requested) ///< Message parameters
160 : )
161 :
162 : /**
163 : * @brief Missing connection ID
164 : */
165 0 : ERS_DECLARE_ISSUE(dfmodules, ///< Namespace
166 : MissingConnectionID, ///< Issue class name
167 : "No connection ID was found for connection name \"" << conn_name
168 : << "\" in the conn_ref list that was provided at 'init' time.",
169 : ((std::string)conn_name) ///< Message parameters
170 : )
171 :
172 : namespace dfmodules {
173 :
174 : /**
175 : * @brief TRBModule is the Module that collects Trigger
176 : TriggersDecisions, sends the corresponding data requests and collects Fragment
177 : to form a complete Trigger Record. The TR then sent out possibly to a write
178 : module
179 : */
180 : class TRBModule : public dunedaq::appfwk::DAQModule
181 : {
182 : public:
183 : /**
184 : * @brief TRBModule Constructor
185 : * @param name Instance name for this TRBModule instance
186 : */
187 : explicit TRBModule(const std::string& name);
188 :
189 : TRBModule(const TRBModule&) = delete; ///< TRBModule is not copy-constructible
190 : TRBModule& operator=(const TRBModule&) =
191 : delete; ///< TRBModule is not copy-assignable
192 : TRBModule(TRBModule&&) = delete; ///< TRBModule is not move-constructible
193 : TRBModule& operator=(TRBModule&&) = delete; ///< TRBModule is not move-assignable
194 :
195 : void init(std::shared_ptr<appfwk::ConfigurationManager> mcfg) override;
196 :
197 : void generate_opmon_data() override;
198 :
199 : protected:
200 : using trigger_decision_receiver_t = iomanager::ReceiverConcept<dfmessages::TriggerDecision>;
201 : using data_req_sender_t = iomanager::SenderConcept<dfmessages::DataRequest>;
202 : using fragment_receiver_t = iomanager::ReceiverConcept<std::unique_ptr<daqdataformats::Fragment>>;
203 :
204 : using trigger_record_ptr_t = std::unique_ptr<daqdataformats::TriggerRecord>;
205 : using trigger_record_sender_t = iomanager::SenderConcept<trigger_record_ptr_t>;
206 :
207 : void trigger_decision_callback(dfmessages::TriggerDecision& td);
208 : void fragments_callback(std::unique_ptr<daqdataformats::Fragment>& frag);
209 :
210 : trigger_record_ptr_t extract_trigger_record(const TriggerId&);
211 : // build_trigger_record will allocate memory and then orphan it to the caller
212 : // via the returned pointer Plese note that the method will destroy the memory
213 : // saved in the bookkeeping map
214 :
215 : unsigned int create_trigger_records_and_dispatch(const dfmessages::TriggerDecision&);
216 :
217 : bool dispatch_data_requests(dfmessages::DataRequest,
218 : const daqdataformats::SourceID&);
219 :
220 : bool send_trigger_record(const TriggerId&);
221 : // this creates a trigger record and send it
222 :
223 : bool check_stale_requests();
224 : // it returns true when there are changes in the book = a TR timed out
225 :
226 : void flush_trigger_records();
227 :
228 : private:
229 : // Commands
230 : void do_conf(const CommandData_t&);
231 : void do_scrap(const CommandData_t&);
232 : void do_start(const CommandData_t&);
233 : void do_stop(const CommandData_t&);
234 :
235 : // Monitoring callback
236 : void tr_requested(const dfmessages::TRMonRequest &);
237 :
238 : // Threading
239 : std::atomic<bool> m_stop_requested;
240 :
241 : // Configuration
242 : const appmodel::TRBConf* m_trb_conf;
243 : std::chrono::milliseconds m_tr_queue_timeout;
244 : std::chrono::milliseconds m_dreq_queue_timeout;
245 : std::string m_reply_connection;
246 : size_t m_max_open_trigger_records;
247 : daqdataformats::SourceID m_this_trb_source_id;
248 :
249 : // Input Connections
250 : std::shared_ptr<trigger_decision_receiver_t> m_trigger_decision_input;
251 : std::shared_ptr<fragment_receiver_t> m_fragment_input;
252 :
253 : // Output connections
254 : std::shared_ptr<trigger_record_sender_t> m_trigger_record_output;
255 : mutable std::mutex m_map_sourceid_connections_mutex;
256 : std::map<daqdataformats::SourceID, std::shared_ptr<data_req_sender_t>> m_map_sourceid_connections; ///< Mappinng between SourceID and connections
257 :
258 : // bookeeping
259 : using clock_type = std::chrono::steady_clock;
260 : std::mutex m_trigger_records_mutex;
261 : clock_type::time_point m_last_bookkeeping{};
262 : std::map<TriggerId, std::pair<clock_type::time_point, trigger_record_ptr_t>> m_trigger_records;
263 : std::condition_variable m_open_trigger_record_cv;
264 :
265 : // Data request properties
266 : daqdataformats::timestamp_diff_t m_max_sequence_length;
267 :
268 : // Run information
269 : std::unique_ptr<const daqdataformats::run_number_t> m_run_number = nullptr;
270 :
271 : // Monitoring related variables
272 : std::mutex m_mon_mutex;
273 : std::shared_ptr<iomanager::ReceiverConcept<dfmessages::TRMonRequest>> m_mon_receiver;
274 : std::list<dfmessages::TRMonRequest> m_mon_requests;
275 :
276 : // book related metrics
277 : using metric_counter_type = uint64_t; // decltype(triggerrecordbuilderinfo::Info::pending_trigger_decisions);
278 : mutable std::atomic<metric_counter_type> m_trigger_decisions_counter = { 0 }; // currently
279 : mutable std::atomic<metric_counter_type> m_fragment_counter = { 0 }; // currently
280 : mutable std::atomic<metric_counter_type> m_pending_fragment_counter = { 0 }; // currently
281 :
282 : mutable std::atomic<metric_counter_type> m_timed_out_trigger_records = { 0 }; // in the run
283 : mutable std::atomic<metric_counter_type> m_unexpected_fragments = { 0 }; // in the run
284 : mutable std::atomic<metric_counter_type> m_unexpected_trigger_decisions = { 0 }; // in the run
285 : mutable std::atomic<metric_counter_type> m_lost_fragments = { 0 }; // in the run
286 : mutable std::atomic<metric_counter_type> m_invalid_requests = { 0 }; // in the run
287 : mutable std::atomic<metric_counter_type> m_duplicated_trigger_ids = { 0 }; // in the run
288 : mutable std::atomic<metric_counter_type> m_abandoned_trigger_records = { 0 }; // in the run
289 :
290 : mutable std::atomic<metric_counter_type> m_received_trigger_decisions = { 0 }; // in between calls
291 : mutable std::atomic<metric_counter_type> m_received_fragments = { 0 }; // in between calls
292 : mutable std::atomic<metric_counter_type> m_generated_trigger_records = { 0 }; // in between calls
293 : mutable std::atomic<metric_counter_type> m_generated_data_requests = { 0 }; // in between calls
294 : mutable std::atomic<metric_counter_type> m_data_waiting_time = { 0 }; // in between calls
295 : mutable std::atomic<metric_counter_type> m_trigger_decision_width = { 0 }; // in between calls
296 : mutable std::atomic<metric_counter_type> m_data_request_width = { 0 }; // in between calls
297 : mutable std::atomic<metric_counter_type> m_td_processing_us = { 0 }; // in between calls
298 : mutable std::atomic<metric_counter_type> m_fragment_processing_us = { 0 }; // in between calls
299 :
300 :
301 : mutable std::atomic<metric_counter_type> m_trmon_request_counter = { 0 };
302 : mutable std::atomic<metric_counter_type> m_trmon_sent_counter = { 0 };
303 :
304 : // time thresholds
305 : using duration_type = std::chrono::microseconds;
306 : duration_type m_old_trigger_threshold;
307 : duration_type m_trigger_timeout;
308 : };
309 : } // namespace dfmodules
310 : } // namespace dunedaq
311 :
312 : #endif // DFMODULES_PLUGINS_TRIGGERRECORDBUILDER_HPP_
|