DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
DataHandlingModel.hxx
Go to the documentation of this file.
1// Declarations for DataHandlingModel
2
3#include <typeinfo>
4
5namespace dunedaq {
6namespace datahandlinglibs {
7
// Initialisation of the data handling model from its module configuration.
// NOTE(review): the signature line is missing from this listing; this is presumably
// DataHandlingModel::init, whose argument is referred to as `mcfg` in the body — confirm.
//
// Steps performed by the visible code:
//  1. Walk the configured inputs: a "DataRequest" input becomes the data-request receiver,
//     every other input is treated as the raw-data connection.
//  2. Walk the configured outputs and pick up the first "TimeSync" output, if any.
//  3. Create the error registry, latency buffer, raw processor and request handler,
//     and register the latter three as nodes.
//  4. Configure the raw processor, then the latency buffer, then the request handler.
// Any ers::Issue raised while resolving connections is rethrown as ResourceQueueError.
8template<class RDT, class RHT, class LBT, class RPT, class IDT>
9void
11{
12 // Setup request queues
13 //setup_request_queues(mcfg);
14 try {
15 for (auto input : mcfg->get_inputs()) {
16 if (input->get_data_type() == "DataRequest") {
17 m_data_request_receiver = get_iom_receiver<dfmessages::DataRequest>(input->UID()) ;
18 }
19 else {
20 m_raw_data_receiver_connection_name = input->UID();
21 // Parse for prefix
// The UID is split on '_' (runs of delimiters are skipped); only the first word is used below.
22 std::string conn_name = input->UID();
23 const char delim = '_';
24 std::vector<std::string> words;
25 std::size_t start;
26 std::size_t end = 0;
27 while ((start = conn_name.find_first_not_of(delim, end)) != std::string::npos) {
28 end = conn_name.find(delim, start);
29 words.push_back(conn_name.substr(start, end - start));
30 }
31
// NOTE(review): `words` stays empty when the UID is empty or consists only of '_';
// words.front() on an empty vector is undefined behaviour. Consider guarding with words.empty().
32 TLOG_DEBUG(TLVL_WORK_STEPS) << "Initialize connection based on uid: "
33 << m_raw_data_receiver_connection_name << " front word: " << words.front();
34
// A UID whose first '_'-separated word is exactly "cb" switches the model to callback mode.
35 std::string cb_prefix("cb");
36 if (words.front() == cb_prefix) {
37 m_callback_mode = true;
38 }
39
// Only in non-callback mode is a receiver (and its receive timeout) set up for the raw input.
40 if (!m_callback_mode) {
41 m_raw_data_receiver = get_iom_receiver<IDT>(m_raw_data_receiver_connection_name);
42 m_raw_receiver_timeout_ms = std::chrono::milliseconds(input->get_recv_timeout_ms());
43 }
44 }
45 }
// Only the first output of type "TimeSync" is used; the loop stops as soon as one is found.
46 for (auto output : mcfg->get_outputs()) {
47 if (output->get_data_type() == "TimeSync") {
48 m_generate_timesync = true;
49 m_timesync_sender = get_iom_sender<dfmessages::TimeSync>(output->UID()) ;
50 m_timesync_connection_name = output->UID();
51 break;
52 }
53 }
54 } catch (const ers::Issue& excpt) {
55 throw ResourceQueueError(ERS_HERE, "raw_input or frag_output", "DataHandlingModel", excpt);
56 }
57
58 // Raw input connection sensibility check
// NOTE(review): m_sourceid is only assigned further down in this function, so the issue
// reported here carries whatever value m_sourceid held before. The error is reported but
// execution continues.
59 if (!m_callback_mode && m_raw_data_receiver == nullptr) {
60 ers::error(ConfigurationError(ERS_HERE, m_sourceid, "Non callback mode, and receiver is unset!"));
61 }
62
63 // Instantiate functionalities
// The request handler is constructed with the latency buffer and the error registry;
// the raw processor with the error registry and the post-processing-enabled flag.
64 m_error_registry.reset(new FrameErrorRegistry());
65 m_error_registry->set_ers_metadata("DLH of SourceID[" + std::to_string(mcfg->get_source_id()) + "] ");
66 m_latency_buffer_impl.reset(new LBT());
67 m_raw_processor_impl.reset(new RPT(m_error_registry, mcfg->get_post_processing_enabled()));
68 m_request_handler_impl.reset(new RHT(m_latency_buffer_impl, m_error_registry));
69
70 register_node(mcfg->get_module_configuration()->get_latency_buffer()->UID(), m_latency_buffer_impl);
71 register_node(mcfg->get_module_configuration()->get_data_processor()->UID(), m_raw_processor_impl);
72 register_node(mcfg->get_module_configuration()->get_request_handler()->UID(), m_request_handler_impl);
73
74 //m_request_handler_impl->init(args);
75 //m_raw_processor_impl->init(args);
// Cache handler capability and fixed defaults: fake triggers off, no sleep after a receive timeout.
76 m_request_handler_supports_cutoff_timestamp = m_request_handler_impl->supports_cutoff_timestamp();
77 m_fake_trigger = false;
78 m_raw_receiver_sleep_us = std::chrono::microseconds::zero();
79 m_sourceid.id = mcfg->get_source_id();
80 m_sourceid.subsystem = RDT::subsystem;
81 m_processing_delay_ticks = mcfg->get_module_configuration()->get_post_processing_delay_ticks();
82
83
84 // Configure implementations:
85 m_raw_processor_impl->conf(mcfg);
86 // Configure the latency buffer before the request handler so the request handler can check for alignment
87 // restrictions
// NOTE(review): a std::bad_alloc from the latency buffer is reported via ers::error and then
// swallowed; the request handler is still configured afterwards.
88 try {
89 m_latency_buffer_impl->conf(mcfg->get_module_configuration()->get_latency_buffer());
90 } catch (const std::bad_alloc& be) {
91 ers::error(ConfigurationError(ERS_HERE, m_sourceid, "Latency Buffer can't be allocated with size!"));
92 }
93 m_request_handler_impl->conf(mcfg);
94}
95
// Registers the consume callback (callback mode only) and names the worker threads.
// NOTE(review): the signature line is missing from this listing; presumably this is the
// model's conf step — confirm the name and arguments against the header.
96template<class RDT, class RHT, class LBT, class RPT, class IDT>
97void
99{
100 // Register callbacks if operating in that mode.
101 if (m_callback_mode) {
102 // Configure and register consume callback
// The callback forwards its single argument to consume_payload on this object.
103 m_consume_callback = std::bind(&DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::consume_payload, this, std::placeholders::_1);
104
105 // Register callback
// Registered under the raw-data connection name, with RDT as the callback's payload type.
106 auto dmcbr = DataMoveCallbackRegistry::get();
107 dmcbr->register_callback<RDT>(m_raw_data_receiver_connection_name, m_consume_callback);
108 }
109
110 // Configure threads:
// The consumer thread is always named; the timesync thread only when TimeSync generation is on.
111 m_consumer_thread.set_name("consumer", m_sourceid.id);
112 if (m_generate_timesync)
113 m_timesync_thread.set_name("timesync", m_sourceid.id);
114}
115
116
// Start-of-run handling: resets the monitoring counters, records the run number,
// starts the processor and request handler, launches the worker threads and
// finally subscribes to incoming data requests.
// NOTE(review): the signature line is missing from this listing; the body uses an `args`
// object offering value<T>(key, default) — presumably the start command arguments.
117template<class RDT, class RHT, class LBT, class RPT, class IDT>
118void
120{
121 // Reset opmon variables
122 m_sum_payloads = 0;
123 m_num_payloads = 0;
124 m_sum_requests = 0;
125 m_num_requests = 0;
126 m_num_lb_insert_failures = 0;
127 m_stats_packet_count = 0;
128 m_rawq_timeout_count = 0;
129
// Reference time for the payload-rate calculation done when monitoring data is published.
130 m_t0 = std::chrono::high_resolution_clock::now();
131
// Run number is read from the "run" entry, defaulting to 1 when absent.
132 m_run_number = args.value<dunedaq::daqdataformats::run_number_t>("run", 1);
133
134 TLOG_DEBUG(TLVL_WORK_STEPS) << "Starting threads...";
135 m_raw_processor_impl->start(args);
136 m_request_handler_impl->start(args);
// The consumer thread is only needed when data is pulled from a receiver (non-callback mode).
137 if (!m_callback_mode) {
138 m_consumer_thread.set_work(&DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::run_consume, this);
139 }
140 if (m_generate_timesync) m_timesync_thread.set_work(&DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::run_timesync, this);
141 // Register callback to receive and dispatch data requests
// NOTE(review): m_data_request_receiver is dereferenced without a null check; confirm it is
// always set (i.e. a "DataRequest" input is always configured) before this point.
142 m_data_request_receiver->add_callback(
143 std::bind(&DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::dispatch_requests, this, std::placeholders::_1));
144}
145
// End-of-run handling: stops accepting data requests, stops the request handler, waits for
// the worker threads to report readiness, then flushes the latency buffer and stops the
// raw processor.
// NOTE(review): the signature line is missing from this listing; the body forwards an `args`
// object to the implementations' stop() — presumably the stop command arguments.
146template<class RDT, class RHT, class LBT, class RPT, class IDT>
147void
149{
150 TLOG_DEBUG(TLVL_WORK_STEPS) << "Stoppping threads...";
151
152 // Stop receiving data requests as first thing
// NOTE(review): m_data_request_receiver is dereferenced without a null check.
153 m_data_request_receiver->remove_callback();
154 // Stop the other threads
155 m_request_handler_impl->stop(args);
// Poll every 10 ms until each active worker thread reports readiness.
// NOTE(review): there is no upper bound on these waits, and nothing in this block clears
// m_run_marker — presumably that happens outside this function; confirm.
156 if (m_generate_timesync) {
157 while (!m_timesync_thread.get_readiness()) {
158 std::this_thread::sleep_for(std::chrono::milliseconds(10));
159 }
160 }
161 if (!m_callback_mode) {
162 while (!m_consumer_thread.get_readiness()) {
163 std::this_thread::sleep_for(std::chrono::milliseconds(10));
164 }
165 }
// The buffer is flushed only after the threads above have finished.
166 TLOG_DEBUG(TLVL_WORK_STEPS) << "Flushing latency buffer with occupancy: " << m_latency_buffer_impl->occupancy();
167 m_latency_buffer_impl->flush();
168 m_raw_processor_impl->stop(args);
169 m_raw_processor_impl->reset_last_daq_time();
170}
171
// Fills a monitoring record `ri` from the model's counters and publishes it.
// NOTE(review): the signature line and the line declaring `ri` are missing from this listing;
// the function name and the type of `ri` cannot be confirmed from here.
// Counters read with exchange(0) are per-interval values that restart at every call;
// counters read with load() keep accumulating.
172template<class RDT, class RHT, class LBT, class RPT, class IDT>
173void
175 {
177 ri.set_sum_payloads(m_sum_payloads.load());
178 ri.set_num_payloads(m_num_payloads.exchange(0));
179
180 ri.set_num_data_input_timeouts(m_rawq_timeout_count.exchange(0));
181
// Elapsed time since the previous call (or since m_t0 was last set), in seconds with
// microsecond resolution; m_t0 is then moved forward to now.
182 auto now = std::chrono::high_resolution_clock::now();
183 int new_packets = m_stats_packet_count.exchange(0);
184 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
185 m_t0 = now;
186
187 // 08-May-2025, KAB: added a message to warn users when latency buffer inserts are failing.
188 int local_num_lb_insert_failures = m_num_lb_insert_failures.exchange(0);
189 if (local_num_lb_insert_failures != 0) {
190 ers::warning(NonZeroLatencyBufferInsertFailures(ERS_HERE, local_num_lb_insert_failures, ri.num_payloads()));
191 }
192
// Rate is packets per second divided by 1000, i.e. expressed in thousands of packets per second.
// NOTE(review): `seconds` is not checked for zero before the division.
193 ri.set_rate_payloads_consumed(new_packets / seconds / 1000.);
194 ri.set_num_lb_insert_failures(local_num_lb_insert_failures);
195 ri.set_sum_requests(m_sum_requests.load());
196 ri.set_num_requests(m_num_requests.exchange(0));
197 ri.set_last_daq_timestamp(m_raw_processor_impl->get_last_daq_time());
198
199 this->publish(std::move(ri));
200 }
201
// Handles one item: pre-processes it, warns if it arrived at or before the request
// handler's cutoff timestamp, writes it to the latency buffer and, when no post-processing
// delay is configured, post-processes it immediately.
// NOTE(review): the signature line is missing from this listing; the item is referred to as
// `payload` and is moved from — this is the function called as process_item(...) by run_consume.
202template<class RDT, class RHT, class LBT, class RPT, class IDT>
203void
205{
206 m_raw_processor_impl->preprocess_item(&payload);
207 if (m_request_handler_supports_cutoff_timestamp) {
// NOTE(review): if both timestamps are unsigned, the subtraction wraps before being
// converted to int64_t — confirm the intended behaviour for very large differences.
208 int64_t diff1 = payload.get_timestamp() - m_request_handler_impl->get_cutoff_timestamp();
209 if (diff1 <= 0) {
210 //m_request_handler_impl->increment_tardy_tp_count();
// NOTE(review): the divisor 62500.0 presumably converts clock ticks to milliseconds
// (62.5 MHz clock) — TODO confirm.
211 ers::warning(DataPacketArrivedTooLate(ERS_HERE, m_run_number, payload.get_timestamp(),
212 m_request_handler_impl->get_cutoff_timestamp(), diff1,
213 (static_cast<double>(diff1)/62500.0)));
214 }
215 }
// A failed write is only counted; processing continues below.
216 if (!m_latency_buffer_impl->write(std::move(payload))) {
217 //TLOG_DEBUG(TLVL_TAKE_NOTE) << "***ERROR: Latency buffer is full and data was overwritten!";
218 m_num_lb_insert_failures++;
219 }
// With a non-zero delay, post-processing and payload counting are left to the deferred
// pass in run_consume.
// NOTE(review): after a failed write, back() refers to whatever element is currently
// newest in the buffer, which may not be this payload.
220 if (m_processing_delay_ticks ==0) {
221 m_raw_processor_impl->postprocess_item(m_latency_buffer_impl->back());
222 ++m_num_payloads;
223 ++m_sum_payloads;
224 ++m_stats_packet_count;
225 }
226}
227
// Consumer thread body (run_consume): while the run marker is set, pulls items from the raw
// data receiver, feeds them through process_item and, when a post-processing delay is
// configured, periodically post-processes the latency-buffer elements that are older than
// (newest timestamp - delay).
// NOTE(review): the signature line is missing from this listing.
228template<class RDT, class RHT, class LBT, class RPT, class IDT>
229void
231{
232
233 TLOG_DEBUG(TLVL_WORK_STEPS) << "Consumer thread started...";
234 m_rawq_timeout_count = 0;
235 m_num_payloads = 0;
236 m_sum_payloads = 0;
237 m_stats_packet_count = 0;
238
239 // Variables for book-keeping of delayed post-processing
240 //timestamp_t oldest_ts=0;
241 timestamp_t newest_ts=0;
242 timestamp_t end_win_ts=0;
243 bool first_cycle = true;
244 auto last_post_proc_time = std::chrono::system_clock::now();
245 auto now = last_post_proc_time;
246 std::chrono::milliseconds milliseconds;
// Carries the timestamp up to which deferred post-processing has been done; it is used
// only as a search key for lower_bound.
247 RDT processed_element;
248
249 while (m_run_marker.load()) {
250 // Try to acquire data
251
252 auto opt_payload = m_raw_data_receiver->try_receive(m_raw_receiver_timeout_ms);
253
254 if (opt_payload) {
255
256 IDT& original = opt_payload.value();
257
// When the input type equals the stored type the item is processed directly; otherwise
// transform_payload produces a range of items, each processed in turn.
258 if constexpr (std::is_same_v<IDT, RDT>) {
259 process_item(original);
260 } else {
261 auto transformed = transform_payload(original);
262 for (auto& i : transformed) {
263 process_item(i);
264 }
265 }
266 } else {
// Nothing received within the timeout.
267 ++m_rawq_timeout_count;
268 // Protection against a zero sleep becoming a yield
269 if ( m_raw_receiver_sleep_us != std::chrono::microseconds::zero())
270 std::this_thread::sleep_for(m_raw_receiver_sleep_us);
271 }
272
273 // Add here a possible deferral of the post processing, to allow elements being reordered in the LB
274 // Basically, find data older than a certain timestamp and process all data since the last post-processed element up to that value
275 if (m_processing_delay_ticks !=0 && m_latency_buffer_impl->occupancy() > 0) {
276 now = std::chrono::system_clock::now();
277 milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(now - last_post_proc_time);
278
// The elapsed time is truncated to whole milliseconds, so `> 1` means the deferred pass
// runs at most once every 2 ms.
279 if (milliseconds.count() > 1) {
280 last_post_proc_time = now;
281 // Get the LB boundaries
282 auto tail = m_latency_buffer_impl->back();
283 newest_ts = tail->get_timestamp();
284
// On the first pass the processed-up-to marker is seeded with the oldest element's timestamp.
285 if (first_cycle) {
286 auto head = m_latency_buffer_impl->front();
287 processed_element.set_timestamp(head->get_timestamp());
288 first_cycle = false;
289 TLOG() << "***** First pass post processing *****" ;
290 }
291
// NOTE(review): if timestamp_t is unsigned and the newest timestamp is below the marker,
// the subtraction wraps to a large value and the branch is taken — confirm this cannot happen.
292 if (newest_ts - processed_element.get_timestamp() > m_processing_delay_ticks) {
// Post-process the half-open range [marker, newest - delay) and advance the marker
// to the end of that window.
293 end_win_ts = newest_ts - m_processing_delay_ticks;
294 auto start_iter=m_latency_buffer_impl->lower_bound(processed_element, false);
295 processed_element.set_timestamp(end_win_ts);
296 auto end_iter=m_latency_buffer_impl->lower_bound(processed_element, false);
297
298 for (auto it = start_iter; it!= end_iter; ++it) {
299 m_raw_processor_impl->postprocess_item(&(*it));
300 ++m_num_payloads;
301 ++m_sum_payloads;
302 ++m_stats_packet_count;
303 }
304 }
305 }
306 }
307 }
308 TLOG_DEBUG(TLVL_WORK_STEPS) << "Consumer thread joins... ";
309}
310
// Callback-mode entry point for one item: same pre-process / late-arrival check /
// latency-buffer write sequence as process_item, but post-processing and payload counting
// are always done immediately, regardless of m_processing_delay_ticks.
// NOTE(review): the signature line is missing from this listing; this is presumably
// consume_payload (the member bound as the consume callback), taking the item as `payload`.
311template<class RDT, class RHT, class LBT, class RPT, class IDT>
312void
314{
315 //m_rawq_timeout_count = 0;
316 //m_num_payloads = 0;
317 //m_sum_payloads = 0;
318 //m_stats_packet_count = 0;
319 m_raw_processor_impl->preprocess_item(&payload);
320 if (m_request_handler_supports_cutoff_timestamp) {
// NOTE(review): same possible unsigned wrap-around as in process_item.
321 int64_t diff1 = payload.get_timestamp() - m_request_handler_impl->get_cutoff_timestamp();
322 if (diff1 <= 0) {
323 //m_request_handler_impl->increment_tardy_tp_count();
// NOTE(review): 62500.0 presumably converts ticks to milliseconds — TODO confirm.
324 ers::warning(DataPacketArrivedTooLate(ERS_HERE, m_run_number, payload.get_timestamp(),
325 m_request_handler_impl->get_cutoff_timestamp(), diff1,
326 (static_cast<double>(diff1)/62500.0)));
327 }
328 }
// Unlike process_item, a failed write is also logged at debug level here.
329 if (!m_latency_buffer_impl->write(std::move(payload))) {
330 TLOG_DEBUG(TLVL_TAKE_NOTE) << "***ERROR: Latency buffer is full and data was overwritten!";
331 m_num_lb_insert_failures++;
332 }
// NOTE(review): after a failed write, back() may not refer to this payload.
333#warning RS FIXME: Post-processing delay feature is not implemented in callback consume!
334 m_raw_processor_impl->postprocess_item(m_latency_buffer_impl->back());
335 ++m_num_payloads;
336 ++m_sum_payloads;
337 ++m_stats_packet_count;
338}
339
// TimeSync thread body (run_timesync): roughly every 100 ms, builds a TimeSync message from
// the raw processor's last DAQ time and sends it, unless that time is zero or unchanged
// since the last message sent. When m_fake_trigger is set, a data request derived from the
// timesync is also issued to the request handler.
// NOTE(review): the signature line and several body lines are missing from this listing
// (the call wrapping TimeSyncTransmissionFailed, the declaration of `dr`, and the
// window_begin/window_end assignments); `us` is not declared in the visible lines.
340template<class RDT, class RHT, class LBT, class RPT, class IDT>
341void
343{
344 TLOG_DEBUG(TLVL_WORK_STEPS) << "TimeSync thread started...";
345 m_num_requests = 0;
346 m_sum_requests = 0;
347 uint64_t msg_seqno = 0;
348 timestamp_t prev_timestamp = 0;
349 auto once_per_run = true;
350 size_t zero_timestamp_count = 0;
351 size_t duplicate_timestamp_count = 0;
352 size_t total_timestamp_count = 0;
353 while (m_run_marker.load()) {
354 try {
355 auto timesyncmsg = dfmessages::TimeSync(m_raw_processor_impl->get_last_daq_time());
356 ++total_timestamp_count;
357 // daq_time is zero for the first received timesync, and may
358 // be the same as the previous daq_time if the data has
359 // stopped flowing. In both cases we don't send the TimeSync
360 if (timesyncmsg.daq_time != 0 && timesyncmsg.daq_time != prev_timestamp) {
361 prev_timestamp = timesyncmsg.daq_time;
362 timesyncmsg.run_number = m_run_number;
363 timesyncmsg.sequence_number = ++msg_seqno;
364 timesyncmsg.source_pid = m_pid_of_current_process;
365 TLOG_DEBUG(TLVL_TIME_SYNCS) << "New timesync: daq=" << timesyncmsg.daq_time
366 << " wall=" << timesyncmsg.system_time << " run=" << timesyncmsg.run_number
367 << " seqno=" << timesyncmsg.sequence_number << " pid=" << timesyncmsg.source_pid;
// A copy is sent (500 ms timeout) because the original message is still needed below
// for the fake-trigger request.
368 try {
369 dfmessages::TimeSync timesyncmsg_copy(timesyncmsg);
370 m_timesync_sender->send(std::move(timesyncmsg_copy), std::chrono::milliseconds(500));
371 } catch (ers::Issue& excpt) {
373 TimeSyncTransmissionFailed(ERS_HERE, m_sourceid, m_timesync_connection_name, excpt));
374 }
375
376 if (m_fake_trigger) {
// The trigger timestamp is the DAQ time minus 500 * us, clamped at zero.
// NOTE(review): `width` and `offset` are presumably used by the window lines missing
// from this listing — confirm.
378 ++m_current_fake_trigger_id;
379 dr.trigger_number = m_current_fake_trigger_id;
380 dr.trigger_timestamp = timesyncmsg.daq_time > 500 * us ? timesyncmsg.daq_time - 500 * us : 0;
381 auto width = 300000;
382 uint offset = 100;
385 dr.request_information.component = m_sourceid;
386 dr.data_destination = "data_fragments_q";
387 TLOG_DEBUG(TLVL_WORK_STEPS) << "Issuing fake trigger based on timesync. "
388 << " ts=" << dr.trigger_timestamp
389 << " window_begin=" << dr.request_information.window_begin
390 << " window_end=" << dr.request_information.window_end;
391 m_request_handler_impl->issue_request(dr);
392
393 ++m_num_requests;
394 ++m_sum_requests;
395 }
396 } else {
// NOTE(review): while prev_timestamp is still 0, a zero DAQ time increments both counters.
// The one-shot message below is logged on the first skipped timesync whatever the reason,
// although its text only mentions DAQ time 0.
397 if (timesyncmsg.daq_time == 0) {++zero_timestamp_count;}
398 if (timesyncmsg.daq_time == prev_timestamp) {++duplicate_timestamp_count;}
399 if (once_per_run) {
400 TLOG() << "Timesync with DAQ time 0 won't be sent out as it's an invalid sync.";
401 once_per_run = false;
402 }
403 }
404 } catch (const iomanager::TimeoutExpired& excpt) {
405 // ++m_timesyncqueue_timeout;
406 }
407 // Split up the 100ms sleep into 10 sleeps of 10ms, so we respond to "stop" quicker
408 for (size_t i=0; i<10; ++i) {
409 std::this_thread::sleep_for(std::chrono::milliseconds(10));
410 if (!m_run_marker.load()) {
411 break;
412 }
413 }
414 }
// NOTE(review): once_per_run is a local that is not read again in the visible code after
// this assignment, so it has no effect.
415 once_per_run = true;
416 TLOG_DEBUG(TLVL_WORK_STEPS) << "TimeSync thread joins... (timestamp count, zero/same/total = "
417 << zero_timestamp_count << "/" << duplicate_timestamp_count << "/"
418 << total_timestamp_count << ")";
419}
420
// Data-request callback (dispatch_requests): rejects requests addressed to a different
// SourceID, otherwise forwards the request to the request handler and counts it.
// NOTE(review): the signature line is missing from this listing; the request is referred to
// as `data_request`.
421template<class RDT, class RHT, class LBT, class RPT, class IDT>
422void
424{
// A request for another component is reported as an error and dropped without being counted.
425 if (data_request.request_information.component != m_sourceid) {
426 ers::error(RequestSourceIDMismatch(ERS_HERE, m_sourceid, data_request.request_information.component));
427 return;
428 }
429 TLOG_DEBUG(TLVL_QUEUE_POP) << "Received DataRequest"
430 << " for trig/seq_number " << data_request.trigger_number << "." << data_request.sequence_number
431 << ", runno " << data_request.run_number
432 << ", trig timestamp " << data_request.trigger_timestamp
433 << ", SourceID: " << data_request.request_information.component
434 << ", window begin/end " << data_request.request_information.window_begin
435 << "/" << data_request.request_information.window_end
436 << ", dest: " << data_request.data_destination;
// m_num_requests is the per-interval counter (reset when monitoring data is published);
// m_sum_requests is the running total.
437 m_request_handler_impl->issue_request(data_request);
438 ++m_num_requests;
439 ++m_sum_requests;
440}
441
442} // namespace datahandlinglibs
443} // namespace dunedaq
#define ERS_HERE
const dunedaq::appmodel::LatencyBuffer * get_latency_buffer() const
Get "latency_buffer" relationship value.
uint64_t get_post_processing_delay_ticks() const
Get "post_processing_delay_ticks" attribute value. Number of clock ticks by which post processing of i...
const dunedaq::appmodel::DataProcessor * get_data_processor() const
Get "data_processor" relationship value.
const dunedaq::appmodel::RequestHandler * get_request_handler() const
Get "request_handler" relationship value.
const dunedaq::appmodel::DataHandlerConf * get_module_configuration() const
Get "module_configuration" relationship value.
uint32_t get_source_id() const
Get "source_id" attribute value.
bool get_post_processing_enabled() const
Get "post_processing_enabled" attribute value.
const std::string & UID() const noexcept
const std::vector< const dunedaq::confmodel::Connection * > & get_inputs() const
Get "inputs" relationship value. List of connections to/from this module.
const std::vector< const dunedaq::confmodel::Connection * > & get_outputs() const
Get "outputs" relationship value. Output connections from this module.
void init(const appmodel::DataHandlerModule *modconf)
Forward calls from the appfwk.
void dispatch_requests(dfmessages::DataRequest &data_request)
void run_consume()
Function that will be run in its own thread to read the raw packets from the connection and add them ...
void run_timesync()
Function that will be run in its own thread and sends periodic timesync messages by pushing them to t...
static std::shared_ptr< DataMoveCallbackRegistry > get()
Base class for any user-defined issue.
Definition Issue.hpp:69
double offset
static int64_t now()
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
uint32_t run_number_t
Type used to represent run number.
Definition Types.hpp:20
Including Qt Headers.
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
ConfigurationError
Definition util.hpp:27
SourceID[" << sourceid << "] Command daqdataformats::SourceID Readout Initialization std::string initerror Configuration std::string conferror Configuration std::string conferror TimeSyncTransmissionFailed
static std::shared_ptr< iomanager::ReceiverConcept< Datatype > > get_iom_receiver(iomanager::ConnectionId const &id)
void warning(const Issue &issue)
Definition ers.hpp:115
void error(const Issue &issue)
Definition ers.hpp:81
timestamp_t window_end
End of the data collection window.
SourceID component
The Requested Component.
timestamp_t window_begin
Start of the data collection window.
This message represents a request for data sent to a single component of the DAQ.
sequence_number_t sequence_number
Sequence Number of the request.
trigger_number_t trigger_number
Trigger number the request corresponds to.
timestamp_t trigger_timestamp
Timestamp of trigger.
run_number_t run_number
The current run number.
A synthetic message used to ensure that all elements of a DAQ system are synchronized.
Definition TimeSync.hpp:25