DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
DataHandlingModel.hxx
Go to the documentation of this file.
1// Definitions of the DataHandlingModel template member functions
2
3#include <folly/CancellationToken.h>
4#include <folly/coro/BlockingWait.h>
5#include <folly/coro/CurrentExecutor.h>
6#include <folly/coro/Timeout.h>
7#include <folly/futures/ThreadWheelTimekeeper.h>
8
9#include <typeinfo>
10
11namespace dunedaq {
12namespace datahandlinglibs {
13
// init(): configure this DataHandlingModel from its module configuration `mcfg`.
// Selects callback vs. message-polling mode for raw data, resolves the DataRequest receiver,
// the raw-data receiver and the optional TimeSync sender, instantiates the error registry,
// latency buffer (LBT), raw processor (RPT) and request handler (RHT), registers them as nodes,
// and configures them.
// NOTE(review): original source line 16 (the signature, presumably
// DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::init(const appmodel::DataHandlerModule* mcfg))
// is missing from this listing.
14template<class RDT, class RHT, class LBT, class RPT, class IDT>
15void
17{
18 // Check if a callback is defined (TPs use IOManager Queues instead)
19 m_raw_data_callback_conf = mcfg->get_raw_data_callback();
20 if (m_raw_data_callback_conf != nullptr) {
// NOTE(review): redundant - re-assigns the value already stored on line 19.
21 m_raw_data_callback_conf = mcfg->get_raw_data_callback();
22 TLOG_DEBUG(TLVL_WORK_STEPS) << "DataHandlingModel operating in callback mode.";
23 } else {
24 TLOG_DEBUG(TLVL_WORK_STEPS) << "DataHandlingModel operating in message polling mode.";
25 }
26
// Resolve connections. Every input that is not a "DataRequest" is treated as the raw-data input
// (if several exist, the last one wins). A receiver is only created in polling mode.
// Only the first "TimeSync" output is used (the loop breaks on it).
27 try {
28 for (auto input : mcfg->get_inputs()) {
29 if (input->get_data_type() == "DataRequest") {
30 m_data_request_receiver = get_iom_receiver<dfmessages::DataRequest>(input->UID());
31 } else {
32 m_raw_data_receiver_connection_name = input->UID();
33
34 if (m_raw_data_callback_conf == nullptr) {
35 m_raw_data_receiver = get_iom_receiver<IDT>(m_raw_data_receiver_connection_name);
36 m_raw_receiver_timeout_ms = std::chrono::milliseconds(input->get_recv_timeout_ms());
37 }
38 }
39 }
40 for (auto output : mcfg->get_outputs()) {
41 if (output->get_data_type() == "TimeSync") {
42 m_generate_timesync = true;
43 m_timesync_sender = get_iom_sender<dfmessages::TimeSync>(output->UID());
44 m_timesync_connection_name = output->UID();
45 break;
46 }
47 }
48 } catch (const ers::Issue& excpt) {
49 throw ResourceQueueError(ERS_HERE, "raw_input or frag_output", "DataHandlingModel", excpt);
50 }
51
// NOTE(review): m_sourceid is only assigned further down (line 73), so the error below may
// report a default or stale SourceID; consider moving this check after that assignment.
// Note also that no check is made that a DataRequest receiver was found.
52 // Raw input connection sensibility check
53 if (m_raw_data_callback_conf == nullptr && m_raw_data_receiver == nullptr) {
54 ers::error(ConfigurationError(ERS_HERE, m_sourceid, "No callback configuration, and receiver is unset!"));
55 }
56
57 // Instantiate functionalities
58 m_error_registry.reset(new FrameErrorRegistry());
59 m_error_registry->set_ers_metadata("DLH of SourceID[" + std::to_string(mcfg->get_source_id()) + "] ");
60 m_latency_buffer_impl.reset(new LBT());
61 m_raw_processor_impl.reset(new RPT(m_error_registry, mcfg->get_post_processing_enabled()));
62 m_request_handler_impl.reset(new RHT(m_latency_buffer_impl, m_error_registry));
63
64 register_node(mcfg->get_module_configuration()->get_latency_buffer()->UID(), m_latency_buffer_impl);
65 register_node(mcfg->get_module_configuration()->get_data_processor()->UID(), m_raw_processor_impl);
66 register_node(mcfg->get_module_configuration()->get_request_handler()->UID(), m_request_handler_impl);
67
68 // m_request_handler_impl->init(args);
69 // m_raw_processor_impl->init(args);
70 m_request_handler_supports_cutoff_timestamp = m_request_handler_impl->supports_cutoff_timestamp();
71 m_fake_trigger = false;
72 m_raw_receiver_sleep_us = std::chrono::microseconds::zero();
73 m_sourceid.id = mcfg->get_source_id();
74 m_sourceid.subsystem = RDT::subsystem;
75 m_processing_delay_ticks = mcfg->get_module_configuration()->get_post_processing_delay_ticks();
76 m_post_processing_delay_min_wait = mcfg->get_module_configuration()->get_post_processing_delay_min_wait();
77 m_post_processing_delay_max_wait = mcfg->get_module_configuration()->get_post_processing_delay_max_wait();
78
// Delayed post-processing is only valid with a sorted latency buffer.
// NOTE(review): original lines 80 and 82 are missing from this listing; line 80 presumably holds
// the latency-buffer-type check guarding the error below, and line 82 presumably `ERS_HERE,`.
79 if (m_processing_delay_ticks) {
81 ers::error(ConfigurationError(
83 m_sourceid,
84 "Delayed postprocessing (post_processing_delay_ticks > 0) requires a sorted buffer (SkipList). "
85 "Queue buffers (FixedRateQueue, BinarySearchQueue) expect in-order data and must use "
86 "post_processing_delay_ticks = 0."));
87 }
88 }
89
90 // Configure implementations:
91 m_raw_processor_impl->conf(mcfg);
92 // Configure the latency buffer before the request handler so the request handler can check for alignment
93 // restrictions
94 try {
95 m_latency_buffer_impl->conf(mcfg->get_module_configuration()->get_latency_buffer());
// NOTE(review): after a failed allocation only an error is reported; configuration continues and
// the request handler is still configured below.
96 } catch (const std::bad_alloc& be) {
97 ers::error(ConfigurationError(ERS_HERE, m_sourceid, "Latency Buffer can't be allocated with size!"));
98 }
99 m_request_handler_impl->conf(mcfg);
100}
101
102template<class RDT, class RHT, class LBT, class RPT, class IDT>
103void
104DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::conf(const appfwk::DAQModule::CommandData_t& /*args*/)
105{
106 // Register callbacks if operating in that mode.
107 if (m_raw_data_callback_conf != nullptr) {
108 // Configure and register consume callback
109 m_consume_callback =
110 std::bind(&DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::consume_callback, this, std::placeholders::_1);
111
112 // Register callback
113 auto dmcbr = DataMoveCallbackRegistry::get();
114 dmcbr->register_callback<IDT>(m_raw_data_callback_conf, m_consume_callback);
115 }
116
117 // Configure threads:
118 m_consumer_thread.set_name("consumer", m_sourceid.id);
119 if (m_generate_timesync) {
120 m_timesync_thread.set_name("timesync", m_sourceid.id);
122 if (m_processing_delay_ticks) {
123 m_postprocess_scheduler_thread.set_name("pprocsched", m_sourceid.id);
124 m_timekeeper = std::make_unique<folly::ThreadWheelTimekeeper>();
125 }
126}
127
128template<class RDT, class RHT, class LBT, class RPT, class IDT>
129void
130DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::start(const appfwk::DAQModule::CommandData_t& args)
131{
132 // Reset opmon variables
133 m_sum_payloads = 0;
134 m_num_payloads = 0;
135 m_sum_requests = 0;
136 m_num_requests = 0;
137 m_num_lb_insert_failures = 0;
138 m_stats_packet_count = 0;
139 m_rawq_timeout_count = 0;
140 m_num_post_processing_delay_max_waits = 0;
141
142 m_t0 = std::chrono::high_resolution_clock::now();
143
144 m_run_number = args.value<dunedaq::daqdataformats::run_number_t>("run", 1);
145
146 TLOG_DEBUG(TLVL_WORK_STEPS) << "Starting threads...";
147 m_raw_processor_impl->start(args);
148 m_request_handler_impl->start(args);
149 if (m_raw_data_callback_conf == nullptr) {
150 m_consumer_thread.set_work(&DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::run_consume, this);
151 }
152 if (m_generate_timesync) {
153 m_timesync_thread.set_work(&DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::run_timesync, this);
154 }
155 if (m_processing_delay_ticks) {
156 m_postprocess_scheduler_thread.set_work(&DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::run_postprocess_scheduler,
157 this);
158 }
159 // Register callback to receive and dispatch data requests
160 m_data_request_receiver->add_callback(
161 std::bind(&DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::dispatch_requests, this, std::placeholders::_1));
162}
163
164template<class RDT, class RHT, class LBT, class RPT, class IDT>
165void
166DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::stop(const appfwk::DAQModule::CommandData_t& args)
167{
168 TLOG_DEBUG(TLVL_WORK_STEPS) << "Stoppping threads...";
169
170 // Stop receiving data requests as first thing
171 m_data_request_receiver->remove_callback();
172 // Stop the other threads
173 m_request_handler_impl->stop(args);
174 if (m_generate_timesync) {
175 while (!m_timesync_thread.get_readiness()) {
176 std::this_thread::sleep_for(std::chrono::milliseconds(10));
177 }
178 }
179 if (m_raw_data_callback_conf == nullptr) {
180 while (!m_consumer_thread.get_readiness()) {
181 std::this_thread::sleep_for(std::chrono::milliseconds(10));
182 }
183 }
184 if (m_processing_delay_ticks) {
185 m_baton.post(); // In case the coroutine is still waiting when the consumer has stopped
186 while (!m_postprocess_scheduler_thread.get_readiness()) {
187 std::this_thread::sleep_for(std::chrono::milliseconds(10));
188 }
189 }
190 TLOG_DEBUG(TLVL_WORK_STEPS) << "Flushing latency buffer with occupancy: " << m_latency_buffer_impl->occupancy();
191 m_latency_buffer_impl->flush();
192 m_raw_processor_impl->stop(args);
193 m_raw_processor_impl->reset_last_daq_time();
194}
195
// Opmon publisher (presumably generate_opmon_data(); the signature on original line 198 is missing
// from this listing, as is line 200, which presumably declares the info object `ri`).
// Fills `ri` with cumulative totals (load()) and per-interval counters (exchange(0) resets them),
// computes the payload-consumption rate since the previous call, warns on latency-buffer insert
// failures, and publishes the result.
196template<class RDT, class RHT, class LBT, class RPT, class IDT>
197void
199{
201 ri.set_sum_payloads(m_sum_payloads.load());
202 ri.set_num_payloads(m_num_payloads.exchange(0));
203
204 ri.set_num_data_input_timeouts(m_rawq_timeout_count.exchange(0));
205
// Elapsed wall time (seconds) since the previous publish (or since start()); m_t0 is advanced to now.
// NOTE(review): if two calls land within the same microsecond, `seconds` is 0 and the rate below is inf.
206 auto now = std::chrono::high_resolution_clock::now();
207 int new_packets = m_stats_packet_count.exchange(0);
208 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
209 m_t0 = now;
210
211 // 08-May-2025, KAB: added a message to warn users when latency buffer inserts are failing.
// NOTE(review): original line 214 is missing from this listing; presumably `ers::warning(` opening
// the call whose argument follows. ri.num_payloads() is the per-interval count set above.
212 int local_num_lb_insert_failures = m_num_lb_insert_failures.exchange(0);
213 if (local_num_lb_insert_failures != 0) {
215 NonZeroLatencyBufferInsertFailures(ERS_HERE, m_sourceid, local_num_lb_insert_failures, ri.num_payloads()));
216 }
217
// Payloads per second divided by 1000 (i.e. reported in thousands of payloads per second).
218 ri.set_rate_payloads_consumed(new_packets / seconds / 1000.);
219 ri.set_num_lb_insert_failures(local_num_lb_insert_failures);
220 ri.set_sum_requests(m_sum_requests.load());
221 ri.set_num_requests(m_num_requests.exchange(0));
222 ri.set_num_post_processing_delay_max_waits(m_num_post_processing_delay_max_waits.exchange(0));
// last_daq and newest timestamps are both taken from the raw processor's last DAQ time.
223 ri.set_last_daq_timestamp(m_raw_processor_impl->get_last_daq_time());
224 ri.set_newest_timestamp(m_raw_processor_impl->get_last_daq_time());
225 ri.set_oldest_timestamp(m_request_handler_impl->get_oldest_time());
226
227 this->publish(std::move(ri));
228}
229
// transform_and_process(): feed one input payload (type IDT) into process_item().
// When the input type already is the raw data type (RDT) it is forwarded directly; otherwise
// transform_payload() converts it into a sequence of RDT items that are processed in order.
// NOTE(review): the signature (original line 232) is missing from this listing.
230template<class RDT, class RHT, class LBT, class RPT, class IDT>
231void
233{
234 if constexpr (std::is_same_v<IDT, RDT>) {
235 process_item(std::move(payload));
236 } else {
237 auto transformed = transform_payload(payload);
238 for (auto& i : transformed) {
239 process_item(std::move(i));
240 }
241 }
242}
243
// consume_callback(): entry point bound in conf() and registered with the DataMoveCallbackRegistry
// when operating in callback mode; forwards the pushed payload to transform_and_process().
// NOTE(review): the signature (original line 246) is missing from this listing.
244template<class RDT, class RHT, class LBT, class RPT, class IDT>
245void
247{
248 transform_and_process(std::move(payload));
249}
250
// process_item(): handle one raw data item.
// Preprocesses it, warns if it arrives at or before the request handler's cutoff timestamp (it is
// still buffered), writes it to the latency buffer, and then either post-processes it immediately
// (no processing delay) or wakes the post-process scheduler through m_baton.
// NOTE(review): the signature (original line 253) is missing from this listing.
251template<class RDT, class RHT, class LBT, class RPT, class IDT>
252void
254{
255 m_raw_processor_impl->preprocess_item(&payload);
256 if (m_request_handler_supports_cutoff_timestamp) {
257 int64_t diff1 = payload.get_timestamp() - m_request_handler_impl->get_cutoff_timestamp();
258 if (diff1 <= 0) {
259 // m_request_handler_impl->increment_tardy_tp_count();
// NOTE(review): 62500.0 presumably converts clock ticks to milliseconds assuming a 62.5 MHz clock - confirm.
260 ers::warning(DataPacketArrivedTooLate(ERS_HERE,
261 m_sourceid,
262 m_run_number,
263 payload.get_timestamp(),
264 m_request_handler_impl->get_cutoff_timestamp(),
265 diff1,
266 (static_cast<double>(diff1) / 62500.0)));
267 }
268 }
// A failed insert is only counted (reported later by the opmon publisher); the item is dropped.
// NOTE(review): the commented-out log below would read `payload` after it has been moved from.
269 if (!m_latency_buffer_impl->write(std::move(payload))) {
270 // TLOG_DEBUG(TLVL_TAKE_NOTE) << "***ERROR: Latency buffer insert failed! (Payload timestamp=" <<
271 // payload.get_timestamp() << ")";
272 m_num_lb_insert_failures++;
273 return;
274 }
275
// NOTE(review): assumes back() is the element just written; with a sorted buffer and out-of-order
// input that may not hold - confirm.
276 if (m_processing_delay_ticks == 0) {
277 m_raw_processor_impl->postprocess_item(m_latency_buffer_impl->back());
278 ++m_num_payloads;
279 ++m_sum_payloads;
280 ++m_stats_packet_count;
281 } else {
282 m_baton.post();
283 }
284}
// Post-process scheduler thread body (presumably run_postprocess_scheduler(), which start() hands to
// m_postprocess_scheduler_thread): blocks the calling thread until the postprocess_schedule()
// coroutine completes.
// NOTE(review): the signature (original line 288) and closing brace (line 291) are missing from this listing.
286template<class RDT, class RHT, class LBT, class RPT, class IDT>
287void
289{
290 folly::coro::blockingWait(postprocess_schedule());
292
// run_consume(): consumer thread body for message-polling mode (start() only schedules it when no
// raw-data callback is configured). While m_run_marker is set, it tries to receive one payload with
// timeout m_raw_receiver_timeout_ms and passes it to transform_and_process(); on timeout it counts
// the miss and optionally sleeps m_raw_receiver_sleep_us.
// NOTE(review): the signature (original line 295) and lines 297 and 318 are missing from this listing;
// line 318 is presumably the `}` closing the else branch.
293template<class RDT, class RHT, class LBT, class RPT, class IDT>
294void
296{
298 TLOG_DEBUG(TLVL_WORK_STEPS) << "Consumer thread started...";
// NOTE(review): these counters are also reset in start().
299 m_rawq_timeout_count = 0;
300 m_num_payloads = 0;
301 m_sum_payloads = 0;
302 m_stats_packet_count = 0;
303 m_num_post_processing_delay_max_waits = 0;
304
// The loop exits as soon as m_run_marker is cleared; payloads still queued in the receiver are not drained.
305 while (m_run_marker.load()) {
306 // Try to acquire data
307
308 auto opt_payload = m_raw_data_receiver->try_receive(m_raw_receiver_timeout_ms);
309
310 if (opt_payload) {
311 IDT& payload = opt_payload.value();
312 transform_and_process(std::move(payload));
313 } else {
314 ++m_rawq_timeout_count;
315 // Protection against a zero sleep becoming a yield
316 if (m_raw_receiver_sleep_us != std::chrono::microseconds::zero())
317 std::this_thread::sleep_for(m_raw_receiver_sleep_us);
319 }
320 TLOG_DEBUG(TLVL_WORK_STEPS) << "Consumer thread joins... ";
321}
322
// postprocess_schedule(): coroutine driving delayed post-processing.
// Each iteration waits for m_baton (posted by process_item() after each buffered item and by stop()).
// When m_post_processing_delay_max_wait > 0 the wait is bounded to that many milliseconds using
// m_timekeeper, and a timeout is counted. Afterwards the baton is reset and
// PostprocessScheduleAlgorithm::run() is invoked; the number of items it processed is added to the
// payload statistics.
// NOTE(review): the signature (original line 325) is missing from this listing.
// NOTE(review): the log message below spells "timout"; it should read "timeout".
323template<class RDT, class RHT, class LBT, class RPT, class IDT>
324folly::coro::Task<void>
326{
327
328 TLOG_DEBUG(TLVL_WORK_STEPS) << "Postprocess schedule coroutine started...";
329 TLOG() << "***** Starting post-process coroutine with timout " << m_post_processing_delay_max_wait << " *****";
330
331 PostprocessScheduleAlgorithm sched_algo{ *m_latency_buffer_impl,
332 *m_raw_processor_impl,
333 m_processing_delay_ticks,
334 m_post_processing_delay_min_wait,
335 m_post_processing_delay_max_wait };
336
337 const auto wait_data = [this]() -> folly::coro::Task<void> {
338 // folly::coro::timeout cancels the task on timeout.
339 // Baton is not cancellable, so we attach a callback to resume the coroutine.
340 auto token = co_await folly::coro::co_current_cancellation_token;
341 folly::CancellationCallback cb(token, [this] { m_baton.post(); });
342 co_await m_baton; // Wait data
343 };
344
345 while (m_run_marker.load()) {
346 bool timeout = false;
347
348 if (m_post_processing_delay_max_wait > 0) {
349 try {
350 co_await folly::coro::timeout(
351 wait_data(), std::chrono::milliseconds{ m_post_processing_delay_max_wait }, m_timekeeper.get());
352
353 } catch (const folly::FutureTimeout&) {
354 timeout = true;
355 ++m_num_post_processing_delay_max_waits;
356 }
357 } else {
358 co_await m_baton;
359 }
360
// NOTE(review): a post() landing between the wake-up and this reset() is cleared; presumably harmless
// because sched_algo.run() inspects the buffer itself - confirm.
361 m_baton.reset();
362
363 if (auto processed = sched_algo.run(timeout); processed > 0) {
364 m_num_payloads += processed;
365 m_sum_payloads += processed;
366 m_stats_packet_count += processed;
367 }
368 }
369}
370
// run_timesync(): TimeSync thread body (scheduled by start() when a TimeSync output exists).
// Roughly every 100 ms it builds a TimeSync from the raw processor's last DAQ time and sends it,
// skipping zero and unchanged timestamps. When m_fake_trigger is set, it also issues a synthetic
// DataRequest to the request handler.
// NOTE(review): the signature (original line 373) and the fake-trigger lines 408 (presumably the
// declaration of `dr`) and 414-415 (presumably window_begin/window_end computed from `width` and
// `offset`) are missing from this listing; `us` is not defined in the visible code.
371template<class RDT, class RHT, class LBT, class RPT, class IDT>
372void
374{
375 TLOG_DEBUG(TLVL_WORK_STEPS) << "TimeSync thread started...";
376 m_num_requests = 0;
377 m_sum_requests = 0;
378 uint64_t msg_seqno = 0;
379 timestamp_t prev_timestamp = 0;
380 auto once_per_run = true;
381 size_t zero_timestamp_count = 0;
382 size_t duplicate_timestamp_count = 0;
383 size_t total_timestamp_count = 0;
384 while (m_run_marker.load()) {
385 try {
386 auto timesyncmsg = dfmessages::TimeSync(m_raw_processor_impl->get_last_daq_time());
387 ++total_timestamp_count;
388 // daq_time is zero for the first received timesync, and may
389 // be the same as the previous daq_time if the data has
390 // stopped flowing. In both cases we don't send the TimeSync
391 if (timesyncmsg.daq_time != 0 && timesyncmsg.daq_time != prev_timestamp) {
392 prev_timestamp = timesyncmsg.daq_time;
393 timesyncmsg.run_number = m_run_number;
394 timesyncmsg.sequence_number = ++msg_seqno;
395 timesyncmsg.source_id = m_sourceid.id;
396 TLOG_DEBUG(TLVL_TIME_SYNCS) << "New timesync: daq=" << timesyncmsg.daq_time
397 << " wall=" << timesyncmsg.system_time << " run=" << timesyncmsg.run_number
398 << " seqno=" << timesyncmsg.sequence_number
399 << " source_id=" << timesyncmsg.source_id;
// Send failures are downgraded to warnings so the loop keeps running.
400 try {
401 dfmessages::TimeSync timesyncmsg_copy(timesyncmsg);
402 m_timesync_sender->send(std::move(timesyncmsg_copy), std::chrono::milliseconds(500));
403 } catch (ers::Issue& excpt) {
404 ers::warning(TimeSyncTransmissionFailed(ERS_HERE, m_sourceid, m_timesync_connection_name, excpt));
405 }
406
// Fake trigger: request data for this SourceID shortly before the current DAQ time,
// destined for the hard-coded "data_fragments_q".
407 if (m_fake_trigger) {
409 ++m_current_fake_trigger_id;
410 dr.trigger_number = m_current_fake_trigger_id;
411 dr.trigger_timestamp = timesyncmsg.daq_time > 500 * us ? timesyncmsg.daq_time - 500 * us : 0;
412 auto width = 300000;
413 uint offset = 100;
416 dr.request_information.component = m_sourceid;
417 dr.data_destination = "data_fragments_q";
418 TLOG_DEBUG(TLVL_WORK_STEPS) << "Issuing fake trigger based on timesync. "
419 << " ts=" << dr.trigger_timestamp
420 << " window_begin=" << dr.request_information.window_begin
421 << " window_end=" << dr.request_information.window_end;
422 m_request_handler_impl->issue_request(dr);
423
424 ++m_num_requests;
425 ++m_sum_requests;
426 }
// Skipped timestamp: count why. With prev_timestamp starting at 0, a zero daq_time is counted
// as both zero and duplicate.
// NOTE(review): the one-shot message below mentions only DAQ time 0, but it also fires if the
// first skip is a duplicate timestamp.
427 } else {
428 if (timesyncmsg.daq_time == 0) {
429 ++zero_timestamp_count;
430 }
431 if (timesyncmsg.daq_time == prev_timestamp) {
432 ++duplicate_timestamp_count;
433 }
434 if (once_per_run) {
435 TLOG() << "Timesync with DAQ time 0 won't be sent out as it's an invalid sync.";
436 once_per_run = false;
437 }
438 }
439 } catch (const iomanager::TimeoutExpired& excpt) {
440 // ++m_timesyncqueue_timeout;
441 }
442 // Split up the 100ms sleep into 10 sleeps of 10ms, so we respond to "stop" quicker
443 for (size_t i = 0; i < 10; ++i) {
444 std::this_thread::sleep_for(std::chrono::milliseconds(10));
445 if (!m_run_marker.load()) {
446 break;
447 }
448 }
449 }
// NOTE(review): dead store - once_per_run is a local that goes out of scope right after.
450 once_per_run = true;
451 TLOG_DEBUG(TLVL_WORK_STEPS) << "TimeSync thread joins... (timestamp count, zero/same/total = "
452 << zero_timestamp_count << "/" << duplicate_timestamp_count << "/"
453 << total_timestamp_count << ")";
454}
455
456template<class RDT, class RHT, class LBT, class RPT, class IDT>
457void
459{
460 if (data_request.request_information.component != m_sourceid) {
461 ers::error(RequestSourceIDMismatch(ERS_HERE, m_sourceid, data_request.request_information.component));
462 return;
463 }
464 TLOG_DEBUG(TLVL_QUEUE_POP) << "Received DataRequest"
465 << " for trig/seq_number " << data_request.trigger_number << "."
466 << data_request.sequence_number << ", runno " << data_request.run_number
467 << ", trig timestamp " << data_request.trigger_timestamp
468 << ", SourceID: " << data_request.request_information.component << ", window begin/end "
469 << data_request.request_information.window_begin << "/"
470 << data_request.request_information.window_end
471 << ", dest: " << data_request.data_destination;
472 m_request_handler_impl->issue_request(data_request);
473 ++m_num_requests;
474 ++m_sum_requests;
475}
476
477} // namespace datahandlinglibs
478} // namespace dunedaq
#define ERS_HERE
const dunedaq::appmodel::LatencyBuffer * get_latency_buffer() const
Get "latency_buffer" relationship value.
uint64_t get_post_processing_delay_ticks() const
Get "post_processing_delay_ticks" attribute value. Number of clock ticks by which post processing of i...
const dunedaq::appmodel::DataProcessor * get_data_processor() const
Get "data_processor" relationship value.
uint64_t get_post_processing_delay_max_wait() const
Get "post_processing_delay_max_wait" attribute value. Maximum wait time (ms) before post processing c...
uint64_t get_post_processing_delay_min_wait() const
Get "post_processing_delay_min_wait" attribute value. Minimum time (ms) between consecutive post proc...
const dunedaq::appmodel::RequestHandler * get_request_handler() const
Get "request_handler" relationship value.
const dunedaq::appmodel::DataHandlerConf * get_module_configuration() const
Get "module_configuration" relationship value.
const dunedaq::appmodel::DataMoveCallbackConf * get_raw_data_callback() const
Get "raw_data_callback" relationship value. Configuration for raw data callback.
uint32_t get_source_id() const
Get "source_id" attribute value.
bool get_post_processing_enabled() const
Get "post_processing_enabled" attribute value.
const std::string & UID() const noexcept
const std::vector< const dunedaq::confmodel::Connection * > & get_inputs() const
Get "inputs" relationship value. List of connections to/from this module.
const std::vector< const dunedaq::confmodel::Connection * > & get_outputs() const
Get "outputs" relationship value. Output connections from this module.
void init(const appmodel::DataHandlerModule *modconf)
Forward calls from the appfwk.
void stop(const appfwk::DAQModule::CommandData_t &args)
void start(const appfwk::DAQModule::CommandData_t &args)
void conf(const appfwk::DAQModule::CommandData_t &args)
void dispatch_requests(dfmessages::DataRequest &data_request)
void run_consume()
Function that will be run in its own thread to read the raw packets from the connection and add them ...
void run_timesync()
Function that will be run in its own thread and sends periodic timesync messages by pushing them to t...
static std::shared_ptr< DataMoveCallbackRegistry > get()
Base class for any user-defined issue.
Definition Issue.hpp:69
double offset
static int64_t now()
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
uint32_t run_number_t
Type used to represent run number.
Definition Types.hpp:20
Including Qt Headers.
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
SourceID[" << sourceid << "] Command daqdataformats::SourceID Readout Initialization std::string initerror Configuration std::string conferror Configuration std::string conferror TimeSyncTransmissionFailed
static std::shared_ptr< iomanager::ReceiverConcept< Datatype > > get_iom_receiver(iomanager::ConnectionId const &id)
void warning(const Issue &issue)
Definition ers.hpp:115
void error(const Issue &issue)
Definition ers.hpp:81
timestamp_t window_end
End of the data collection window.
SourceID component
The Requested Component.
timestamp_t window_begin
Start of the data collection window.
This message represents a request for data sent to a single component of the DAQ.
sequence_number_t sequence_number
Sequence Number of the request.
trigger_number_t trigger_number
Trigger number the request corresponds to.
timestamp_t trigger_timestamp
Timestamp of trigger.
run_number_t run_number
The current run number.
A synthetic message used to ensure that all elements of a DAQ system are synchronized.
Definition TimeSync.hpp:25