DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
SNBDataHandlingModel.hxx
Go to the documentation of this file.
1// Declarations for SNBDataHandlingModel
2
3#include <folly/coro/BlockingWait.h>
4#include <folly/coro/Timeout.h>
5
6#include <typeinfo>
7
8namespace dunedaq {
9namespace snbmodules {
10
// init: resolves this module's I/O connections and creates/configures the latency buffer,
// raw processor and request handler from the module configuration `mcfg`.
// NOTE(review): original line 13 (qualified name and parameter list) is missing from this
// extraction; the declaration list below the code suggests
// `init(const appmodel::DataHandlerModule* ...)`, with the parameter used here as `mcfg` — confirm.
11template<class RDT, class RHT, class LBT, class RPT, class IDT>
12void
14{
15 // Setup request queues
16 // setup_request_queues(mcfg);
17 try {
// Keep the receiver of the input whose data type is "DataRequest" (no break: the last match wins).
// NOTE(review): if no such input exists, m_data_request_receiver is not assigned here, yet
// start() and stop() dereference it unconditionally — confirm one is always configured.
18 for (auto input : mcfg->get_inputs()) {
19 if (input->get_data_type() == "DataRequest") {
20 m_data_request_receiver = get_iom_receiver<dfmessages::DataRequest>(input->UID());
21 }
22 }
// The first output of data type "TimeSync" enables periodic TimeSync generation and
// remembers its sender and connection name.
23 for (auto output : mcfg->get_outputs()) {
24 if (output->get_data_type() == "TimeSync") {
25 m_generate_timesync = true;
26 m_timesync_sender = get_iom_sender<dfmessages::TimeSync>(output->UID());
27 m_timesync_connection_name = output->UID();
28 break;
29 }
30 }
31 } catch (const ers::Issue& excpt) {
// NOTE(review): the "raw_input or frag_output" context string does not name the connections
// looked up above (a DataRequest input and a TimeSync output).
32 throw datahandlinglibs::ResourceQueueError(ERS_HERE, "raw_input or frag_output", "SNBDataHandlingModel", excpt);
33 }
34
// Stored for later use when conf() registers the consume callback.
35 m_raw_data_callback_conf = mcfg->get_raw_data_callback();
36
37 // Instantiate functionalities
38 m_error_registry.reset(new datahandlinglibs::FrameErrorRegistry());
39 m_error_registry->set_ers_metadata("DLH of SourceID[" + std::to_string(mcfg->get_source_id()) + "] ");
40 m_latency_buffer_impl.reset(new LBT());
41 m_raw_processor_impl.reset(new RPT(m_error_registry, mcfg->get_post_processing_enabled()));
// The request handler shares the latency buffer and the error registry created above.
42 m_request_handler_impl.reset(new RHT(m_latency_buffer_impl, m_error_registry));
43
// Each implementation is registered together with the UID of its configuration object.
44 register_node(mcfg->get_module_configuration()->get_latency_buffer()->UID(), m_latency_buffer_impl);
45 register_node(mcfg->get_module_configuration()->get_data_processor()->UID(), m_raw_processor_impl);
46 register_node(mcfg->get_module_configuration()->get_request_handler()->UID(), m_request_handler_impl);
47
48 // m_request_handler_impl->init(args);
49 // m_raw_processor_impl->init(args);
// Cached so process_item() can skip the cutoff-timestamp check when it is unsupported.
50 m_request_handler_supports_cutoff_timestamp = m_request_handler_impl->supports_cutoff_timestamp();
// Fake triggers (issued from run_timesync) are disabled.
51 m_fake_trigger = false;
52 m_sourceid.id = mcfg->get_source_id();
53 m_sourceid.subsystem = RDT::subsystem;
// Post-processing deferral: a non-zero tick delay enables the post-processing scheduler;
// min/max waits are in ms according to the configuration attribute documentation.
54 m_processing_delay_ticks = mcfg->get_module_configuration()->get_post_processing_delay_ticks();
55 m_post_processing_delay_min_wait = mcfg->get_module_configuration()->get_post_processing_delay_min_wait();
56 m_post_processing_delay_max_wait = mcfg->get_module_configuration()->get_post_processing_delay_max_wait();
57
58 // Configure implementations:
59 m_raw_processor_impl->conf(mcfg);
60 // Configure the latency buffer before the request handler so the request handler can check for alignment
61 // restrictions
62 try {
63 m_latency_buffer_impl->conf(mcfg->get_module_configuration()->get_latency_buffer());
64 } catch (const std::bad_alloc& be) {
// NOTE(review): original line 65 (the statement that raises/reports this ConfigurationError)
// is missing from this extraction; the caught std::bad_alloc `be` is not attached to the issue.
66 datahandlinglibs::ConfigurationError(ERS_HERE, m_sourceid, "Latency Buffer can't be allocated with size!"));
67 }
68 m_request_handler_impl->conf(mcfg);
69}
70
// conf: binds consume_callback as the raw-data consumer and registers it (for input type IDT)
// under m_raw_data_callback_conf. It also names the worker threads that are enabled. When
// post-processing is deferred, it creates the timekeeper that postprocess_schedule() uses
// for its wait timeout. The command arguments are unused.
71template<class RDT, class RHT, class LBT, class RPT, class IDT>
72void
73SNBDataHandlingModel<RDT, RHT, LBT, RPT, IDT>::conf(const appfwk::DAQModule::CommandData_t& /*args*/)
74{
75 // Configure and register consume callback
76 m_consume_callback =
77 std::bind(&SNBDataHandlingModel<RDT, RHT, LBT, RPT, IDT>::consume_callback, this, std::placeholders::_1);
78
79 // Register callback
// NOTE(review): original line 80, which defines `dmcbr`, is missing from this extraction.
// It is presumably obtained from DataMoveCallbackRegistry::get() (listed among the
// declarations below the code) — confirm.
81 dmcbr->register_callback<IDT>(m_raw_data_callback_conf, m_consume_callback);
82
83 // Configure threads:
84 if (m_generate_timesync) {
85 m_timesync_thread.set_name("timesync", m_sourceid.id);
86 }
// A non-zero post-processing delay enables the scheduler thread and its timekeeper.
87 if (m_processing_delay_ticks) {
88 m_postprocess_scheduler_thread.set_name("pprocsched", m_sourceid.id);
89 m_timekeeper = std::make_unique<folly::ThreadWheelTimekeeper>();
90 }
91}
92
// start: resets the opmon counters and the rate reference time, reads the run number from the
// command arguments ("run", default 1), starts the raw processor and request handler, hands the
// enabled worker threads their work functions, and finally attaches the DataRequest callback.
93template<class RDT, class RHT, class LBT, class RPT, class IDT>
94void
95SNBDataHandlingModel<RDT, RHT, LBT, RPT, IDT>::start(const appfwk::DAQModule::CommandData_t& args)
96{
97 // Reset opmon variables
98 m_sum_payloads = 0;
99 m_num_payloads = 0;
100 m_sum_requests = 0;
101 m_num_requests = 0;
102 m_num_lb_insert_failures = 0;
103 m_stats_packet_count = 0;
104 m_rawq_timeout_count = 0;
105 m_num_post_processing_delay_max_waits = 0;
106
// Reference point for the consumed-payload rate computed in the opmon publisher.
107 m_t0 = std::chrono::high_resolution_clock::now();
108
109 m_run_number = args.value<dunedaq::daqdataformats::run_number_t>("run", 1);
110
111 TLOG_DEBUG(TLVL_WORK_STEPS) << "Starting threads...";
112 m_raw_processor_impl->start(args);
113 m_request_handler_impl->start(args);
114 if (m_generate_timesync) {
115 m_timesync_thread.set_work(&SNBDataHandlingModel<RDT, RHT, LBT, RPT, IDT>::run_timesync, this);
116 }
117 if (m_processing_delay_ticks) {
// NOTE(review): original line 118 is missing from this extraction. It presumably calls
// m_postprocess_scheduler_thread.set_work(<scheduler member function>, ...) and continues on
// the next line — confirm.
119 this);
120 }
121 // Register callback to receive and dispatch data requests
122 m_data_request_receiver->add_callback(
123 std::bind(&SNBDataHandlingModel<RDT, RHT, LBT, RPT, IDT>::dispatch_requests, this, std::placeholders::_1));
124}
125
126template<class RDT, class RHT, class LBT, class RPT, class IDT>
127void
128SNBDataHandlingModel<RDT, RHT, LBT, RPT, IDT>::stop(const appfwk::DAQModule::CommandData_t& args)
129{
130 TLOG_DEBUG(TLVL_WORK_STEPS) << "Stoppping threads...";
131
132 // Stop receiving data requests as first thing
133 m_data_request_receiver->remove_callback();
134 // Stop the other threads
135 m_request_handler_impl->stop(args);
136 if (m_generate_timesync) {
137 while (!m_timesync_thread.get_readiness()) {
138 std::this_thread::sleep_for(std::chrono::milliseconds(10));
139 }
140 }
141 if (m_processing_delay_ticks) {
142 m_baton.post(); // In case the coroutine is still waiting when the consumer has stopped
143 while (!m_postprocess_scheduler_thread.get_readiness()) {
144 std::this_thread::sleep_for(std::chrono::milliseconds(10));
145 }
146 }
147 TLOG_DEBUG(TLVL_WORK_STEPS) << "Flushing latency buffer with occupancy: " << m_latency_buffer_impl->occupancy();
148 m_latency_buffer_impl->flush();
149 m_raw_processor_impl->stop(args);
150 m_raw_processor_impl->reset_last_daq_time();
151}
152
// Opmon publisher: fills the record `ri` and publishes it.
// Cumulative counters (sum_*) are only read. Per-interval counters (num_payloads, input
// timeouts, packet count, LB insert failures, num_requests, max waits) are read and reset to 0.
// NOTE(review): original lines 155 (qualified name) and 157 (presumably the declaration of `ri`)
// are missing from this extraction.
153template<class RDT, class RHT, class LBT, class RPT, class IDT>
154void
156{
158 ri.set_sum_payloads(m_sum_payloads.load());
159 ri.set_num_payloads(m_num_payloads.exchange(0));
160
161 ri.set_num_data_input_timeouts(m_rawq_timeout_count.exchange(0));
162
// Elapsed time since the previous call (or since start()), in seconds.
// NOTE(review): `int` may narrow the counter's type (not shown here); if two calls land in
// the same microsecond, `seconds` is 0 and the rate below becomes inf/NaN.
163 auto now = std::chrono::high_resolution_clock::now();
164 int new_packets = m_stats_packet_count.exchange(0);
165 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
166 m_t0 = now;
167
168 // 08-May-2025, KAB: added a message to warn users when latency buffer inserts are failing.
169 int local_num_lb_insert_failures = m_num_lb_insert_failures.exchange(0);
170 if (local_num_lb_insert_failures != 0) {
171 ers::warning(datahandlinglibs::NonZeroLatencyBufferInsertFailures(
172 ERS_HERE, m_sourceid, local_num_lb_insert_failures, ri.num_payloads()));
173 }
174
// Payloads per second divided by 1000 (presumably reported in kHz — confirm).
175 ri.set_rate_payloads_consumed(new_packets / seconds / 1000.);
176 ri.set_num_lb_insert_failures(local_num_lb_insert_failures);
177 ri.set_sum_requests(m_sum_requests.load());
178 ri.set_num_requests(m_num_requests.exchange(0));
179 ri.set_num_post_processing_delay_max_waits(m_num_post_processing_delay_max_waits.exchange(0));
// Both the last-DAQ and newest timestamps are taken from the raw processor's last DAQ time.
180 ri.set_last_daq_timestamp(m_raw_processor_impl->get_last_daq_time());
181 ri.set_newest_timestamp(m_raw_processor_impl->get_last_daq_time());
182 ri.set_oldest_timestamp(m_request_handler_impl->get_oldest_time());
183
184 this->publish(std::move(ri));
185}
186
// Forwards an incoming payload to process_item().
// - If the input type IDT is the raw data type RDT, the payload is moved straight in.
// - Otherwise transform_payload() converts it into a range of items, each moved into
//   process_item() in order.
// NOTE(review): original line 189 (qualified name and parameter list) is missing from this
// extraction. The `transform_and_process(std::move(payload))` call elsewhere in this file
// suggests this is that function — confirm.
187template<class RDT, class RHT, class LBT, class RPT, class IDT>
188void
190{
191 if constexpr (std::is_same_v<IDT, RDT>) {
192 process_item(std::move(payload));
193 } else {
194 auto transformed = transform_payload(payload);
195 for (auto& i : transformed) {
196 process_item(std::move(i));
197 }
198 }
199}
200
// Raw-data callback: moves the received payload into transform_and_process().
// NOTE(review): original line 203 (qualified name and parameter list) is missing from this
// extraction. This is presumably consume_callback, which conf() binds and registers — confirm.
201template<class RDT, class RHT, class LBT, class RPT, class IDT>
202void
204{
205 transform_and_process(std::move(payload));
206}
207
// Handles one raw data item:
//  1. pre-processes it;
//  2. warns if it is not newer than the request handler's cutoff timestamp (when supported);
//  3. waits for room in the latency buffer and writes it;
//  4. either post-processes it immediately (no configured delay) or wakes the
//     post-processing scheduler via m_baton.
// NOTE(review): original line 210 (qualified name and parameter list) is missing from this
// extraction.
208template<class RDT, class RHT, class LBT, class RPT, class IDT>
209void
211{
212 m_raw_processor_impl->preprocess_item(&payload);
213 if (m_request_handler_supports_cutoff_timestamp) {
// Signed lateness relative to the cutoff; <= 0 means the data arrived too late.
// The item is still written to the buffer below.
214 int64_t diff1 = payload.get_timestamp() - m_request_handler_impl->get_cutoff_timestamp();
215 if (diff1 <= 0) {
216 // m_request_handler_impl->increment_tardy_tp_count();
// NOTE(review): 62500.0 is presumably clock ticks per millisecond (62.5 MHz), making the last
// argument the lateness in ms — confirm.
217 ers::warning(datahandlinglibs::DataPacketArrivedTooLate(ERS_HERE,
218 m_sourceid,
219 m_run_number,
220 payload.get_timestamp(),
221 m_request_handler_impl->get_cutoff_timestamp(),
222 diff1,
223 (static_cast<double>(diff1) / 62500.0)));
224 }
225 }
// Back-pressure: poll every 1 ms until the latency buffer has room rather than dropping data.
// NOTE(review): this loop does not check m_run_marker, so if the buffer is never drained it
// never returns — confirm that is intended.
226 while (m_latency_buffer_impl->isFull()) {
227 std::this_thread::sleep_for(std::chrono::milliseconds(1));
228 }
// `payload` may have been moved from by write(); do not use it after this call.
229 if (!m_latency_buffer_impl->write(std::move(payload))) {
230 // TLOG_DEBUG(TLVL_TAKE_NOTE) << "***ERROR: Latency buffer insert failed! (Payload timestamp=" <<
231 // payload.get_timestamp() << ")";
232 m_num_lb_insert_failures++;
233 return;
234 }
235
// With no deferral, post-process the buffer's back() element right away and count it here.
// With deferral, postprocess_schedule() does the post-processing and counting.
236 if (m_processing_delay_ticks == 0) {
237 m_raw_processor_impl->postprocess_item(m_latency_buffer_impl->back());
238 ++m_num_payloads;
239 ++m_sum_payloads;
240 ++m_stats_packet_count;
241 } else {
242 m_baton.post();
243 }
244}
245
// Post-processing scheduler entry point: blocks the calling thread until the
// postprocess_schedule() coroutine completes (it loops while m_run_marker is set).
// NOTE(review): original line 248 (qualified name and parameter list) is missing from this
// extraction. This is presumably the work function given to m_postprocess_scheduler_thread in
// start() — confirm.
246template<class RDT, class RHT, class LBT, class RPT, class IDT>
247void
249{
250 folly::coro::blockingWait(postprocess_schedule());
251}
252
// Coroutine that post-processes latency-buffer items lazily. The newest m_processing_delay_ticks
// of data are left unprocessed so elements can still be reordered in the buffer.
// - Each cycle waits for m_baton (posted by process_item() and stop()), or at most
//   m_post_processing_delay_max_wait ms; each timeout is counted.
// - Work is done only when more than m_post_processing_delay_min_wait ms have elapsed since
//   the last processing pass.
// - Runs until m_run_marker is cleared.
// NOTE(review): original line 255 (qualified name) is missing from this extraction.
253template<class RDT, class RHT, class LBT, class RPT, class IDT>
254folly::coro::Task<void>
256{
257
258 // TLOG_DEBUG(TLVL_WORK_STEPS) << "Postprocess schedule coroutine started...";
259 timestamp_t newest_ts = 0;
260 timestamp_t end_win_ts = 0;
261 bool first_cycle = true;
262 auto last_post_proc_time = std::chrono::system_clock::now();
263 auto now = last_post_proc_time;
264 std::chrono::milliseconds milliseconds;
// Only its timestamp is used: it marks where the next post-processing window starts
// and serves as the lower_bound() search key.
265 RDT processed_element;
266
267 // Deferral of the post processing, to allow elements being reordered in the LB
268 // Basically, find data older than a certain timestamp and process all data since the last post-processed element up
269 // to that value
270 while (m_run_marker.load()) {
271 try {
// NOTE(review): a post() arriving between this await resuming and reset() is discarded;
// the max-wait timeout bounds the resulting delay.
272 co_await folly::coro::timeout(
273 m_baton.operator co_await(), std::chrono::milliseconds{ m_post_processing_delay_max_wait }, m_timekeeper.get());
274 m_baton.reset();
275 } catch (const folly::FutureTimeout&) {
276 ++m_num_post_processing_delay_max_waits;
277 }
278
279 if (m_latency_buffer_impl->occupancy() == 0) {
280 continue;
281 }
282
// Rate limit: skip unless strictly more than min_wait ms have elapsed since the last pass.
283 now = std::chrono::system_clock::now();
284 milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(now - last_post_proc_time);
285
286 if (static_cast<uint64_t>(milliseconds.count()) <= m_post_processing_delay_min_wait) {
287 continue;
288 }
289
290 last_post_proc_time = now;
291
292 // Get the LB boundaries
293 auto tail = m_latency_buffer_impl->back();
294 newest_ts = tail->get_timestamp();
295
// The very first window of this run starts at the oldest buffered item.
296 if (first_cycle) {
297 auto head = m_latency_buffer_impl->front();
298 processed_element.set_timestamp(head->get_timestamp());
299 first_cycle = false;
300 // TLOG() << "***** First pass post processing *****";
301 }
302
// Process the half-open window [processed ts, newest_ts - delay). The next window resumes at
// end_win_ts, so no item is post-processed twice.
// NOTE(review): if timestamp_t is unsigned (not shown) and newest_ts is ever smaller than the
// processed timestamp, this subtraction wraps, end_iter lands at or before start_iter, and loop
// termination depends on the buffer's iterator semantics — confirm the tail is monotonic.
303 if (newest_ts - processed_element.get_timestamp() > m_processing_delay_ticks) {
304 end_win_ts = newest_ts - m_processing_delay_ticks;
305 auto start_iter = m_latency_buffer_impl->lower_bound(processed_element, false);
306 processed_element.set_timestamp(end_win_ts);
307 auto end_iter = m_latency_buffer_impl->lower_bound(processed_element, false);
308
309 for (auto it = start_iter; it != end_iter; ++it) {
310 m_raw_processor_impl->postprocess_item(&(*it));
311 ++m_num_payloads;
312 ++m_sum_payloads;
313 ++m_stats_packet_count;
314 }
315 }
316 }
317}
318
// TimeSync thread body. Roughly every 100 ms it builds a TimeSync from the raw processor's
// last DAQ time and sends it with a 500 ms timeout. Nothing is sent when that time is 0 or
// unchanged since the previous send. When m_fake_trigger is set, it also issues a fake
// DataRequest derived from that time directly to the request handler.
// NOTE(review): original lines 321 (qualified name), 352 (presumably the opening of the ers
// report for a failed send), 357 (presumably the declaration of `dr`) and 363-364 (presumably
// the window_begin/window_end assignments using `width`/`offset`) are missing from this
// extraction.
319template<class RDT, class RHT, class LBT, class RPT, class IDT>
320void
322{
323 TLOG_DEBUG(TLVL_WORK_STEPS) << "TimeSync thread started...";
// NOTE(review): start() already resets these counters, and dispatch_requests() increments
// them; resetting them again here may discard requests counted before this thread began — confirm.
324 m_num_requests = 0;
325 m_sum_requests = 0;
326 uint64_t msg_seqno = 0;
327 timestamp_t prev_timestamp = 0;
328 auto once_per_run = true;
329 size_t zero_timestamp_count = 0;
330 size_t duplicate_timestamp_count = 0;
331 size_t total_timestamp_count = 0;
332 while (m_run_marker.load()) {
333 try {
334 auto timesyncmsg = dfmessages::TimeSync(m_raw_processor_impl->get_last_daq_time());
335 ++total_timestamp_count;
336 // daq_time is zero for the first received timesync, and may
337 // be the same as the previous daq_time if the data has
338 // stopped flowing. In both cases we don't send the TimeSync
339 if (timesyncmsg.daq_time != 0 && timesyncmsg.daq_time != prev_timestamp) {
340 prev_timestamp = timesyncmsg.daq_time;
341 timesyncmsg.run_number = m_run_number;
342 timesyncmsg.sequence_number = ++msg_seqno;
343 timesyncmsg.source_id = m_sourceid.id;
344 TLOG_DEBUG(TLVL_TIME_SYNCS) << "New timesync: daq=" << timesyncmsg.daq_time
345 << " wall=" << timesyncmsg.system_time << " run=" << timesyncmsg.run_number
346 << " seqno=" << timesyncmsg.sequence_number
347 << " source_id=" << timesyncmsg.source_id;
348 try {
// A copy is sent so `timesyncmsg` stays valid for the fake-trigger branch below.
349 dfmessages::TimeSync timesyncmsg_copy(timesyncmsg);
350 m_timesync_sender->send(std::move(timesyncmsg_copy), std::chrono::milliseconds(500));
351 } catch (ers::Issue& excpt) {
353 datahandlinglibs::TimeSyncTransmissionFailed(ERS_HERE, m_sourceid, m_timesync_connection_name, excpt));
354 }
355
356 if (m_fake_trigger) {
358 ++m_current_fake_trigger_id;
359 dr.trigger_number = m_current_fake_trigger_id;
// Trigger 500 * us before the current DAQ time, clamped at 0 to avoid unsigned underflow.
360 dr.trigger_timestamp = timesyncmsg.daq_time > 500 * us ? timesyncmsg.daq_time - 500 * us : 0;
361 auto width = 300000;
362 uint offset = 100;
365 dr.request_information.component = m_sourceid;
366 dr.data_destination = "data_fragments_q";
367 TLOG_DEBUG(TLVL_WORK_STEPS) << "Issuing fake trigger based on timesync. " << " ts=" << dr.trigger_timestamp
368 << " window_begin=" << dr.request_information.window_begin
369 << " window_end=" << dr.request_information.window_end;
370 m_request_handler_impl->issue_request(dr);
371
372 ++m_num_requests;
373 ++m_sum_requests;
374 }
375 } else {
376 if (timesyncmsg.daq_time == 0) {
377 ++zero_timestamp_count;
378 }
379 if (timesyncmsg.daq_time == prev_timestamp) {
380 ++duplicate_timestamp_count;
381 }
// NOTE(review): this message mentions only DAQ time 0, but this branch is also taken for an
// unchanged DAQ time. It is logged once per thread run.
382 if (once_per_run) {
383 TLOG() << "Timesync with DAQ time 0 won't be sent out as it's an invalid sync.";
384 once_per_run = false;
385 }
386 }
// NOTE(review): send() above is already wrapped in catch (ers::Issue&). Whether this handler is
// reachable depends on other calls in the try throwing TimeoutExpired — confirm.
387 } catch (const iomanager::TimeoutExpired& excpt) {
388 // ++m_timesyncqueue_timeout;
389 }
390 // Split up the 100ms sleep into 10 sleeps of 10ms, so we respond to "stop" quicker
391 for (size_t i = 0; i < 10; ++i) {
392 std::this_thread::sleep_for(std::chrono::milliseconds(10));
393 if (!m_run_marker.load()) {
394 break;
395 }
396 }
397 }
// NOTE(review): dead store; once_per_run is a local that goes out of scope immediately.
398 once_per_run = true;
399 TLOG_DEBUG(TLVL_WORK_STEPS) << "TimeSync thread joins... (timestamp count, zero/same/total = "
400 << zero_timestamp_count << "/" << duplicate_timestamp_count << "/"
401 << total_timestamp_count << ")";
402}
403
// DataRequest callback (attached in start()).
// - Requests addressed to a different SourceID are rejected with RequestSourceIDMismatch.
// - Otherwise the request is logged, handed to the request handler, and counted.
// NOTE(review): original line 406 is missing from this extraction. Per the declarations below the
// code it is `dispatch_requests(dfmessages::DataRequest& data_request)`. Line 409 (presumably the
// opening of the ers report call) is also missing.
404template<class RDT, class RHT, class LBT, class RPT, class IDT>
405void
407{
408 if (data_request.request_information.component != m_sourceid) {
410 datahandlinglibs::RequestSourceIDMismatch(ERS_HERE, m_sourceid, data_request.request_information.component));
411 return;
412 }
413 TLOG_DEBUG(TLVL_QUEUE_POP) << "Received DataRequest" << " for trig/seq_number " << data_request.trigger_number << "."
414 << data_request.sequence_number << ", runno " << data_request.run_number
415 << ", trig timestamp " << data_request.trigger_timestamp
416 << ", SourceID: " << data_request.request_information.component << ", window begin/end "
417 << data_request.request_information.window_begin << "/"
418 << data_request.request_information.window_end
419 << ", dest: " << data_request.data_destination;
420 m_request_handler_impl->issue_request(data_request);
421 ++m_num_requests;
422 ++m_sum_requests;
423}
424
425} // namespace snbmodules
426} // namespace dunedaq
#define ERS_HERE
const dunedaq::appmodel::LatencyBuffer * get_latency_buffer() const
Get "latency_buffer" relationship value.
uint64_t get_post_processing_delay_ticks() const
Get "post_processing_delay_ticks" attribute value. Number of clock ticks by which post processing of i...
const dunedaq::appmodel::DataProcessor * get_data_processor() const
Get "data_processor" relationship value.
uint64_t get_post_processing_delay_max_wait() const
Get "post_processing_delay_max_wait" attribute value. Maximum wait time (ms) before post processing c...
uint64_t get_post_processing_delay_min_wait() const
Get "post_processing_delay_min_wait" attribute value. Minimum time (ms) between consecutive post proc...
const dunedaq::appmodel::RequestHandler * get_request_handler() const
Get "request_handler" relationship value.
const dunedaq::appmodel::DataHandlerConf * get_module_configuration() const
Get "module_configuration" relationship value.
const dunedaq::appmodel::DataMoveCallbackConf * get_raw_data_callback() const
Get "raw_data_callback" relationship value. Configuration for raw data callback.
uint32_t get_source_id() const
Get "source_id" attribute value.
bool get_post_processing_enabled() const
Get "post_processing_enabled" attribute value.
const std::string & UID() const noexcept
const std::vector< const dunedaq::confmodel::Connection * > & get_inputs() const
Get "inputs" relationship value. List of connections to/from this module.
const std::vector< const dunedaq::confmodel::Connection * > & get_outputs() const
Get "outputs" relationship value. Output connections from this module.
static std::shared_ptr< DataMoveCallbackRegistry > get()
void start(const appfwk::DAQModule::CommandData_t &args)
void init(const appmodel::DataHandlerModule *modconf)
Forward calls from the appfwk.
void conf(const appfwk::DAQModule::CommandData_t &args)
void stop(const appfwk::DAQModule::CommandData_t &args)
void dispatch_requests(dfmessages::DataRequest &data_request)
void run_timesync()
Function that will be run in its own thread and sends periodic timesync messages by pushing them to t...
Base class for any user define issue.
Definition Issue.hpp:69
double offset
static int64_t now()
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
uint32_t run_number_t
Type used to represent run number.
Definition Types.hpp:20
Including Qt Headers.
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
static std::shared_ptr< iomanager::ReceiverConcept< Datatype > > get_iom_receiver(iomanager::ConnectionId const &id)
void warning(const Issue &issue)
Definition ers.hpp:115
void error(const Issue &issue)
Definition ers.hpp:81
timestamp_t window_end
End of the data collection window.
SourceID component
The Requested Component.
timestamp_t window_begin
Start of the data collection window.
This message represents a request for data sent to a single component of the DAQ.
sequence_number_t sequence_number
Sequence Number of the request.
trigger_number_t trigger_number
Trigger number the request corresponds to.
timestamp_t trigger_timestamp
Timestamp of trigger.
run_number_t run_number
The current run number.
A synthetic message used to ensure that all elements of a DAQ system are synchronized.
Definition TimeSync.hpp:25