DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
DataHandlingModel.hpp
Go to the documentation of this file.
1
9#ifndef DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_READOUTMODEL_HPP_
10#define DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_READOUTMODEL_HPP_
11
19
21
23#include "iomanager/Sender.hpp"
25
26#include "logging/Logging.hpp"
27
30
33
37
40
44
47
48#include <folly/coro/Baton.h>
49#include <folly/coro/Task.h>
50#include <folly/futures/Future.h>
51
52#include <algorithm>
53#include <functional>
54#include <memory>
55#include <string>
56#include <utility>
57#include <vector>
58
63
64namespace dunedaq {
65namespace datahandlinglibs {
66
67template<class ReadoutType, class RequestHandlerType, class LatencyBufferType, class RawDataProcessorType, class InputDataType = ReadoutType>
69{
70public:
71 // Using shorter typenames
72 using RDT = ReadoutType;
73 using RHT = RequestHandlerType;
74 using LBT = LatencyBufferType;
75 using RPT = RawDataProcessorType;
76 using IDT = InputDataType;
77
78 // Using timestamp typenames
79 using timestamp_t = std::uint64_t; // NOLINT(build/unsigned)
80 static inline constexpr timestamp_t ns = 1;
81 static inline constexpr timestamp_t us = 1000 * ns;
82 static inline constexpr timestamp_t ms = 1000 * us;
83 static inline constexpr timestamp_t s = 1000 * ms;
84
85 // Explicit constructor with run marker pass-through
// Constructs the model, taking the run marker by reference. The initializers
// visible here clear the callback-mode and fake-trigger flags and set the raw
// data receiver, latency buffer and raw processor pointers to nullptr; the
// constructor body itself is empty.
// NOTE(review): this listing omits lines 87, 90-93 and 95, so the start of the
// initializer list (presumably binding m_run_marker) and several other member
// initializers are not shown — confirm against the original header.
86 explicit DataHandlingModel(std::atomic<bool>& run_marker)
88 , m_callback_mode(false)
89 , m_fake_trigger(false)
94 , m_raw_data_receiver(nullptr)
96 , m_latency_buffer_impl(nullptr)
97 , m_raw_processor_impl(nullptr)
98 {
99 }
100
101 virtual ~DataHandlingModel() = default;
102
103 // Initializes the readoutmodel and its internals
104 void init(const appmodel::DataHandlerModule* modconf);
105
106 // Configures the readoutmodel and its internals
107 void conf(const appfwk::DAQModule::CommandData_t& args);
108
109 // Unconfigures readoutmodel's internals
// Forwards the scrap command arguments, in this order, to the request handler,
// the latency buffer and the raw data processor.
// NOTE(review): all three shared_ptr members are dereferenced without a null
// check, and the visible constructor sets m_latency_buffer_impl and
// m_raw_processor_impl to nullptr. Presumably init() populates them before
// scrap() can be called — confirm.
110 void scrap(const appfwk::DAQModule::CommandData_t& args)
111 {
112 m_request_handler_impl->scrap(args);
113 m_latency_buffer_impl->scrap(args);
114 m_raw_processor_impl->scrap(args);
115 }
116
117 // Starts readoutmodel's internals
118 void start(const appfwk::DAQModule::CommandData_t& args);
119
120 // Stops readoutmodel's internals
121 void stop(const appfwk::DAQModule::CommandData_t& args);
122
123 // Record function: invokes request handler's record implementation
// Delegates the record command, with its arguments unchanged, to the request
// handler's record() implementation.
// NOTE(review): declared `override`, so a matching virtual must exist in a base
// class; the base-class list is not visible in this listing.
124 void record(const appfwk::DAQModule::CommandData_t& args) override
125 {
126 m_request_handler_impl->record(args);
127 }
128
129 // Opmon get_info call implementation
130 //void get_info(opmonlib::InfoCollector& ci, int level);
131
132 // Consume callback
133 std::function<void(IDT&&)> m_consume_callback;
134
135protected:
137 {
138 public:
// Binds the scheduler to an existing latency buffer and raw data processor
// (both are stored as references, not owned) and copies the three delay
// settings into const members.
// m_first_cycle starts as true, m_last_post_proc_time is set to the current
// system_clock time, and m_max_wait_in_ticks is set to
// post_processing_delay_max_wait * 62500 (see the FIXME on that line).
// NOTE(review): the units of the delay arguments are not stated here; the 62500
// factor presumably converts milliseconds into clock ticks — confirm.
// NOTE(review): listing lines 150 and 152 are omitted, so any members
// initialized on those lines are not visible in this view.
139 PostprocessScheduleAlgorithm(LatencyBufferType& latency_buffer_impl,
140 RawDataProcessorType& raw_processor_impl,
141 uint64_t processing_delay_ticks, // NOLINT(build/unsigned)
142 uint64_t post_processing_delay_min_wait, // NOLINT(build/unsigned)
143 uint64_t post_processing_delay_max_wait) // NOLINT(build/unsigned)
144 : m_latency_buffer_impl{ latency_buffer_impl }
145 , m_raw_processor_impl{ raw_processor_impl }
146 , m_processing_delay_ticks{ processing_delay_ticks }
147 , m_post_processing_delay_min_wait{ post_processing_delay_min_wait }
148 , m_post_processing_delay_max_wait{ post_processing_delay_max_wait }
149 , m_first_cycle{ true }
151 , m_last_post_proc_time{ std::chrono::system_clock::now() }
153 , m_max_wait_in_ticks{ post_processing_delay_max_wait * 62500 } // FIXME: hardcoded clock frequency
154 {
155 }
156
157 // High-level interface
158 // Schedule deferred post-processing and notify timeout expiration to the processor
// Runs one post-processing pass through do_run() and returns its result, i.e.
// the number of items that were post-processed (0 when nothing was done).
// When `timeout` is true, the raw processor's
// invoke_postprocess_schedule_timeout_policy() is called after the pass,
// regardless of how many items were processed.
// NOTE(review): `timeout_accumulated` is not declared in the lines visible here
// (the listing skips from line 162 to 164); it is presumably defined on the
// omitted line — confirm against the original header.
159 int run(bool timeout) {
160 int processed = this->do_run(timeout);
161
162 if (timeout) {
164 m_raw_processor_impl.invoke_postprocess_schedule_timeout_policy(timeout_accumulated);
165 }
166
167 return processed;
168 }
169
170
171 // Deferral of the post processing, to allow elements being reordered in the LB
172 // Basically, find data older than a certain timestamp and process all data since the last post-processed element up to that value
// Post-processes the latency-buffer elements between the last processed
// timestamp (m_processed_up_to) and a newly computed window end (end_win_ts).
// Returns the number of elements handed to postprocess_item(), or 0 when the
// buffer is empty, the window is not ready yet, or no valid range is found.
// `timeout` selects how the window end is computed (see the two branches below).
// NOTE(review): m_processed_up_to, m_first_cycle and timeout_accumulated are
// used here but their declarations are not visible in this listing (several
// lines are omitted) — confirm against the original header.
173 int do_run(bool timeout)
174 {
175 if (m_latency_buffer_impl.occupancy() == 0) {
176 TLOG_DEBUG(TLVL_WORK_STEPS) << "Nothing to postprocess (empty buffer)";
177 return 0;
178 }
179
// On the first call with a non-empty buffer, seed the "processed up to" marker
// with the timestamp of the element at the front of the buffer.
180 if (m_first_cycle) {
181 auto head = m_latency_buffer_impl.front();
182 m_processed_up_to.set_timestamp(head->get_timestamp());
183 m_first_cycle = false;
184 TLOG() << "***** First pass post processing *****";
185 }
186
187 // Get the LB boundaries
188 auto tail = m_latency_buffer_impl.back();
189 auto newest_ts = tail->get_timestamp();
190
191 timestamp_t end_win_ts = 0;
192 std::chrono::time_point<std::chrono::system_clock> now{ std::chrono::system_clock::now() };
193
194 if (timeout) {
195 // Return if the last processed timestamp is greater than the newest timestamp
196 // This condition occurs after a timeout
197 if (m_processed_up_to.get_timestamp() >= newest_ts + 1) {
198 TLOG_DEBUG(TLVL_WORK_STEPS) << "Nothing to postprocess (at or past cap)";
199 return 0;
200 }
201
204
// Timeout path: the window end is the newest timestamp minus the processing
// delay, plus timeout_accumulated, capped at newest_ts + 1.
// NOTE(review): if these operands are unsigned and the delay exceeds
// newest_ts + timeout_accumulated, the expression wraps around to a very large
// value and the std::min cap below then selects newest_ts + 1 — confirm that
// this is the intended outcome.
205 end_win_ts = newest_ts - m_processing_delay_ticks + timeout_accumulated;
206 end_win_ts = std::min(end_win_ts, newest_ts + 1); // Cap to prevent end_win_ts from becoming unnecessarily large
207 } else {
209
210 if (m_processed_up_to.get_timestamp() >= newest_ts + 1) {
211 TLOG_DEBUG(TLVL_WORK_STEPS) << "Nothing to postprocess (data arrived too late, will be ignored)";
212 return 0;
213 }
214
// Regular path: proceed only if more than m_post_processing_delay_min_wait
// milliseconds have elapsed since m_last_post_proc_time AND the newest
// timestamp is ahead of the processed marker by more than the processing delay.
// NOTE(review): count() is a signed value compared against a uint64_t member;
// a negative elapsed time (system_clock stepping backwards) would convert to a
// large unsigned value and pass this check — confirm whether that matters here.
215 auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(now - m_last_post_proc_time);
216
217 if (milliseconds.count() > m_post_processing_delay_min_wait) {
218 if (newest_ts - m_processed_up_to.get_timestamp() > m_processing_delay_ticks) {
219 end_win_ts = newest_ts - m_processing_delay_ticks;
220 } else {
221 TLOG_DEBUG(TLVL_WORK_STEPS) << "Not ready to postprocess (m_processing_delay_ticks is greater)";
222 return 0;
223 }
224 } else {
225 TLOG_DEBUG(TLVL_WORK_STEPS) << "Not ready to postprocess (too fast)";
226 return 0;
227 }
228 }
229
// Locate the start of the range with the old marker, advance the marker to the
// window end, then locate the end of the range with the updated marker.
// The marker is advanced before the validity checks below, so it stays at
// end_win_ts even when one of those checks returns 0.
230 auto start_iter = m_latency_buffer_impl.lower_bound(m_processed_up_to, false);
231 m_processed_up_to.set_timestamp(end_win_ts);
232 auto end_iter = m_latency_buffer_impl.lower_bound(m_processed_up_to, false);
233
234 // This likely happens when RDT uses a composite key
235 // The current algorithm does not support composite keys
236 // Our search item `m_processed_up_to` will have its other keys set to their defaults
237 // E.g., for TriggerPrimitive, channel = INVALID_TP_CHANNEL
238 // Even if an entry with the same ts exists in the buffer, its channel will be a valid (smaller) value,
239 // so `lower_bound` will not be able to find it
240 // We should verify that this is the only scenario in which we end up here
241 if (!start_iter.good()) {
242 TLOG_DEBUG(TLVL_WORK_STEPS) << "Nothing to postprocess (!start_iter.good())";
243 return 0;
244 }
245
246 if (start_iter == end_iter) {
247 TLOG_DEBUG(TLVL_WORK_STEPS) << "Nothing to postprocess (start_iter == end_iter)";
248 return 0;
249 }
250
// Hand each element in [start_iter, end_iter) to the raw processor, counting
// them; stop early if the iterator reports that it is no longer good.
251 int processed = 0;
252 for (auto it = start_iter; it != end_iter; ++it) {
253 // Just to be completely safe
254 // We should understand why we end up here
255 if (!it.good()) {
256 TLOG_DEBUG(TLVL_WORK_STEPS) << "Invalid iterator in postprocessing loop";
257 break;
258 }
259 m_raw_processor_impl.postprocess_item(&(*it));
260 ++processed;
261 }
262
// NOTE(review): nothing visible in this function updates m_last_post_proc_time;
// listing line 263 is omitted and presumably does so — confirm.
264
265 return processed;
266 }
267
268 private:
269 LatencyBufferType& m_latency_buffer_impl;
270 RawDataProcessorType& m_raw_processor_impl;
271 const uint64_t m_processing_delay_ticks; // NOLINT(build/unsigned)
272 const uint64_t m_post_processing_delay_min_wait; // NOLINT(build/unsigned)
273 const uint64_t m_post_processing_delay_max_wait; // NOLINT(build/unsigned)
278 std::chrono::time_point<std::chrono::system_clock> m_last_post_proc_time;
279 };
280
281 // Perform processing operations on payload
282 void process_item(RDT&& payload);
283
284 // Transform payload if needed, then perform processing
285 void transform_and_process(IDT&& payload);
286
287 // Raw data consume callback
288 void consume_callback(IDT&& payload);
289
290 // Raw data consumer's work function
291 void run_consume();
292
293 // Timesync thread's work function
294 void run_timesync();
295
296 // Postprocess scheduler thread's work function
298
299 // Postprocess schedule coroutine
300 folly::coro::Task<void> postprocess_schedule();
301
302 // Dispatch data request
303 void dispatch_requests(dfmessages::DataRequest& data_request);
304
305 // Transform input data type to readout
// Default input-to-readout transformation: reinterprets `original` as an RDT
// and returns a single-element vector built from that reference.
// NOTE(review): the reinterpret_cast is only meaningful when IDT and RDT are
// the same or layout-compatible types (IDT defaults to ReadoutType in the
// template parameter list). Presumably specializations with a different input
// type are expected to override this virtual — confirm.
306 virtual std::vector<RDT> transform_payload(IDT& original) const
307 {
308 return { reinterpret_cast<RDT&>(original) };
309 }
310
311 // Actions postprocess scheduler takes if no data arrives in a configured time
313 {
314 return; // No-op for this class
315 }
316
317 // Operational monitoring
318 virtual void generate_opmon_data() override;
319
320 // Constructor params
321 std::atomic<bool>& m_run_marker;
322
323 // CONFIGURATION
324 //appfwk::app::ModInit m_queue_config;
331 uint64_t m_processing_delay_ticks; // NOLINT(build/unsigned)
332 uint64_t m_post_processing_delay_min_wait; // NOLINT(build/unsigned)
333 uint64_t m_post_processing_delay_max_wait; // NOLINT(build/unsigned)
334
335 // STATS
337 using num_payload_t = std::remove_const<std::invoke_result<decltype(&metric_t::num_payloads),metric_t>::type>::type;
338 using sum_payload_t = std::remove_const<std::invoke_result<decltype(&metric_t::sum_payloads),metric_t>::type>::type;
339 using num_request_t = std::remove_const<std::invoke_result<decltype(&metric_t::num_requests),metric_t>::type>::type;
340 using sum_request_t = std::remove_const<std::invoke_result<decltype(&metric_t::sum_requests),metric_t>::type>::type;
341 using rawq_timeout_count_t = std::remove_const<std::invoke_result<decltype(&metric_t::num_data_input_timeouts),metric_t>::type>::type;
342 using num_lb_insert_failures_t = std::remove_const<std::invoke_result<decltype(&metric_t::num_lb_insert_failures),metric_t>::type>::type;
343 using num_post_processing_delay_max_waits_t = std::remove_const<std::invoke_result<decltype(&metric_t::num_post_processing_delay_max_waits),metric_t>::type>::type;
344
345 std::atomic<num_payload_t> m_num_payloads{ 0 };
346 std::atomic<sum_payload_t> m_sum_payloads{ 0 };
347 std::atomic<num_request_t> m_num_requests{ 0 };
348 std::atomic<sum_request_t> m_sum_requests{ 0 };
349 std::atomic<rawq_timeout_count_t> m_rawq_timeout_count{ 0 };
350 std::atomic<num_lb_insert_failures_t> m_num_lb_insert_failures{ 0 };
351 std::atomic<num_post_processing_delay_max_waits_t> m_num_post_processing_delay_max_waits{ 0 };
352 std::atomic<int> m_stats_packet_count{ 0 };
353
354 // CONSUMER
356
357 // RAW RECEIVER
358 std::chrono::milliseconds m_raw_receiver_timeout_ms;
359 std::chrono::microseconds m_raw_receiver_sleep_us;
361 std::shared_ptr<raw_receiver_ct> m_raw_data_receiver;
363
364 // REQUEST RECEIVERS
366 std::shared_ptr<request_receiver_ct> m_data_request_receiver;
367
368 // FRAGMENT SENDER
369 //std::chrono::milliseconds m_fragment_sender_timeout_ms;
370 //using fragment_sender_ct = iomanager::SenderConcept<std::pair<std::unique_ptr<daqdataformats::Fragment>, std::string>>;
371 //std::shared_ptr<fragment_sender_ct> m_fragment_sender;
372
373 // TIME-SYNC
375 std::shared_ptr<timesync_sender_ct> m_timesync_sender;
378
379 // POSTPROCESS SCHEDULER
381 folly::coro::Baton m_baton;
382 std::unique_ptr<folly::Timekeeper> m_timekeeper;
383
384 // LATENCY BUFFER
385 std::shared_ptr<LatencyBufferType> m_latency_buffer_impl;
386
387 // RAW PROCESSING
388 std::shared_ptr<RawDataProcessorType> m_raw_processor_impl;
389
390 // REQUEST HANDLER
391 std::shared_ptr<RequestHandlerType> m_request_handler_impl;
393
394 // ERROR REGISTRY
395 std::unique_ptr<FrameErrorRegistry> m_error_registry;
396
397 // RUN START T0
398 std::chrono::time_point<std::chrono::high_resolution_clock> m_t0;
399};
400
401} // namespace datahandlinglibs
402} // namespace dunedaq
403
404// Declarations
406
407#endif // DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_READOUTMODEL_HPP_
PostprocessScheduleAlgorithm(LatencyBufferType &latency_buffer_impl, RawDataProcessorType &raw_processor_impl, uint64_t processing_delay_ticks, uint64_t post_processing_delay_min_wait, uint64_t post_processing_delay_max_wait)
std::chrono::time_point< std::chrono::system_clock > m_last_post_proc_time
std::atomic< num_lb_insert_failures_t > m_num_lb_insert_failures
std::unique_ptr< FrameErrorRegistry > m_error_registry
DataHandlingModel(std::atomic< bool > &run_marker)
void record(const appfwk::DAQModule::CommandData_t &args) override
std::shared_ptr< RequestHandlerType > m_request_handler_impl
void init(const appmodel::DataHandlerModule *modconf)
Forward calls from the appfwk.
std::remove_const< std::invoke_result< decltype(&metric_t::num_post_processing_delay_max_waits), metric_t >::type >::type num_post_processing_delay_max_waits_t
std::shared_ptr< timesync_sender_ct > m_timesync_sender
void stop(const appfwk::DAQModule::CommandData_t &args)
virtual std::vector< RDT > transform_payload(IDT &original) const
void start(const appfwk::DAQModule::CommandData_t &args)
std::remove_const< std::invoke_result< decltype(&metric_t::sum_payloads), metric_t >::type >::type sum_payload_t
void scrap(const appfwk::DAQModule::CommandData_t &args)
std::remove_const< std::invoke_result< decltype(&metric_t::num_requests), metric_t >::type >::type num_request_t
std::remove_const< std::invoke_result< decltype(&metric_t::sum_requests), metric_t >::type >::type sum_request_t
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
std::shared_ptr< raw_receiver_ct > m_raw_data_receiver
void conf(const appfwk::DAQModule::CommandData_t &args)
void dispatch_requests(dfmessages::DataRequest &data_request)
std::remove_const< std::invoke_result< decltype(&metric_t::num_data_input_timeouts), metric_t >::type >::type rawq_timeout_count_t
void run_consume()
Function that will be run in its own thread to read the raw packets from the connection and add them ...
void run_timesync()
Function that will be run in its own thread and sends periodic timesync messages by pushing them to t...
std::remove_const< std::invoke_result< decltype(&metric_t::num_payloads), metric_t >::type >::type num_payload_t
std::unique_ptr< folly::Timekeeper > m_timekeeper
std::atomic< num_post_processing_delay_max_waits_t > m_num_post_processing_delay_max_waits
std::remove_const< std::invoke_result< decltype(&metric_t::num_lb_insert_failures), metric_t >::type >::type num_lb_insert_failures_t
std::atomic< rawq_timeout_count_t > m_rawq_timeout_count
std::shared_ptr< LatencyBufferType > m_latency_buffer_impl
std::shared_ptr< request_receiver_ct > m_data_request_receiver
std::shared_ptr< RawDataProcessorType > m_raw_processor_impl
std::atomic< bool > run_marker
Global atomic for process lifetime.
static int64_t now()
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
uint32_t run_number_t
Type used to represent run number.
Definition Types.hpp:20
The DUNE-DAQ namespace.
SourceID is a generalized representation of the source of a piece of data in the DAQ....
Definition SourceID.hpp:32
This message represents a request for data sent to a single component of the DAQ.