DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
TPCEthFrameProcessor.hxx
Go to the documentation of this file.
2DUNE_DAQ_TYPESTRING(std::vector<dunedaq::trigger::TriggerPrimitiveTypeAdapter>, "TriggerPrimitiveVector")
3
4namespace dunedaq {
5namespace fdreadoutlibs {
6
7using datahandlinglibs::logging::TLVL_BOOKKEEPING;
8using datahandlinglibs::logging::TLVL_TAKE_NOTE;
9
10template <class ReadoutTypeAdapter>
11TPCEthFrameProcessor<ReadoutTypeAdapter>::TPCEthFrameProcessor(std::unique_ptr<datahandlinglibs::FrameErrorRegistry>& error_registry, bool processing_enabled)
12 : datahandlinglibs::TaskRawDataProcessorModel<ReadoutTypeAdapter>(error_registry, processing_enabled)
13{
14}
15
16template <class ReadoutTypeAdapter>
17void
18TPCEthFrameProcessor<ReadoutTypeAdapter>::start(const appfwk::DAQModule::CommandData_t& args)
19{
20 // Reset software TPG resources
21 if (this->m_post_processing_enabled) {
22 m_tps_suppressed_too_long = 0;
23 m_tps_send_failed = 0;
24 }
25
26 // Reset timestamp check
27 m_previous_ts = 0;
28 m_current_ts = 0;
29 m_first_ts_missmatch = true;
30 m_ts_problem_reported = false;
31 m_ts_error_state = false;
32 m_ts_error_ctr = 0;
33
34 m_first_seq_id_mismatch = true;
35 m_seq_id_problem_reported = false;
36 m_seq_id_error_state = false;
37 m_seq_id_error_ctr = 0;
38
39
40 // Reset stats
41 m_t0 = std::chrono::high_resolution_clock::now();
42 m_num_new_tps.exchange(0);
43 inherited::start(args);
44}
45
46template <class ReadoutTypeAdapter>
47void
48TPCEthFrameProcessor<ReadoutTypeAdapter>::stop(const appfwk::DAQModule::CommandData_t& args)
49{
50 inherited::stop(args);
51 if (this->m_post_processing_enabled) {
52 if (m_tpg_metric_collect_enabled) {
53 m_tp_generator->free_metric_collector();
54 }
55 // Clears the pipelines and resets with the given configs.
56 m_tp_generator->set_metric_collector_enable_state(m_tpg_metric_collect_enabled);
57 m_tp_generator->configure(m_tpg_configs, m_channel_plane_numbers, ReadoutTypeAdapter::samples_tick_difference);
58 }
59}
60
61template <class ReadoutTypeAdapter>
62void
64{
65 m_sourceid.id = conf->get_source_id();
66 m_sourceid.subsystem = ReadoutTypeAdapter::subsystem;
67 auto geo_id = conf->get_geo_id();
68 if (geo_id != nullptr) {
69 m_det_id = geo_id->get_detector_id();
70 m_crate_id = geo_id->get_crate_id();
71 m_slot_id = geo_id->get_slot_id();
72 m_stream_id = geo_id->get_stream_id();
73 }
75
76template <class ReadoutTypeAdapter>
77void
80 m_emulator_mode = conf->get_emulation_mode();
81 if (!m_emulator_mode) {
82 inherited::add_preprocess_task(std::bind(&TPCEthFrameProcessor<ReadoutTypeAdapter>::sequence_check, this, std::placeholders::_1));
83 }
84
85 inherited::add_preprocess_task(std::bind(&TPCEthFrameProcessor<ReadoutTypeAdapter>::timestamp_check, this, std::placeholders::_1));
86}
88template <class ReadoutTypeAdapter>
89void
92 const std::shared_ptr<detchannelmaps::TPCChannelMap> channel_map = dunedaq::detchannelmaps::make_tpc_map(proc_conf->get_channel_map());
93 const std::vector<unsigned int> channel_mask_vec = proc_conf->get_channel_mask();
94
95 for (int chan = 0; chan < 64; chan++) {
96 trgdataformats::channel_t off_channel = channel_map->get_offline_channel_from_det_crate_slot_stream_chan(m_det_id, m_crate_id, m_slot_id, m_stream_id, chan);
97 int16_t plane = channel_map->get_plane_from_offline_channel(off_channel);
98 m_channel_plane_numbers.push_back(std::make_pair(off_channel, plane));
99
100 // This processor only needs to handle some (maybe 0) of the masked channels.
101 // Only get those relevant channels for the later check.
102 // Only get the planes for the channels that are not masked.
103 if (std::find(channel_mask_vec.begin(), channel_mask_vec.end(), off_channel) != channel_mask_vec.end()) {
104 m_channel_mask_set.insert(off_channel);
105 } else {
106 m_channel_plane_map[off_channel] = plane;
107 m_plane_numbers_set.insert(plane);
108 }
109 }
110}
111
112template <class ReadoutTypeAdapter>
113void
115{
116 // Setting TP sinks.
117 // Configurations currently have the sinks iterate in order, but there may be more sinks than planes.
118 int plane_number = 0;
119 for (auto output : conf->get_outputs()) {
120 try {
121 if (output->get_data_type() == "TriggerPrimitiveVector") {
122 if (m_plane_numbers_set.contains(plane_number)) {
123 m_plane_to_tp_sink_map[plane_number] = get_iom_sender<std::vector<trigger::TriggerPrimitiveTypeAdapter>>(output->UID());
124 }
125 plane_number++;
126 }
127 } catch (const ers::Issue& excpt) {
128 ers::error(datahandlinglibs::ResourceQueueError(ERS_HERE, "tp", "DefaultRequestHandlerModel", excpt));
129 }
130 }
132 // We do need a coverage for all planes.
133 if (m_plane_numbers_set.size() > m_plane_to_tp_sink_map.size()) {
134 ers::error(DetectorPlaneToTPSinkMismatch(ERS_HERE, m_plane_numbers_set.size(), m_plane_to_tp_sink_map.size()));
135 }
136
137 m_tp_generator = std::make_unique<tpglibs::TPGenerator>();
138
139 // Set the minimum TP samples over threshold.
140 auto conf_sot_minima = proc_conf->get_sot_minima();
141 std::vector<uint16_t> sot_minima{conf_sot_minima->get_sot_minimum_plane0(),
142 conf_sot_minima->get_sot_minimum_plane1(),
143 conf_sot_minima->get_sot_minimum_plane2()};
144 m_tp_generator->set_sot_minima(sot_minima);
145
146 std::vector<const appmodel::ProcessingStep*> processing_steps = proc_conf->get_processing_steps();
147 for (auto step : processing_steps) {
148 m_tpg_configs.push_back(std::make_pair(step->class_name(), step->to_json(false).back()));
149 }
150
151 // Let the TPG generator configure
152 m_tp_generator->configure(m_tpg_configs, m_channel_plane_numbers, ReadoutTypeAdapter::samples_tick_difference);
153
154 // Set the limits on when to send TPs and check that we can actually send on these limits.
155 m_frame_count_limit = proc_conf->get_frame_count_limit();
156 m_tp_count_limit = proc_conf->get_tp_count_limit();
157 m_frame_limit_enabled = m_frame_count_limit > 0;
158 m_tp_limit_enabled = m_tp_count_limit > 0;
159
160 if (!m_frame_limit_enabled && !m_tp_limit_enabled) {
162 }
163
164 // After it sees the configs, it will set the metric collector enable state
165 m_tpg_metric_collect_enabled = m_tp_generator->get_metric_collector_enable_state();
166 m_metric_collect_opmon_period = proc_conf->get_metric_collect_opmon_rate();
167
168 inherited::add_postprocess_task(std::bind(&TPCEthFrameProcessor<ReadoutTypeAdapter>::find_tps, this, std::placeholders::_1));
169}
170
171template <class ReadoutTypeAdapter>
172void
174{
176 if (dp == nullptr) {
177 return;
178 }
179
181 if (proc_conf == nullptr) {
182 return;
183 }
184
185 // Need TPCRawDataProcessor configurations to configure the following.
186 configure_channel_plane_numbers(proc_conf);
187 configure_find_tps(conf, proc_conf);
188}
189
190template <class ReadoutTypeAdapter>
191void
193{
194 configure_source_and_geo_ids(conf);
195
196 configure_preprocessing(conf);
197
198 if (this->m_post_processing_enabled) {
199 configure_postprocessing(conf);
200 }
201
202 inherited::conf(conf);
203}
204
205template <class ReadoutTypeAdapter>
206void
208{
209 m_sourceid = daqdataformats::SourceID();
210
211 m_det_id = 0;
212 m_crate_id = 0;
213 m_slot_id = 0;
214 m_stream_id = 0;
215}
216
217template <class ReadoutTypeAdapter>
218void
220{
221 m_emulator_mode = false;
222 m_first_frame = true;
223
224 // Timestamps.
225 m_previous_ts = 0;
226 m_current_ts = 0;
227
228 m_pattern_generator_previous_ts = 0;
229 m_pattern_generator_current_ts = 0;
230
231 m_first_ts_missmatch = true;
232 m_ts_problem_reported = false;
233 m_ts_error_state = false;
234 m_ts_error_ctr = 0;
235
236 // Sequence ID.
237 m_previous_seq_id = 0;
238 m_current_seq_id = 0;
239
240 m_first_seq_id_mismatch = true;
241 m_seq_id_problem_reported = false;
242 m_seq_id_error_state = false;
243 m_seq_id_error_ctr = 0;
244 m_seq_id_min_jump = 0;
245 m_seq_id_max_jump = 0;
246
247 // The preprocessing tasks scrap is handled by inherited::scrap().
248}
249
250template <class ReadoutTypeAdapter>
251void
253{
254 // Channel-plane variables
255 m_channel_mask_set.clear();
256 m_plane_numbers_set.clear();
257 m_channel_plane_numbers.clear();
258 m_channel_plane_map.clear();
259
260 // TP variables
261 m_tp_generator.reset(new tpglibs::TPGenerator());
262 m_tpg_configs.clear();
263 m_plane_to_tpa_vector_map.clear();
264 m_plane_to_tp_sink_map.clear();
265
266 m_frame_limit_enabled = false;
267 m_tp_limit_enabled = false;
268 m_current_tp_count = 0;
269 m_tp_count_limit = 0;
270 m_frame_count_at_last_send = 0;
271
272 // OpMon variables
273 m_tpg_metric_collect_enabled = false;
274 m_metric_collect_opmon_period = 128;
275 m_tp_channel_rate_map.clear();
276 m_num_new_tps.exchange(0);
277 m_tps_suppressed_too_long.exchange(0);
278 m_tps_send_failed.exchange(0);
279 m_frame_counter.exchange(0);
280 m_t0 = std::chrono::high_resolution_clock::now();
281}
282
283template <class ReadoutTypeAdapter>
284void
285TPCEthFrameProcessor<ReadoutTypeAdapter>::scrap(const appfwk::DAQModule::CommandData_t& cfg)
286{
287 scrap_source_and_geo_ids();
288 scrap_preprocessing();
289
290 if (this->m_post_processing_enabled) {
291 scrap_postprocessing();
292 }
293
294 inherited::scrap(cfg);
295}
296
297
298template <class ReadoutTypeAdapter>
299void
301{
303
304 info.set_num_seq_id_errors(m_seq_id_error_ctr.load());
305 info.set_min_seq_id_jump(m_seq_id_min_jump.exchange(0));
306 info.set_max_seq_id_jump(m_seq_id_max_jump.exchange(0));
307
308 info.set_num_ts_errors(m_ts_error_ctr.load());
309
310 this->publish(std::move(info));
311
312 this->m_error_registry->log_registered_errors();
313
314 if (this->m_post_processing_enabled) {
315 auto now = std::chrono::high_resolution_clock::now();
316 int num_new_tps = m_num_new_tps.exchange(0);
317 int num_new_tps_suppressed_too_long = m_tps_suppressed_too_long.exchange(0);
318 int num_new_tps_send_failed = m_tps_send_failed.exchange(0);
319 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
320 TLOG_DEBUG(TLVL_BOOKKEEPING) << "TP rate: " << std::to_string(num_new_tps / seconds / 1000.) << " [kHz]";
321 TLOG_DEBUG(TLVL_BOOKKEEPING) << "Total new TPs: " << num_new_tps;
322
324 tp_info.set_rate_tp_hits(num_new_tps / seconds / 1000.);
325
326 tp_info.set_num_tps_sent(num_new_tps);
327 tp_info.set_num_tps_suppressed_too_long(num_new_tps_suppressed_too_long);
328 tp_info.set_num_tps_send_failed(num_new_tps_send_failed);
329
330 this->publish(std::move(tp_info));
331 // Find the channels with the top TP rates
332 // Create a vector of pairs to store the map elements
333 std::vector<std::pair<uint, int>> channel_tp_rate_vec(m_tp_channel_rate_map.begin(), m_tp_channel_rate_map.end());
334 // Sort the vector in descending order of the value of the pairs
335 sort(channel_tp_rate_vec.begin(), channel_tp_rate_vec.end(), [](std::pair<uint, int>& a, std::pair<uint, int>& b) { return a.second > b.second; });
336 // Add the metrics to opmon
337 // For convenience we are selecting only the top 10 elements
338 if (channel_tp_rate_vec.size() != 0) {
339 int top_highest_values = 10;
340 if (channel_tp_rate_vec.size() < 10) {
341 top_highest_values = channel_tp_rate_vec.size();
342 }
343 //datahandlinglibs::opmon::TPChannelsInfo channels_info;
344 for (int i = 0; i < top_highest_values; i++) {
346 tpc_info.set_number_of_tps(channel_tp_rate_vec[i].second);
347 tpc_info.set_channel_id(channel_tp_rate_vec[i].first);
348 this->publish(std::move(tpc_info), {{"channel", std::to_string(channel_tp_rate_vec[i].first)}});
349 }
350 }
351
352 // Reset the counter in the channel rate map
353 for (auto& el : m_tp_channel_rate_map) {
354 el.second = 0;
355 }
356 m_t0 = now;
357
358 if (m_tpg_metric_collect_enabled && m_tp_generator) {
359 publish_processor_metric_to_opmon();
360 publish_processor_metric_to_opmon_with_aggregation();
361 }
362 }
363
364 inherited::generate_opmon_data();
365 }
366
367template <class ReadoutTypeAdapter>
368void
370 auto metrics = m_tp_generator->get_processor_metrics();
371 for (const auto& [channel, vec] : metrics) {
373 for (const auto& [name, val] : vec) {
374 if (name == "m_pedestal") {
375 tpg_proc_info.set_pedestal(val);
376 } else if (name == "m_accum") {
377 tpg_proc_info.set_accum(val);
378 }
379 }
380 this->publish(std::move(tpg_proc_info), {{"channel", std::to_string(channel)}});
381 }
382}
383
384template <class ReadoutTypeAdapter>
385std::map<int16_t, std::map<std::string, std::tuple<float, int16_t, int16_t, float, dunedaq::trgdataformats::channel_t, dunedaq::trgdataformats::channel_t>>>
386TPCEthFrameProcessor<ReadoutTypeAdapter>::calculate_all_metric_summaries_across_planes(const std::unordered_map<dunedaq::trgdataformats::channel_t, std::vector<std::pair<std::string, int16_t>>>& metrics) {
387 // Structure to hold all statistics: plane -> metric -> (mean, min, max, stddev, min_channel_id, max_channel_id)
388 std::map<int16_t, std::map<std::string, std::tuple<float, int16_t, int16_t, float, dunedaq::trgdataformats::channel_t, dunedaq::trgdataformats::channel_t>>> all_stats;
389
390 // Structure to accumulate statistics: plane -> metric -> (count, mean, M2, min, max, min_channel_id, max_channel_id)
391 std::map<int16_t, std::map<std::string, std::tuple<size_t, double, double, int16_t, int16_t, dunedaq::trgdataformats::channel_t, dunedaq::trgdataformats::channel_t>>> accumulators;
392
393 // Single pass through all metrics to collect data using Welford's online algorithm for variance
394 for (const auto& [channel, vec] : metrics) {
395 if (m_channel_plane_map.empty()) continue;
396
397 int16_t plane = m_channel_plane_map[channel];
398
399 for (const auto& [name, val] : vec) {
400 auto& [count, mean, M2, min, max, min_channel_id, max_channel_id] = accumulators[plane][name];
401
402 count++;
403
404 if (count == 1 || val < min) {
405 min = val;
406 min_channel_id = channel;
407 }
408 if (count == 1 || val > max) {
409 max = val;
410 max_channel_id = channel;
411 }
412
413 // Welford's online algorithm for variance calculation
414 if (count == 1) {
415 // First value: initialize mean and M2
416 mean = val;
417 M2 = 0.0;
418 } else {
419 // Update mean and M2 using Welford's algorithm
420 double delta = val - mean;
421 mean += delta / count;
422 double delta2 = val - mean;
423 M2 += delta * delta2;
424 }
425 }
426 }
427
428 // Calculate final statistics from accumulated data
429 for (const auto& [plane, metric_map] : accumulators) {
430 for (const auto& [metric_name, acc_data] : metric_map) {
431 const auto& [count, mean, M2, min, max, min_channel_id, max_channel_id] = acc_data;
432
433 if (count == 0) continue;
434
435 float stddev = 0.0f;
436
437 // Calculate standard deviation using accumulated M2
438 if (count > 1) {
439 stddev = std::sqrt(M2 / (count - 1));
440 }
441
442 all_stats[plane][metric_name] = std::make_tuple(static_cast<float>(mean), min, max, stddev, min_channel_id, max_channel_id);
443 }
444 }
445
446 return all_stats;
447}
448
449template <class ReadoutTypeAdapter>
450void
452 auto metrics = m_tp_generator->get_processor_metrics();
453
454 // Use optimized single-pass calculation for all metrics across all planes
455 auto all_stats = calculate_all_metric_summaries_across_planes(metrics);
456
457 // Publish all calculated statistics
458 for (const auto& [plane, metric_map] : all_stats) {
459 for (const auto& [metric_name, stats] : metric_map) {
460 const auto& [mean, min, max, stddev, min_channel_id, max_channel_id] = stats;
461
463 info.set_average(mean);
464 info.set_max(max);
465 info.set_min(min);
466 info.set_standard_dev(stddev);
467 info.set_max_channel_id(max_channel_id);
468 info.set_min_channel_id(min_channel_id);
469 this->publish(std::move(info), {{"plane", std::to_string(plane)}, {"metric", metric_name}});
470 }
471 }
472}
473
474
478template <class ReadoutTypeAdapter>
479void
481{
482 // Acquire timestamp
483 auto wfptr = reinterpret_cast<tpcframeptr>(fp); // NOLINT
484 m_current_seq_id = wfptr->daq_header.seq_id;
485
486 // Check that the system is properly configured from the first frame.
487 if (m_first_frame) [[unlikely]] {
488 if (wfptr->daq_header.crate_id != m_crate_id || wfptr->daq_header.slot_id != m_slot_id || wfptr->daq_header.stream_id != m_stream_id) {
489 ers::error(LinkMisconfiguration(ERS_HERE, wfptr->daq_header.crate_id, wfptr->daq_header.slot_id, wfptr->daq_header.stream_id, m_crate_id, m_slot_id, m_stream_id));
490 }
491
492 m_first_frame = false;
493 }
494
495 // Check sequence id
496 // Calculate the next sequence id (12 bits)
497 uint16_t expected_seq_id = (m_previous_seq_id + fp->get_num_frames()) & 0xfff;
498 int16_t delta_seq_id = m_current_seq_id-expected_seq_id;
499 if ( delta_seq_id > 0x800) {
500 delta_seq_id -= 0x1000;
501 } else if ( delta_seq_id < -0x7ff) {
502 delta_seq_id += 0x1000;
503 }
504
505 if (delta_seq_id == 0) {
506 m_seq_id_error_state = false;
507 } else {
508 // uint16_t delta_seq_id = (m_current_seq_id-expected_seq_id);
509 ++m_seq_id_error_ctr;
510 m_seq_id_max_jump = std::max(delta_seq_id, m_seq_id_max_jump.load());
511 m_seq_id_min_jump = std::min(delta_seq_id, m_seq_id_min_jump.load());
512
513 if (m_first_seq_id_mismatch) { // log once
514 TLOG_DEBUG(TLVL_BOOKKEEPING) << "First sequence id MISMATCH! -> | previous: " << std::to_string(m_previous_seq_id) << " current: " + std::to_string(m_current_seq_id);
515 m_first_seq_id_mismatch = false;
516 } else {
517 if (!m_seq_id_error_state) {
518 this->m_error_registry->add_error("Sequence ID jump", datahandlinglibs::FrameErrorRegistry::ErrorInterval(expected_seq_id, m_current_seq_id));
519 m_seq_id_error_state = true;
520 }
521 }
522 }
523
524 if (m_seq_id_error_ctr > 1000) {
525 if (!m_seq_id_problem_reported) {
526 TLOG() << "*** Data Integrity ERROR *** Sequence ID continuity is completely broken! "
527 << "Something is wrong with the FE source or with the configuration!";
528 m_seq_id_problem_reported = true;
529 }
530 }
531
532 m_previous_seq_id = m_current_seq_id;
533
534}
535
539template <class ReadoutTypeAdapter>
540void
542{
543
544 uint16_t tpceth_tick_difference = ReadoutTypeAdapter::expected_tick_difference;
545 uint16_t tpceth_frame_tick_difference = tpceth_tick_difference * fp->get_num_frames();
546
547 auto wfptr = reinterpret_cast<tpcframeptr>(fp); // NOLINT
548 m_current_ts = wfptr->get_timestamp();
549
550 // Check timestamp
551 if (m_previous_ts > 0 &&
552 m_current_ts - m_previous_ts != tpceth_frame_tick_difference) [[unlikely]] {
553 ++m_ts_error_ctr;
554 if (m_first_ts_missmatch) { // log once
555 TLOG_DEBUG(TLVL_BOOKKEEPING) << "First timestamp MISMATCH! -> | previous: " << std::to_string(m_previous_ts) << " current: " + std::to_string(m_current_ts);
556 m_first_ts_missmatch = false;
557 } else {
558 if (!m_ts_error_state) {
559 this->m_error_registry->add_error("Timestamp jump", datahandlinglibs::FrameErrorRegistry::ErrorInterval(m_previous_ts + tpceth_frame_tick_difference, m_current_ts));
560 m_ts_error_state = true;
561 }
562 }
563 } else {
564 m_ts_error_state = false;
565 }
566
567 if (m_ts_error_ctr > 1000) {
568 if (!m_ts_problem_reported) {
569 TLOG() << "*** Data Integrity ERROR *** Timestamp continuity is completely broken! "
570 << "Something is wrong with the FE source or with the configuration!";
571 m_ts_problem_reported = true;
572 }
573 }
574
575 m_previous_ts = m_current_ts;
576 this->m_last_processed_daq_ts = m_current_ts;
577}
578
582template <class ReadoutTypeAdapter>
583void
585{
586 if (!fp)
587 return;
588 auto wfptr = reinterpret_cast<tpcframeptr>((uint8_t*)fp); // NOLINT
589
590 std::vector<trgdataformats::TriggerPrimitive> tps = (*m_tp_generator)(wfptr);
591
592 uint64_t current_frame_count = m_frame_counter.fetch_add(1, std::memory_order_relaxed) + 1;
593 if (m_tpg_metric_collect_enabled && current_frame_count % m_metric_collect_opmon_period == 0) {
594 m_tp_generator->signal_metric_collection();
595 }
596
597 for (const auto& tp : tps) {
598 // If this TP is on a masked channel, skip it.
599 if (std::binary_search(m_channel_mask_set.begin(), m_channel_mask_set.end(), uint32_t(tp.channel)))
600 continue;
601 // Need to move into a type adapter.
603 tpa.tp = tp;
604
605 tpa.tp.detid = m_det_id; // Last missing piece.
606 m_plane_to_tpa_vector_map[m_channel_plane_map[uint32_t(tp.channel)]].push_back(tpa);
607 m_tp_channel_rate_map[uint32_t(tp.channel)]++;
608 m_current_tp_count++;
609 }
610
611 const bool frame_limit_reached = m_frame_limit_enabled && (current_frame_count - m_frame_count_at_last_send >= m_frame_count_limit);
612 const bool tp_limit_reached = m_tp_limit_enabled && (m_current_tp_count >= m_tp_count_limit);
613
614 if (frame_limit_reached || tp_limit_reached) [[unlikely]] {
615 m_current_tp_count = 0;
616 m_frame_count_at_last_send = current_frame_count;
617 for (auto& [plane_num, tpa_vector] : m_plane_to_tpa_vector_map) {
618 int num_new_tps = tpa_vector.size();
619 if (num_new_tps == 0) {
620 continue;
621 }
622 const auto ts_begin = tpa_vector.front().tp.time_start;
623 const auto channel_begin = tpa_vector.front().tp.channel;
624 const auto ts_end = tpa_vector.back().tp.time_start;
625 const auto channel_end = tpa_vector.back().tp.channel;
626 if (!m_plane_to_tp_sink_map[plane_num]->try_send(std::move(tpa_vector), iomanager::Sender::s_no_block)) {
627 ers::warning(FailedToSendTPVector(ERS_HERE, ts_begin, channel_begin, ts_end, channel_end));
628 m_tps_send_failed++;
629 } else {
630 m_num_new_tps += num_new_tps;
631 }
632 }
633 }
634 return;
635}
636
637} // namespace fdreadoutlibs
638} // namespace dunedaq
#define ERS_HERE
#define DUNE_DAQ_TYPESTRING(Type, typestring)
Declare the datatype_to_string method for the given type.
const dunedaq::appmodel::DataProcessor * get_data_processor() const
Get "data_processor" relationship value.
const dunedaq::appmodel::DataHandlerConf * get_module_configuration() const
Get "module_configuration" relationship value.
bool get_emulation_mode() const
Get "emulation_mode" attribute value.
uint32_t get_source_id() const
Get "source_id" attribute value.
const dunedaq::confmodel::GeoId * get_geo_id() const
Get "geo_id" relationship value.
const std::vector< uint32_t > & get_channel_mask() const
Get "channel_mask" attribute value. List of channels to be masked from TP generation.
const std::string & get_channel_map() const
Get "channel_map" attribute value.
uint32_t get_frame_count_limit() const
Get "frame_count_limit" attribute value. When this number of frames is reached the TPs are sent to th...
const dunedaq::appmodel::SamplesOverThresholdMinima * get_sot_minima() const
Get "sot_minima" relationship value. TP samples over threshold minimum requirement by plane.
const std::vector< const dunedaq::appmodel::ProcessingStep * > & get_processing_steps() const
Get "processing_steps" relationship value.
uint32_t get_metric_collect_opmon_rate() const
Get "metric_collect_opmon_rate" attribute value. The rate at which processor metric is polled from pr...
uint32_t get_tp_count_limit() const
Get "tp_count_limit" attribute value. When this number of TPs is reached, the TPs are sent to the sin...
const TARGET * cast() const noexcept
Casts object to different class.
const std::vector< const dunedaq::confmodel::Connection * > & get_outputs() const
Get "outputs" relationship value. Output connections from this module.
void configure_postprocessing(const appmodel::DataHandlerModule *conf)
void configure_channel_plane_numbers(const appmodel::TPCRawDataProcessor *proc_conf)
void stop(const appfwk::DAQModule::CommandData_t &args) override
Stop operation.
void configure_find_tps(const appmodel::DataHandlerModule *conf, const appmodel::TPCRawDataProcessor *proc_conf)
void scrap(const appfwk::DAQModule::CommandData_t &cfg) override
Unconfigure.
void configure_source_and_geo_ids(const appmodel::DataHandlerModule *conf)
std::map< int16_t, std::map< std::string, std::tuple< float, int16_t, int16_t, float, dunedaq::trgdataformats::channel_t, dunedaq::trgdataformats::channel_t > > > calculate_all_metric_summaries_across_planes(const std::unordered_map< dunedaq::trgdataformats::channel_t, std::vector< std::pair< std::string, int16_t > > > &metrics)
void start(const appfwk::DAQModule::CommandData_t &args) override
Start operation.
void configure_preprocessing(const appmodel::DataHandlerModule *conf)
void conf(const appmodel::DataHandlerModule *conf) override
Set the emulator mode. If active, timestamps of processed packets are overwritten with new ones.
static constexpr timeout_t s_no_block
Definition Sender.hpp:26
Base class for any user define issue.
Definition Issue.hpp:69
TPG driving class.
static int64_t now()
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
The DUNE-DAQ namespace.
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
Both frame_count_limit and tp_count_limit were set FailedToSendTPVector
void warning(const Issue &issue)
Definition ers.hpp:115
void error(const Issue &issue)
Definition ers.hpp:81
SourceID is a generalized representation of the source of a piece of data in the DAQ....
Definition SourceID.hpp:32