DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
SNBDataHandlingModel.hxx
Go to the documentation of this file.
1// Template member-function definitions for SNBDataHandlingModel
2
3#include <folly/coro/BlockingWait.h>
4#include <folly/coro/Timeout.h>
5
6#include <typeinfo>
7
8namespace dunedaq {
9namespace snbmodules {
10
// Initialisation from the configuration object `mcfg`: resolves the input and output
// connections, instantiates the error registry, latency buffer (LBT), raw processor (RPT)
// and request handler (RHT), registers them as nodes, and configures them.
// An ers::Issue raised while resolving connections is rethrown wrapped in
// datahandlinglibs::ResourceQueueError.
// NOTE(review): listing line 13 (the function name and parameter list) is missing from this
// extract; presumably this is SNBDataHandlingModel<...>::init -- confirm against the original file.
11template<class RDT, class RHT, class LBT, class RPT, class IDT>
12void
14{
15 // Setup request queues
16 //setup_request_queues(mcfg);
17 try {
18 for (auto input : mcfg->get_inputs()) {
19 if (input->get_data_type() == "DataRequest") {
20 m_data_request_receiver = get_iom_receiver<dfmessages::DataRequest>(input->UID()) ;
21 }
// Every input that is not a "DataRequest" is treated as the raw data connection.
22 else {
23 m_raw_data_receiver_connection_name = input->UID();
24 // Parse for prefix
25 std::string conn_name = input->UID();
26 const char delim = '_';
27 std::vector<std::string> words;
28 std::size_t start;
29 std::size_t end = 0;
// Split the UID on '_' and collect its non-empty words.
30 while ((start = conn_name.find_first_not_of(delim, end)) != std::string::npos) {
31 end = conn_name.find(delim, start);
32 words.push_back(conn_name.substr(start, end - start));
33 }
34
// NOTE(review): words.front() below is undefined behaviour when the UID holds no
// non-'_' characters, because `words` is then empty.
35 TLOG_DEBUG(TLVL_WORK_STEPS) << "Initialize connection based on uid: "
36 << m_raw_data_receiver_connection_name << " front word: " << words.front();
37
// A UID whose first word is "cb" switches the module to callback mode.
38 std::string cb_prefix("cb");
39 if (words.front() == cb_prefix) {
40 m_callback_mode = true;
41 }
42
// A receiver and its receive timeout are only set up when not in callback mode.
43 if (!m_callback_mode) {
44 m_raw_data_receiver = get_iom_receiver<IDT>(m_raw_data_receiver_connection_name);
45 m_raw_receiver_timeout_ms = std::chrono::milliseconds(input->get_recv_timeout_ms());
46 }
47 }
48 }
// Only the first output whose data type is "TimeSync" is used; the loop breaks after it.
49 for (auto output : mcfg->get_outputs()) {
50 if (output->get_data_type() == "TimeSync") {
51 m_generate_timesync = true;
52 m_timesync_sender = get_iom_sender<dfmessages::TimeSync>(output->UID()) ;
53 m_timesync_connection_name = output->UID();
54 break;
55 }
56 }
57 } catch (const ers::Issue& excpt) {
58 throw datahandlinglibs::ResourceQueueError(ERS_HERE, "raw_input or frag_output", "SNBDataHandlingModel", excpt);
59 }
60
// NOTE(review): m_sourceid is only assigned later in this function, so the issue reported
// here carries whatever value m_sourceid held beforehand. The condition is reported via
// ers::error and execution continues.
61 // Raw input connection sensibility check
62 if (!m_callback_mode && m_raw_data_receiver == nullptr) {
63 ers::error(datahandlinglibs::ConfigurationError(ERS_HERE, m_sourceid, "Non callback mode, and receiver is unset!"));
64 }
65
66 // Instantiate functionalities
67 m_error_registry.reset(new datahandlinglibs::FrameErrorRegistry());
68 m_error_registry->set_ers_metadata("DLH of SourceID[" + std::to_string(mcfg->get_source_id()) + "] ");
69 m_latency_buffer_impl.reset(new LBT());
70 m_raw_processor_impl.reset(new RPT(m_error_registry, mcfg->get_post_processing_enabled()));
71 m_request_handler_impl.reset(new RHT(m_latency_buffer_impl, m_error_registry));
72
// Each implementation is registered under the UID of its configuration object.
73 register_node(mcfg->get_module_configuration()->get_latency_buffer()->UID(), m_latency_buffer_impl);
74 register_node(mcfg->get_module_configuration()->get_data_processor()->UID(), m_raw_processor_impl);
75 register_node(mcfg->get_module_configuration()->get_request_handler()->UID(), m_request_handler_impl);
76
77 //m_request_handler_impl->init(args);
78 //m_raw_processor_impl->init(args);
// Cache settings read from the request handler and the configuration; fake triggers are
// switched off and the consumer back-off sleep is set to zero here.
79 m_request_handler_supports_cutoff_timestamp = m_request_handler_impl->supports_cutoff_timestamp();
80 m_fake_trigger = false;
81 m_raw_receiver_sleep_us = std::chrono::microseconds::zero();
82 m_sourceid.id = mcfg->get_source_id();
83 m_sourceid.subsystem = RDT::subsystem;
84 m_processing_delay_ticks = mcfg->get_module_configuration()->get_post_processing_delay_ticks();
85 m_post_processing_delay_min_wait = mcfg->get_module_configuration()->get_post_processing_delay_min_wait();
86 m_post_processing_delay_max_wait = mcfg->get_module_configuration()->get_post_processing_delay_max_wait();
87
88
89 // Configure implementations:
90 m_raw_processor_impl->conf(mcfg);
91 // Configure the latency buffer before the request handler so the request handler can check for alignment
92 // restrictions
93 try {
94 m_latency_buffer_impl->conf(mcfg->get_module_configuration()->get_latency_buffer());
95 } catch (const std::bad_alloc& be) {
// NOTE(review): listing line 96 is missing from this extract, so the call that receives the
// ConfigurationError constructed below is not visible here.
97 datahandlinglibs::ConfigurationError(ERS_HERE, m_sourceid, "Latency Buffer can't be allocated with size!"));
98 }
99 m_request_handler_impl->conf(mcfg);
100}
101
// Handles the "conf" command (the command payload is unused).
// In callback mode it binds consume_callback to this instance and registers it for the raw
// data connection name; it then names the worker threads and, when a post-processing delay
// is configured, creates the folly timekeeper.
102template<class RDT, class RHT, class LBT, class RPT, class IDT>
103void
104SNBDataHandlingModel<RDT, RHT, LBT, RPT, IDT>::conf(const appfwk::DAQModule::CommandData_t& /*args*/)
105{
106 // Register callbacks if operating in that mode.
107 if (m_callback_mode) {
108 // Configure and register consume callback
109 m_consume_callback = std::bind(&SNBDataHandlingModel<RDT, RHT, LBT, RPT, IDT>::consume_callback, this, std::placeholders::_1);
110
111 // Register callback
// NOTE(review): listing line 112, which introduces `dmcbr`, is missing from this extract;
// presumably it obtains the DataMoveCallbackRegistry instance -- confirm against the original file.
113 dmcbr->register_callback<IDT>(m_raw_data_receiver_connection_name, m_consume_callback);
114 }
115
116 // Configure threads:
// The consumer thread is named unconditionally, i.e. also in callback mode.
117 m_consumer_thread.set_name("consumer", m_sourceid.id);
118 if (m_generate_timesync) {
119 m_timesync_thread.set_name("timesync", m_sourceid.id);
120 }
// A non-zero delay enables the post-processing scheduler thread and its timekeeper.
121 if (m_processing_delay_ticks) {
122 m_postprocess_scheduler_thread.set_name("pprocsched", m_sourceid.id);
123 m_timekeeper = std::make_unique<folly::ThreadWheelTimekeeper>();
124 }
125}
126
127
128template<class RDT, class RHT, class LBT, class RPT, class IDT>
129void
130SNBDataHandlingModel<RDT, RHT, LBT, RPT, IDT>::start(const appfwk::DAQModule::CommandData_t& args)
131{
132 // Reset opmon variables
133 m_sum_payloads = 0;
134 m_num_payloads = 0;
135 m_sum_requests = 0;
136 m_num_requests = 0;
137 m_num_lb_insert_failures = 0;
138 m_stats_packet_count = 0;
139 m_rawq_timeout_count = 0;
140 m_num_post_processing_delay_max_waits = 0;
141
142 m_t0 = std::chrono::high_resolution_clock::now();
143
144 m_run_number = args.value<dunedaq::daqdataformats::run_number_t>("run", 1);
145
146 TLOG_DEBUG(TLVL_WORK_STEPS) << "Starting threads...";
147 m_raw_processor_impl->start(args);
148 m_request_handler_impl->start(args);
149 if (!m_callback_mode) {
150 m_consumer_thread.set_work(&SNBDataHandlingModel<RDT, RHT, LBT, RPT, IDT>::run_consume, this);
151 }
152 if (m_generate_timesync) {
153 m_timesync_thread.set_work(&SNBDataHandlingModel<RDT, RHT, LBT, RPT, IDT>::run_timesync, this);
154 }
155 if (m_processing_delay_ticks) {
156 m_postprocess_scheduler_thread.set_work(&SNBDataHandlingModel<RDT, RHT, LBT, RPT, IDT>::run_postprocess_scheduler, this);
157 }
158 // Register callback to receive and dispatch data requests
159 m_data_request_receiver->add_callback(
160 std::bind(&SNBDataHandlingModel<RDT, RHT, LBT, RPT, IDT>::dispatch_requests, this, std::placeholders::_1));
161}
162
163template<class RDT, class RHT, class LBT, class RPT, class IDT>
164void
165SNBDataHandlingModel<RDT, RHT, LBT, RPT, IDT>::stop(const appfwk::DAQModule::CommandData_t& args)
166{
167 TLOG_DEBUG(TLVL_WORK_STEPS) << "Stoppping threads...";
168
169 // Stop receiving data requests as first thing
170 m_data_request_receiver->remove_callback();
171 // Stop the other threads
172 m_request_handler_impl->stop(args);
173 if (m_generate_timesync) {
174 while (!m_timesync_thread.get_readiness()) {
175 std::this_thread::sleep_for(std::chrono::milliseconds(10));
176 }
177 }
178 if (!m_callback_mode) {
179 while (!m_consumer_thread.get_readiness()) {
180 std::this_thread::sleep_for(std::chrono::milliseconds(10));
181 }
182 }
183 if (m_processing_delay_ticks) {
184 m_baton.post(); // In case the coroutine is still waiting when the consumer has stopped
185 while (!m_postprocess_scheduler_thread.get_readiness()) {
186 std::this_thread::sleep_for(std::chrono::milliseconds(10));
187 }
188 }
189 TLOG_DEBUG(TLVL_WORK_STEPS) << "Flushing latency buffer with occupancy: " << m_latency_buffer_impl->occupancy();
190 m_latency_buffer_impl->flush();
191 m_raw_processor_impl->stop(args);
192 m_raw_processor_impl->reset_last_daq_time();
193}
194
// Fills the monitoring record `ri` from the module's counters and publishes it.
// Counters read with load() keep their value; counters read with exchange(0) are reset to
// zero as they are read.
// NOTE(review): listing lines 197 (function name and parameter list) and 199 (where `ri` is
// introduced) are missing from this extract; presumably this is the operational-monitoring
// hook of SNBDataHandlingModel -- confirm against the original file.
195template<class RDT, class RHT, class LBT, class RPT, class IDT>
196void
198 {
200 ri.set_sum_payloads(m_sum_payloads.load());
201 ri.set_num_payloads(m_num_payloads.exchange(0));
202
203 ri.set_num_data_input_timeouts(m_rawq_timeout_count.exchange(0));
204
// Elapsed time since the previous call (or since m_t0 was last set), in seconds; m_t0 is then
// moved to `now` so the next call measures a fresh interval.
205 auto now = std::chrono::high_resolution_clock::now();
206 int new_packets = m_stats_packet_count.exchange(0);
207 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
208 m_t0 = now;
209
210 // 08-May-2025, KAB: added a message to warn users when latency buffer inserts are failing.
211 int local_num_lb_insert_failures = m_num_lb_insert_failures.exchange(0);
212 if (local_num_lb_insert_failures != 0) {
213 ers::warning(datahandlinglibs::NonZeroLatencyBufferInsertFailures(
214 ERS_HERE, m_sourceid, local_num_lb_insert_failures, ri.num_payloads()));
215 }
216
// Rate is packets per second divided by 1000.
// NOTE(review): `seconds` can be 0 when two calls fall within the same microsecond, which makes
// this floating-point division yield inf or NaN.
217 ri.set_rate_payloads_consumed(new_packets / seconds / 1000.);
218 ri.set_num_lb_insert_failures(local_num_lb_insert_failures);
219 ri.set_sum_requests(m_sum_requests.load());
220 ri.set_num_requests(m_num_requests.exchange(0));
221 ri.set_num_post_processing_delay_max_waits(m_num_post_processing_delay_max_waits.exchange(0));
// Both the last-DAQ and the newest timestamp fields are filled from the same
// get_last_daq_time() source.
222 ri.set_last_daq_timestamp(m_raw_processor_impl->get_last_daq_time());
223 ri.set_newest_timestamp(m_raw_processor_impl->get_last_daq_time());
224 ri.set_oldest_timestamp(m_request_handler_impl->get_oldest_time());
225
226 this->publish(std::move(ri));
227 }
228
// Routes an incoming payload to process_item. When the input type IDT is the same as the
// stored type RDT the payload is processed directly; otherwise it is first converted with
// transform_payload and every resulting element is processed in turn.
// NOTE(review): listing line 231 (function name, parameter list and opening brace) is missing
// from this extract; presumably this is SNBDataHandlingModel<...>::transform_and_process,
// the name used by the callers visible in this file -- confirm against the original file.
229template<class RDT, class RHT, class LBT, class RPT, class IDT>
230void
// The branch is chosen at compile time from the template arguments.
232 if constexpr (std::is_same_v<IDT, RDT>) {
233 process_item(std::move(payload));
234 } else {
235 auto transformed = transform_payload(payload);
236 for (auto& i : transformed) {
237 process_item(std::move(i));
238 }
239 }
240}
241
// Forwards the received payload to transform_and_process.
// NOTE(review): listing line 244 (function name, parameter list and opening brace) is missing
// from this extract; presumably this is SNBDataHandlingModel<...>::consume_callback, the
// member bound in conf() -- confirm against the original file.
242template<class RDT, class RHT, class LBT, class RPT, class IDT>
243void
245 transform_and_process(std::move(payload));
246}
247
// Handles one payload: pre-processes it, warns when it arrives at or before the request
// handler's cutoff timestamp, writes it into the latency buffer and then either post-processes
// it immediately or signals the post-processing coroutine through m_baton.
// NOTE(review): listing line 250 (function name and parameter list) is missing from this
// extract; presumably this is SNBDataHandlingModel<...>::process_item -- confirm against the
// original file.
248template<class RDT, class RHT, class LBT, class RPT, class IDT>
249void
251{
252 m_raw_processor_impl->preprocess_item(&payload);
// The lateness check only runs when the request handler reported cutoff-timestamp support.
// A late payload is reported with a warning but is still written to the buffer below.
253 if (m_request_handler_supports_cutoff_timestamp) {
254 int64_t diff1 = payload.get_timestamp() - m_request_handler_impl->get_cutoff_timestamp();
255 if (diff1 <= 0) {
256 //m_request_handler_impl->increment_tardy_tp_count();
// NOTE(review): the 62500.0 divisor presumably converts clock ticks to milliseconds -- confirm.
257 ers::warning(datahandlinglibs::DataPacketArrivedTooLate(ERS_HERE,
258 m_sourceid,
259 m_run_number,
260 payload.get_timestamp(),
261 m_request_handler_impl->get_cutoff_timestamp(), diff1,
262 (static_cast<double>(diff1)/62500.0)));
263 }
264 }
// Blocks the calling thread, polling every millisecond, for as long as the buffer reports full.
265 while (m_latency_buffer_impl->isFull()) {
266 std::this_thread::sleep_for(std::chrono::milliseconds(1));
267 }
// A failed insert is only counted; the payload is dropped and nothing further is done with it.
268 if (!m_latency_buffer_impl->write(std::move(payload))) {
269 // TLOG_DEBUG(TLVL_TAKE_NOTE) << "***ERROR: Latency buffer insert failed! (Payload timestamp=" << payload.get_timestamp() << ")";
270 m_num_lb_insert_failures++;
271 return;
272 }
273
// With no configured delay the element at the back of the buffer is post-processed right away
// and the payload counters are incremented; otherwise the baton is posted and the counters
// are left untouched here.
274 if (m_processing_delay_ticks == 0) {
275 m_raw_processor_impl->postprocess_item(m_latency_buffer_impl->back());
276 ++m_num_payloads;
277 ++m_sum_payloads;
278 ++m_stats_packet_count;
279 } else {
280 m_baton.post();
281 }
282}
283
// Runs the postprocess_schedule() coroutine to completion, blocking the calling thread.
// NOTE(review): listing line 286 (function name and parameter list) is missing from this
// extract; presumably this is SNBDataHandlingModel<...>::run_postprocess_scheduler, the
// member handed to the scheduler thread in start() -- confirm against the original file.
284template<class RDT, class RHT, class LBT, class RPT, class IDT>
285void
287{
288 folly::coro::blockingWait(postprocess_schedule());
289}
290
// Consumer loop: while m_run_marker is set, tries to receive a payload from the raw data
// receiver and passes it to transform_and_process; receive timeouts are counted.
// NOTE(review): listing line 293 (function name and parameter list) is missing from this
// extract; presumably this is SNBDataHandlingModel<...>::run_consume -- confirm against the
// original file.
291template<class RDT, class RHT, class LBT, class RPT, class IDT>
292void
294{
295
296 TLOG_DEBUG(TLVL_WORK_STEPS) << "Consumer thread started...";
// These counters are reset again here, in addition to the reset done in start().
297 m_rawq_timeout_count = 0;
298 m_num_payloads = 0;
299 m_sum_payloads = 0;
300 m_stats_packet_count = 0;
301 m_num_post_processing_delay_max_waits = 0;
302
303 while (m_run_marker.load()) {
304 // Try to acquire data
305
306 auto opt_payload = m_raw_data_receiver->try_receive(m_raw_receiver_timeout_ms);
307
// An empty result is counted as a timeout.
308 if (opt_payload) {
309 IDT& payload = opt_payload.value();
310 transform_and_process(std::move(payload));
311 } else {
312 ++m_rawq_timeout_count;
313 // Protection against a zero sleep becoming a yield
314 if ( m_raw_receiver_sleep_us != std::chrono::microseconds::zero())
315 std::this_thread::sleep_for(m_raw_receiver_sleep_us);
316 }
317 }
318 TLOG_DEBUG(TLVL_WORK_STEPS) << "Consumer thread joins... ";
319}
320
// Coroutine implementing deferred post-processing. Each cycle waits on m_baton (up to
// m_post_processing_delay_max_wait milliseconds), then post-processes the latency-buffer
// elements between the last processed timestamp and (newest timestamp -
// m_processing_delay_ticks).
// NOTE(review): listing line 323 (function name, parameter list and opening brace) is missing
// from this extract; presumably this is SNBDataHandlingModel<...>::postprocess_schedule, the
// coroutine awaited in this file -- confirm against the original file.
321template<class RDT, class RHT, class LBT, class RPT, class IDT>
322folly::coro::Task<void>
324
325 TLOG_DEBUG(TLVL_WORK_STEPS) << "Postprocess schedule coroutine started...";
326 timestamp_t newest_ts = 0;
327 timestamp_t end_win_ts = 0;
328 bool first_cycle = true;
329 auto last_post_proc_time = std::chrono::system_clock::now();
330 auto now = last_post_proc_time;
331 std::chrono::milliseconds milliseconds;
// Used as a timestamp carrier: marks where the previous pass ended and serves as the
// lookup key for lower_bound().
332 RDT processed_element;
333
334 // Deferral of the post processing, to allow elements being reordered in the LB
335 // Basically, find data older than a certain timestamp and process all data since the last post-processed element up to that value
336 while (m_run_marker.load()) {
// Wait for a post() on the baton; a timeout is counted and the cycle continues anyway.
// The baton is only reset when the wait completed without timing out.
337 try {
338 co_await folly::coro::timeout(
339 m_baton.operator co_await(),
340 std::chrono::milliseconds{m_post_processing_delay_max_wait},
341 m_timekeeper.get());
342 m_baton.reset();
343 } catch (const folly::FutureTimeout&) {
344 ++m_num_post_processing_delay_max_waits;
345 }
346
347 if (m_latency_buffer_impl->occupancy() == 0) {
348 continue;
349 }
350
351 now = std::chrono::system_clock::now();
352 milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(now - last_post_proc_time);
353
// Skip the pass unless more than m_post_processing_delay_min_wait milliseconds have elapsed
// since the previous pass.
354 if (milliseconds.count() <= m_post_processing_delay_min_wait) {
355 continue;
356 }
357
358 last_post_proc_time = now;
359
360 // Get the LB boundaries
361 auto tail = m_latency_buffer_impl->back();
362 newest_ts = tail->get_timestamp();
363
// On the first pass the starting point is the timestamp of the element at the buffer front.
364 if (first_cycle) {
365 auto head = m_latency_buffer_impl->front();
366 processed_element.set_timestamp(head->get_timestamp());
367 first_cycle = false;
368 TLOG() << "***** First pass post processing *****";
369 }
370
// NOTE(review): if timestamp_t is unsigned, this subtraction wraps when the last processed
// timestamp is ahead of newest_ts -- confirm the type and whether that can occur.
371 if (newest_ts - processed_element.get_timestamp() > m_processing_delay_ticks) {
372 end_win_ts = newest_ts - m_processing_delay_ticks;
// The start iterator is looked up before processed_element is advanced to the window end,
// so the range [start_iter, end_iter) covers what has not been processed yet.
373 auto start_iter = m_latency_buffer_impl->lower_bound(processed_element, false);
374 processed_element.set_timestamp(end_win_ts);
375 auto end_iter = m_latency_buffer_impl->lower_bound(processed_element, false);
376
377 for (auto it = start_iter; it != end_iter; ++it) {
378 m_raw_processor_impl->postprocess_item(&(*it));
379 ++m_num_payloads;
380 ++m_sum_payloads;
381 ++m_stats_packet_count;
382 }
383 }
384 }
385}
386
// TimeSync loop: while m_run_marker is set, builds a TimeSync message from the raw processor's
// last DAQ time roughly every 100 ms and sends it when the DAQ time is non-zero and differs
// from the previously sent one. When m_fake_trigger is set it also issues a data request
// derived from the timesync.
// NOTE(review): listing lines 389 (function name and parameter list), 419, 424, 430 and 431
// are missing from this extract; presumably this is SNBDataHandlingModel<...>::run_timesync.
// The declarations of `dr` and `us`, and the statements that use `width` and `offset`, are
// therefore not visible here -- confirm against the original file.
387template<class RDT, class RHT, class LBT, class RPT, class IDT>
388void
390{
391 TLOG_DEBUG(TLVL_WORK_STEPS) << "TimeSync thread started...";
392 m_num_requests = 0;
393 m_sum_requests = 0;
394 uint64_t msg_seqno = 0;
395 timestamp_t prev_timestamp = 0;
396 auto once_per_run = true;
397 size_t zero_timestamp_count = 0;
398 size_t duplicate_timestamp_count = 0;
399 size_t total_timestamp_count = 0;
400 while (m_run_marker.load()) {
401 try {
402 auto timesyncmsg = dfmessages::TimeSync(m_raw_processor_impl->get_last_daq_time());
403 ++total_timestamp_count;
404 // daq_time is zero for the first received timesync, and may
405 // be the same as the previous daq_time if the data has
406 // stopped flowing. In both cases we don't send the TimeSync
407 if (timesyncmsg.daq_time != 0 && timesyncmsg.daq_time != prev_timestamp) {
408 prev_timestamp = timesyncmsg.daq_time;
409 timesyncmsg.run_number = m_run_number;
410 timesyncmsg.sequence_number = ++msg_seqno;
411 timesyncmsg.source_id = m_sourceid.id;
412 TLOG_DEBUG(TLVL_TIME_SYNCS) << "New timesync: daq=" << timesyncmsg.daq_time
413 << " wall=" << timesyncmsg.system_time << " run=" << timesyncmsg.run_number
414 << " seqno=" << timesyncmsg.sequence_number << " source_id=" << timesyncmsg.source_id;
// A copy is sent (500 ms send timeout) so that timesyncmsg stays usable for the fake-trigger
// code below.
415 try {
416 dfmessages::TimeSync timesyncmsg_copy(timesyncmsg);
417 m_timesync_sender->send(std::move(timesyncmsg_copy), std::chrono::milliseconds(500));
418 } catch (ers::Issue& excpt) {
// NOTE(review): listing line 419 is missing; the call that receives the
// TimeSyncTransmissionFailed issue constructed below is not visible.
420 datahandlinglibs::TimeSyncTransmissionFailed(ERS_HERE, m_sourceid, m_timesync_connection_name, excpt));
421 }
422
423 if (m_fake_trigger) {
425 ++m_current_fake_trigger_id;
426 dr.trigger_number = m_current_fake_trigger_id;
// The trigger timestamp is placed 500 * us before the DAQ time, clamped at 0 to avoid
// going below zero.
427 dr.trigger_timestamp = timesyncmsg.daq_time > 500 * us ? timesyncmsg.daq_time - 500 * us : 0;
428 auto width = 300000;
429 uint offset = 100;
432 dr.request_information.component = m_sourceid;
433 dr.data_destination = "data_fragments_q";
434 TLOG_DEBUG(TLVL_WORK_STEPS) << "Issuing fake trigger based on timesync. "
435 << " ts=" << dr.trigger_timestamp
436 << " window_begin=" << dr.request_information.window_begin
437 << " window_end=" << dr.request_information.window_end;
438 m_request_handler_impl->issue_request(dr);
439
440 ++m_num_requests;
441 ++m_sum_requests;
442 }
443 } else {
// NOTE(review): when daq_time and prev_timestamp are both 0, both counters below are
// incremented for the same message. The one-time log further down is emitted on the first
// skipped message, and its text mentions DAQ time 0 even when the skip was caused by a
// repeated timestamp.
444 if (timesyncmsg.daq_time == 0) {++zero_timestamp_count;}
445 if (timesyncmsg.daq_time == prev_timestamp) {++duplicate_timestamp_count;}
446 if (once_per_run) {
447 TLOG() << "Timesync with DAQ time 0 won't be sent out as it's an invalid sync.";
448 once_per_run = false;
449 }
450 }
451 } catch (const iomanager::TimeoutExpired& excpt) {
452 // ++m_timesyncqueue_timeout;
453 }
454 // Split up the 100ms sleep into 10 sleeps of 10ms, so we respond to "stop" quicker
455 for (size_t i=0; i<10; ++i) {
456 std::this_thread::sleep_for(std::chrono::milliseconds(10));
457 if (!m_run_marker.load()) {
458 break;
459 }
460 }
461 }
// NOTE(review): once_per_run is a local that is not read after this assignment in the
// visible code, so the store below has no observable effect.
462 once_per_run = true;
463 TLOG_DEBUG(TLVL_WORK_STEPS) << "TimeSync thread joins... (timestamp count, zero/same/total = "
464 << zero_timestamp_count << "/" << duplicate_timestamp_count << "/"
465 << total_timestamp_count << ")";
466}
467
// Receives one data request and hands it to the request handler.
// A request whose component does not match this module's m_sourceid is reported through
// ers::error and dropped without being counted.
// NOTE(review): listing line 470 (function name and parameter list) is missing from this
// extract; presumably this is SNBDataHandlingModel<...>::dispatch_requests, the member
// registered as the data-request callback in start() -- confirm against the original file.
468template<class RDT, class RHT, class LBT, class RPT, class IDT>
469void
471{
472 if (data_request.request_information.component != m_sourceid) {
473 ers::error(datahandlinglibs::RequestSourceIDMismatch(ERS_HERE, m_sourceid, data_request.request_information.component));
474 return;
475 }
476 TLOG_DEBUG(TLVL_QUEUE_POP) << "Received DataRequest"
477 << " for trig/seq_number " << data_request.trigger_number << "." << data_request.sequence_number
478 << ", runno " << data_request.run_number
479 << ", trig timestamp " << data_request.trigger_timestamp
480 << ", SourceID: " << data_request.request_information.component
481 << ", window begin/end " << data_request.request_information.window_begin
482 << "/" << data_request.request_information.window_end
483 << ", dest: " << data_request.data_destination;
// Accepted requests increment both the per-interval and the running request counters.
484 m_request_handler_impl->issue_request(data_request);
485 ++m_num_requests;
486 ++m_sum_requests;
487}
488
489} // namespace snbmodules
490} // namespace dunedaq
#define ERS_HERE
const dunedaq::appmodel::LatencyBuffer * get_latency_buffer() const
Get "latency_buffer" relationship value.
uint64_t get_post_processing_delay_ticks() const
Get "post_processing_delay_ticks" attribute value. Number of clock tick by which post processing of i...
const dunedaq::appmodel::DataProcessor * get_data_processor() const
Get "data_processor" relationship value.
uint64_t get_post_processing_delay_max_wait() const
Get "post_processing_delay_max_wait" attribute value. Maximum wait time (ms) before post processing c...
uint64_t get_post_processing_delay_min_wait() const
Get "post_processing_delay_min_wait" attribute value. Minimum time (ms) between consecutive post proc...
const dunedaq::appmodel::RequestHandler * get_request_handler() const
Get "request_handler" relationship value.
const dunedaq::appmodel::DataHandlerConf * get_module_configuration() const
Get "module_configuration" relationship value.
uint32_t get_source_id() const
Get "source_id" attribute value.
bool get_post_processing_enabled() const
Get "post_processing_enabled" attribute value.
const std::string & UID() const noexcept
const std::vector< const dunedaq::confmodel::Connection * > & get_inputs() const
Get "inputs" relationship value. List of connections to/from this module.
const std::vector< const dunedaq::confmodel::Connection * > & get_outputs() const
Get "outputs" relationship value. Output connections from this module.
static std::shared_ptr< DataMoveCallbackRegistry > get()
void start(const appfwk::DAQModule::CommandData_t &args)
void init(const appmodel::DataHandlerModule *modconf)
Forward calls from the appfwk.
void conf(const appfwk::DAQModule::CommandData_t &args)
void stop(const appfwk::DAQModule::CommandData_t &args)
void dispatch_requests(dfmessages::DataRequest &data_request)
void run_timesync()
Function that will be run in its own thread and sends periodic timesync messages by pushing them to t...
void run_consume()
Function that will be run in its own thread to read the raw packets from the connection and add them ...
Base class for any user-defined issue.
Definition Issue.hpp:69
double offset
static int64_t now()
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
uint32_t run_number_t
Type used to represent run number.
Definition Types.hpp:20
The DUNE-DAQ namespace.
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
static std::shared_ptr< iomanager::ReceiverConcept< Datatype > > get_iom_receiver(iomanager::ConnectionId const &id)
void warning(const Issue &issue)
Definition ers.hpp:115
void error(const Issue &issue)
Definition ers.hpp:81
timestamp_t window_end
End of the data collection window.
SourceID component
The Requested Component.
timestamp_t window_begin
Start of the data collection window.
This message represents a request for data sent to a single component of the DAQ.
sequence_number_t sequence_number
Sequence Number of the request.
trigger_number_t trigger_number
Trigger number the request corresponds to.
timestamp_t trigger_timestamp
Timestamp of trigger.
run_number_t run_number
The current run number.
A synthetic message used to ensure that all elements of a DAQ system are synchronized.
Definition TimeSync.hpp:25