DUNE-DAQ
DUNE Trigger and Data Acquisition software
TCProcessor.cpp
#include "trigger/TCProcessor.hpp" // NOLINT(build/include)

#include "iomanager/Sender.hpp"
#include "logging/Logging.hpp"

#include "trigger/TCWrapper.hpp"

// THIS SHOULDN'T BE HERE!!!!! But it is necessary.....

namespace dunedaq {
namespace trigger {

TCProcessor::TCProcessor(std::unique_ptr<datahandlinglibs::FrameErrorRegistry>& error_registry, bool post_processing_enabled)
  : datahandlinglibs::TaskRawDataProcessorModel<TCWrapper>(error_registry, post_processing_enabled)
{
}

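// Run-control lifecycle: start() resets the opmon counters and marks the processor
// as running; stop() flips the flag, wakes the condition variable so the TD-sending
// thread can finish, and drops any TDs still pending at the end of the run.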
void
TCProcessor::start(const nlohmann::json& args)
{
  TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Entering start() method";

  m_running_flag.store(true);
  // NB: thread-launch line reconstructed; the target method name is an assumption
  m_send_trigger_decisions_thread = std::thread(&TCProcessor::send_trigger_decisions, this);
  pthread_setname_np(m_send_trigger_decisions_thread.native_handle(), "mlt-dec"); // TODO: originally mlt-trig-dec

  // Reset stats
  m_tds_created_count.store(0);
  m_tds_sent_count.store(0);
  m_tds_dropped_count.store(0);
  m_tds_failed_bitword_count.store(0);
  m_tds_cleared_count.store(0);
  // per TC
  m_tc_received_count.store(0);
  m_tds_created_tc_count.store(0);
  m_tds_sent_tc_count.store(0);
  m_tds_dropped_tc_count.store(0);
  m_tds_failed_bitword_tc_count.store(0);
  m_tds_cleared_tc_count.store(0);
  m_tc_ignored_count.store(0);
  inherited::start(args);

  TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Exiting start() method";
}

void
TCProcessor::stop(const nlohmann::json& args)
{
  TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Entering stop() method";

  inherited::stop(args);
  m_running_flag.store(false);

  // Make sure condition_variable knows we flipped running flag
  {
    std::lock_guard<std::mutex> lock(m_td_vector_mutex);
    m_cv.notify_all();
  }

  // Wait for the TD-sending thread to stop
  m_send_trigger_decisions_thread.join();

  // Drop all TDs in vectors at run stage change. Have to do this
  // after joining m_send_trigger_decisions_thread so we don't
  // concurrently access the vectors
  clear_td_vectors(); // helper name assumed; see its definition below

  print_opmon_stats(); // call site assumed; prints the counter summary defined at the end of this file

  TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Exiting stop() method";
}

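// conf() reads the TriggerDataHandlerModule configuration: the TriggerDecision output
// connection, the mandatory/enabled source IDs used for readout requests, and the
// TCDataProcessor options (TC merging, pileup-ignore, buffer timeout, trigger bitwords,
// ROI groups, per-TC-type readout windows and the list of TC types to ignore).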
void
TCProcessor::conf(const appmodel::DataHandlerModule* cfg)
{
  TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Entering conf() method";

  auto mtrg = cfg->cast<appmodel::TriggerDataHandlerModule>();
  if (mtrg == nullptr) {
    throw(InvalidConfiguration(ERS_HERE, "Provided null TriggerDataHandlerModule configuration!"));
  }
  for (auto output : mtrg->get_outputs()) {
    try {
      if (output->get_data_type() == "TriggerDecision") {
        // Sender lookup reconstructed: obtain the TriggerDecision sink for this output connection
        m_td_sink = get_iom_sender<dfmessages::TriggerDecision>(output->UID());
      }
    } catch (const ers::Issue& excpt) {
      ers::error(datahandlinglibs::ResourceQueueError(ERS_HERE, "td", "DefaultRequestHandlerModel", excpt));
    }
  }

  auto dp = mtrg->get_module_configuration()->get_data_processor();
  auto proc_conf = dp->cast<appmodel::TCDataProcessor>();

  // Add all Source IDs to mandatory links for now...
  // NB: the subsystem accessor in the SourceID construction is assumed
  for (auto const& link : mtrg->get_mandatory_source_ids()) {
    m_mandatory_links.push_back(dfmessages::SourceID{
      daqdataformats::SourceID::string_to_subsystem(link->get_subsystem()), link->get_sid() });
  }
  for (auto const& link : mtrg->get_enabled_source_ids()) {
    m_mandatory_links.push_back(dfmessages::SourceID{
      daqdataformats::SourceID::string_to_subsystem(link->get_subsystem()), link->get_sid() });
  }

  // TODO: Group links!
  //m_group_links_data = conf->get_groups_links();
  TLOG_DEBUG(3) << "Total group links: " << m_total_group_links;

  m_tc_merging = proc_conf->get_merge_overlapping_tcs();
  m_ignore_tc_pileup = proc_conf->get_ignore_overlapping_tcs();
  m_buffer_timeout = proc_conf->get_buffer_timeout();
  m_send_timed_out_tds = (m_ignore_tc_pileup) ? false : proc_conf->get_td_out_of_timeout();
  m_td_readout_limit = proc_conf->get_td_readout_limit();
  m_ignored_tc_types = proc_conf->get_ignore_tc();
  m_ignoring_tc_types = !m_ignored_tc_types.empty(); // flag assumed to be derived from the configured ignore list

  // Trigger bitwords
  std::vector<const appmodel::TriggerBitword*> bitwords = proc_conf->get_trigger_bitwords();
  m_use_bitwords = !bitwords.empty();
  if (m_use_bitwords) {
    set_trigger_bitwords(bitwords);
    print_trigger_bitwords(); // helper name assumed; see its definition below
  }
  TLOG_DEBUG(3) << "Use bitwords: " << m_use_bitwords;
  TLOG_DEBUG(3) << "Allow merging: " << m_tc_merging;
  TLOG_DEBUG(3) << "Ignore pileup: " << m_ignore_tc_pileup;
  TLOG_DEBUG(3) << "Buffer timeout: " << m_buffer_timeout;
  TLOG_DEBUG(3) << "Should send timed out TDs: " << m_send_timed_out_tds;
  TLOG_DEBUG(3) << "TD readout limit: " << m_td_readout_limit;

  // ROI map
  m_roi_conf_data = proc_conf->get_roi_group_conf();
  m_use_roi_readout = !m_roi_conf_data.empty(); // flag assumed, mirroring the bitword/readout-map pattern
  if (m_use_roi_readout) {
    parse_roi_conf(m_roi_conf_data);
    print_roi_conf(m_roi_conf);
  }
  TLOG_DEBUG(3) << "Use ROI readout?: " << m_use_roi_readout;

  // Custom readout map
  m_readout_window_map_data = proc_conf->get_tc_readout_map();
  m_use_readout_map = !m_readout_window_map_data.empty(); // flag assumed, as above
  if (m_use_readout_map) {
    parse_readout_map(m_readout_window_map_data);
    print_readout_map(m_readout_window_map);
  }
  TLOG_DEBUG(3) << "Use readout map: " << m_use_readout_map;

  // Ignoring TC types
  TLOG_DEBUG(3) << "Ignoring TC types: " << m_ignoring_tc_types;
  if (m_ignoring_tc_types) {
    TLOG_DEBUG(3) << "TC types to ignore: ";
    for (std::vector<unsigned int>::iterator it = m_ignored_tc_types.begin(); it != m_ignored_tc_types.end();) {
      TLOG_DEBUG(3) << *it;
      ++it;
    }
  }
  m_latency_monitoring.store( dp->get_latency_monitoring() );
  inherited::add_postprocess_task(std::bind(&TCProcessor::make_td, this, std::placeholders::_1));

  inherited::conf(mtrg);

  TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Exiting conf() method";
}

void
TCProcessor::scrap(const nlohmann::json& args)
{
  TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Entering scrap() method";

  m_mandatory_links.clear();
  m_group_links.clear();
  m_roi_conf.clear();
  m_roi_conf_data.clear();
  m_roi_conf_ids.clear();
  m_roi_conf_probs.clear();
  m_roi_conf_probs_c.clear();
  m_pending_tds.clear();
  m_readout_window_map.clear();
  m_ignored_tc_types.clear();

  m_td_sink.reset();

  m_group_links_data.clear();

  inherited::scrap(args);

  TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Exiting scrap() method";
}

void
TCProcessor::generate_opmon_data()
{
  opmon::TCProcessorInfo info; // opmon message type name assumed

  info.set_tds_created_count( m_tds_created_count.load() );
  info.set_tds_sent_count( m_tds_sent_count.load() );
  info.set_tds_dropped_count( m_tds_dropped_count.load() );
  info.set_tds_failed_bitword_count( m_tds_failed_bitword_count.load() );
  info.set_tds_cleared_count( m_tds_cleared_count.load() );
  info.set_tc_received_count( m_tc_received_count.load() );
  info.set_tc_ignored_count( m_tc_ignored_count.load() );
  info.set_tds_created_tc_count( m_tds_created_tc_count.load() );
  info.set_tds_sent_tc_count( m_tds_sent_tc_count.load() );
  info.set_tds_dropped_tc_count( m_tds_dropped_tc_count.load() );
  info.set_tds_failed_bitword_tc_count( m_tds_failed_bitword_tc_count.load() );
  info.set_tds_cleared_tc_count( m_tds_cleared_tc_count.load() );

  this->publish(std::move(info));

  if ( m_latency_monitoring.load() && m_running_flag.load() ) {
    opmon::TriggerLatency lat_info;

    lat_info.set_latency_in( m_latency_instance.get_latency_in() );   // setter names assumed
    lat_info.set_latency_out( m_latency_instance.get_latency_out() );

    this->publish(std::move(lat_info));
  }
}

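// make_td() is the post-processing hook registered in conf(): each TC that survives the
// type-ignore filter is folded into the pending-TD vector under m_td_vector_mutex and the
// TD-sending thread is notified.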
void
TCProcessor::make_td(const TCWrapper* tcw)
{

  auto tc = tcw->candidate;
  if (m_latency_monitoring.load()) m_latency_instance.update_latency_in( tc.time_start );
  m_tc_received_count++;

  if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ) {
    TLOG_DEBUG(3) << "Got TC of type " << static_cast<int>(tc.type) << ", timestamp " << tc.time_candidate
                  << ", start/end " << tc.time_start << "/" << tc.time_end << ", readout start/end "
                  << tc.time_candidate - m_readout_window_map[tc.type].first << "/"
                  << tc.time_candidate + m_readout_window_map[tc.type].second;
  } else {
    TLOG_DEBUG(3) << "Got TC of type " << static_cast<int>(tc.type) << ", timestamp " << tc.time_candidate
                  << ", start/end " << tc.time_start << "/" << tc.time_end;
  }

  // Option to ignore TC types (if given by config)
  if (m_ignoring_tc_types == true && check_trigger_type_ignore(static_cast<unsigned int>(tc.type)) == true) {
    TLOG_DEBUG(3) << " Ignore TC type: " << static_cast<unsigned int>(tc.type);
    m_tc_ignored_count++;

    /*FIXME: comment out this block: if a TC is to be ignored it shall just be ignored!
    if (m_tc_merging) {
      // Still need to check for overlap with existing TD, if overlaps, include in the TD, but don't extend
      // readout
      std::lock_guard<std::mutex> lock(m_td_vector_mutex);
      add_tc_ignored(*tc);
    }
    */
  }
  else {
    std::lock_guard<std::mutex> lock(m_td_vector_mutex);
    add_tc(tc);
    m_cv.notify_one();
    TLOG_DEBUG(10) << "pending tds size: " << m_pending_tds.size();
  }
  m_last_processed_daq_ts = tc.time_start;
  return;
}

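// create_decision() turns a pending TD into a dfmessages::TriggerDecision: the timestamp
// comes from the earliest contributing TC, the trigger type is the bitword built from the
// contributing TC types, and component requests are added for the mandatory links plus
// either every group link (full readout) or an ROI selection. Trigger and run numbers are
// left at 0 to be filled by the MLT.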
dfmessages::TriggerDecision
TCProcessor::create_decision(const PendingTD& pending_td)
{
  int m_earliest_tc_index = get_earliest_tc_index(pending_td); // index of the earliest contributing TC
  TLOG_DEBUG(5) << "earliest TC index: " << m_earliest_tc_index;

  if (pending_td.contributing_tcs.size() > 1) {
    TLOG_DEBUG(5) << "!!! TD created from " << pending_td.contributing_tcs.size() << " TCs !!!";
  }

  dfmessages::TriggerDecision decision;
  decision.trigger_number = 0; // filled by MLT
  decision.run_number = 0; // filled by MLT
  decision.trigger_timestamp = pending_td.contributing_tcs[m_earliest_tc_index].time_candidate;
  decision.readout_type = dfmessages::ReadoutType::kLocalized; // assumption: the only ReadoutType referenced by this file

  TDBitset td_bitword = get_TD_bitword(pending_td);
  TLOG_DEBUG(5) << "[MLT] TD has bitword: " << td_bitword << " "
                << static_cast<dfmessages::trigger_type_t>(td_bitword.to_ulong());
  decision.trigger_type = static_cast<dfmessages::trigger_type_t>(td_bitword.to_ulong()); // m_trigger_type;

  //decision.trigger_type = 1; // m_trigger_type;

  TLOG_DEBUG(3) << ", TC detid: " << pending_td.contributing_tcs[m_earliest_tc_index].detid
                << ", TC type: " << static_cast<int>(pending_td.contributing_tcs[m_earliest_tc_index].type)
                << ", TC cont number: " << pending_td.contributing_tcs.size()
                << ", DECISION trigger type: " << decision.trigger_type
                << ", DECISION timestamp: " << decision.trigger_timestamp
                << ", request window begin: " << pending_td.readout_start
                << ", request window end: " << pending_td.readout_end;

  std::vector<dfmessages::ComponentRequest> requests =
    create_all_decision_requests(m_mandatory_links, pending_td.readout_start, pending_td.readout_end);
  add_requests_to_decision(decision, requests);

  if (!m_use_roi_readout) {
    for (const auto& [key, value] : m_group_links) {
      std::vector<dfmessages::ComponentRequest> group_requests =
        create_all_decision_requests(value, pending_td.readout_start, pending_td.readout_end);
      add_requests_to_decision(decision, group_requests);
    }
  } else { // using ROI readout
    roi_readout_make_requests(decision);
  }

  m_tds_created_count++;
  m_tds_created_tc_count += pending_td.contributing_tcs.size();

  return decision;
}

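// Thread body run by m_send_trigger_decisions_thread: it waits on the condition variable
// until TDs are pending (or the run stops), harvests the ready ones and ships each of them
// through call_tc_decision().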
void
TCProcessor::send_trigger_decisions() // method name assumed, from m_send_trigger_decisions_thread
{
  // A unique lock that can be locked and unlocked
  std::unique_lock<std::mutex> lock(m_td_vector_mutex);

  while (m_running_flag) {
    // Either there are pending TDs, or wait for a bit
    m_cv.wait(lock, [this] {
      return !m_pending_tds.empty() || !m_running_flag;
    });
    auto ready_tds = get_ready_tds(m_pending_tds);
    TLOG_DEBUG(10) << "ready tds: " << ready_tds.size() << ", updated pending tds: " << m_pending_tds.size();

    for (std::vector<PendingTD>::iterator it = ready_tds.begin(); it != ready_tds.end();) {
      call_tc_decision(*it);
      ++it;
    }
  }
}

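// call_tc_decision() applies the optional trigger-bitword filter, builds the decision and
// pushes it to the TriggerDecision sink without blocking; a failed send is counted as a
// dropped TD.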
void
TCProcessor::call_tc_decision(const PendingTD& pending_td)
{

  if (m_use_bitwords) {
    // Check trigger bitwords
    TDBitset td_bitword = get_TD_bitword(pending_td);
    if (!check_trigger_bitwords(td_bitword)) {
      // Don't process further if the bitword check failed
      m_tds_failed_bitword_count++;
      m_tds_failed_bitword_tc_count += pending_td.contributing_tcs.size();
      return;
    }
  }

  dfmessages::TriggerDecision decision = create_decision(pending_td);
  auto tn = decision.trigger_number;
  auto td_ts = decision.trigger_timestamp;

  if (m_latency_monitoring.load()) m_latency_instance.update_latency_out( pending_td.contributing_tcs.front().time_start );
  if(!m_td_sink->try_send(std::move(decision), iomanager::Sender::s_no_block)) {
    ers::warning(TDDropped(ERS_HERE, tn, td_ts));
    m_tds_dropped_count++;
    m_tds_dropped_tc_count += pending_td.contributing_tcs.size();
  }
  else {
    m_tds_sent_count++;
    m_tds_sent_tc_count += pending_td.contributing_tcs.size();
  }
}

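// add_tc() is called with m_td_vector_mutex held. If the TC overlaps an existing pending TD
// it is either dropped (pileup-ignore mode) or merged into it, extending the TD readout
// window; otherwise a new pending TD is created with a wall-clock expiration of
// arrival time + m_buffer_timeout.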
void
TCProcessor::add_tc(const triggeralgs::TriggerCandidate tc)
{
  bool tc_dealt = false;
  int64_t tc_wallclock_arrived =
    std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();

  if (m_tc_merging || m_ignore_tc_pileup) { // guard condition assumed: overlap handling only applies in these modes

    for (std::vector<PendingTD>::iterator it = m_pending_tds.begin(); it != m_pending_tds.end();) {
      // Don't deal with TC here if there's no overlap
      if (!check_overlap(tc, *it)) {
        it++;
        continue;
      }

      // If overlap and ignoring, we drop the TC and flag it as dealt with.
      if (m_ignore_tc_pileup) {
        m_tc_ignored_count++;
        tc_dealt = true;
        TLOG_DEBUG(3) << "TC overlapping with a previous TD, dropping!";
        break;
      }

      // If we're here, TC merging must be on, in which case we're actually
      // going to merge the TC into the TD.
      it->contributing_tcs.push_back(tc);
      if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ){
        TLOG_DEBUG(3) << "TC with start/end times " << tc.time_candidate - m_readout_window_map[tc.type].first << "/"
                      << tc.time_candidate + m_readout_window_map[tc.type].second
                      << " overlaps with pending TD with start/end times " << it->readout_start << "/"
                      << it->readout_end;
        it->readout_start = ((tc.time_candidate - m_readout_window_map[tc.type].first) >= it->readout_start)
                              ? it->readout_start
                              : (tc.time_candidate - m_readout_window_map[tc.type].first);
        it->readout_end = ((tc.time_candidate + m_readout_window_map[tc.type].second) >= it->readout_end)
                            ? (tc.time_candidate + m_readout_window_map[tc.type].second)
                            : it->readout_end;
      } else {
        TLOG_DEBUG(3) << "TC with start/end times " << tc.time_start << "/" << tc.time_end
                      << " overlaps with pending TD with start/end times " << it->readout_start << "/"
                      << it->readout_end;
        it->readout_start = (tc.time_start >= it->readout_start) ? it->readout_start : tc.time_start;
        it->readout_end = (tc.time_end >= it->readout_end) ? tc.time_end : it->readout_end;
      }
      it->walltime_expiration = tc_wallclock_arrived + m_buffer_timeout;
      tc_dealt = true;
      break;
    }
  }

  // Don't do anything else if we've already dealt with the TC
  if (tc_dealt) {
    return;
  }

  // Create a new TD out of the TC
  PendingTD td_candidate;
  td_candidate.contributing_tcs.push_back(tc);
  if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ){
    td_candidate.readout_start = tc.time_candidate - m_readout_window_map[tc.type].first;
    td_candidate.readout_end = tc.time_candidate + m_readout_window_map[tc.type].second;
  } else {
    td_candidate.readout_start = tc.time_start;
    td_candidate.readout_end = tc.time_end;
  }
  td_candidate.walltime_expiration = tc_wallclock_arrived + m_buffer_timeout;
  m_pending_tds.push_back(td_candidate);
}

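// add_tc_ignored() records an ignored TC against the first overlapping pending TD without
// extending its readout window; within this file it is only referenced from the
// commented-out block in make_td().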
void
TCProcessor::add_tc_ignored(const triggeralgs::TriggerCandidate tc)
{
  for (std::vector<PendingTD>::iterator it = m_pending_tds.begin(); it != m_pending_tds.end();) {
    if (check_overlap(tc, *it)) {
      if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ) {
        TLOG_DEBUG(3) << "!Ignored! TC with start/end times " << tc.time_candidate - m_readout_window_map[tc.type].first
                      << "/" << tc.time_candidate + m_readout_window_map[tc.type].second
                      << " overlaps with pending TD with start/end times " << it->readout_start << "/"
                      << it->readout_end;
      } else {
        TLOG_DEBUG(3) << "!Ignored! TC with start/end times " << tc.time_start << "/" << tc.time_end
                      << " overlaps with pending TD with start/end times " << it->readout_start << "/"
                      << it->readout_end;
      }
      it->contributing_tcs.push_back(tc);
      break;
    }
    ++it;
  }
  return;
}

bool
TCProcessor::check_overlap(const triggeralgs::TriggerCandidate& tc, const PendingTD& pending_td)
{
  if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ) {
    return !(((tc.time_candidate + m_readout_window_map[tc.type].second) < pending_td.readout_start) ||
             ((tc.time_candidate - m_readout_window_map[tc.type].first > pending_td.readout_end)));
  } else {
    return !((tc.time_end < pending_td.readout_start) || (tc.time_start > pending_td.readout_end));
  }
}

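// A pending TD becomes "ready" once its wall-clock expiration has passed or its readout
// window already exceeds m_td_readout_limit; ready TDs are moved out of the pending vector.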
std::vector<TCProcessor::PendingTD>
TCProcessor::get_ready_tds(std::vector<PendingTD>& pending_tds)
{
  std::vector<PendingTD> return_tds;
  for (std::vector<PendingTD>::iterator it = pending_tds.begin(); it != pending_tds.end();) {
    auto timestamp_now =
      std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch())
        .count();
    if (timestamp_now >= it->walltime_expiration) {
      return_tds.push_back(*it);
      it = pending_tds.erase(it);
    } else if (check_td_readout_length(*it)) { // Also pass on TDs with (too) long readout window
      return_tds.push_back(*it);
      it = pending_tds.erase(it);
    } else {
      ++it;
    }
  }
  return return_tds;
}

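// The earliest contributing TC (smallest time_candidate) supplies the TD's trigger timestamp
// in create_decision().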
int
TCProcessor::get_earliest_tc_index(const PendingTD& pending_td)
{
  int earliest_tc_index = -1;
  triggeralgs::timestamp_t earliest_tc_time;
  for (int i = 0; i < static_cast<int>(pending_td.contributing_tcs.size()); i++) {
    if (earliest_tc_index == -1) {
      earliest_tc_time = pending_td.contributing_tcs[i].time_candidate;
      earliest_tc_index = i;
    } else {
      if (pending_td.contributing_tcs[i].time_candidate < earliest_tc_time) {
        earliest_tc_time = pending_td.contributing_tcs[i].time_candidate;
        earliest_tc_index = i;
      }
    }
  }
  return earliest_tc_index;
}

bool
TCProcessor::check_td_readout_length(const PendingTD& pending_td)
{
  bool td_too_long = false;
  if (static_cast<int64_t>(pending_td.readout_end - pending_td.readout_start) >= m_td_readout_limit) {
    td_too_long = true;
    TLOG_DEBUG(3) << "Too long readout window: " << (pending_td.readout_end - pending_td.readout_start)
                  << ", sending immediate TD!";
  }
  return td_too_long;
}

void
TCProcessor::clear_td_vectors() // method name assumed; drops everything still pending at run stop
{
  std::lock_guard<std::mutex> lock(m_td_vector_mutex);
  m_tds_cleared_count += m_pending_tds.size();
  // Use std::accumulate to sum up the sizes of all contributing_tcs vectors
  size_t tds_cleared_tc_count = std::accumulate(
    m_pending_tds.begin(), m_pending_tds.end(), 0,
    [](size_t sum, const PendingTD& ptd) {
      return sum + ptd.contributing_tcs.size();
    }
  );
  m_tds_cleared_tc_count += tds_cleared_tc_count;
  m_pending_tds.clear();
}

bool
TCProcessor::check_trigger_type_ignore(unsigned int tc_type)
{
  bool ignore = false;
  for (std::vector<unsigned int>::iterator it = m_ignored_tc_types.begin(); it != m_ignored_tc_types.end();) {
    if (tc_type == *it) {
      ignore = true;
      break;
    }
    ++it;
  }
  return ignore;
}

void
TCProcessor::print_trigger_bitwords() // method name assumed; dumps the configured bitwords for debugging
{
  TLOG_DEBUG(3) << "Configured trigger words:";
  for (const auto& bitword : m_trigger_bitwords) {
    TLOG_DEBUG(3) << bitword;
  }
}

bool
TCProcessor::check_trigger_bitwords(const TDBitset& td_bitword) const
{
  bool trigger_check = false;
  for (const auto& bitword : m_trigger_bitwords) {
    TLOG_DEBUG(15) << "TD word: " << td_bitword << ", bitword: " << bitword;
    trigger_check = ((td_bitword & bitword) == bitword);
    TLOG_DEBUG(15) << "&: " << (td_bitword & bitword);
    TLOG_DEBUG(15) << "trigger?: " << trigger_check;
    if (trigger_check == true)
      break;
  }
  return trigger_check;
}

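// Each configured TriggerBitword is a list of TC type names; set_trigger_bitwords() converts
// the names to TCType values and sets the corresponding bits of a TDBitset. A TD passes
// check_trigger_bitwords() if its bitword contains every bit of at least one configured word.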
void
TCProcessor::set_trigger_bitwords(const std::vector<const appmodel::TriggerBitword*>& _bitwords)
{
  for (const appmodel::TriggerBitword* bitword : _bitwords) {
    TDBitset temp_bitword;

    for (const std::string& tctype_str : bitword->get_bitword()) {
      TCType tc_type = static_cast<TCType>(
        dunedaq::trgdataformats::string_to_trigger_candidate_type(tctype_str));

      if (tc_type == TCType::kUnknown) {
        throw(InvalidConfiguration(ERS_HERE, "Provided an unknown/non-existent TC type as a trigger bitword!"));
      }

      temp_bitword.set(static_cast<uint64_t>(tc_type));
    }

    m_trigger_bitwords.push_back(temp_bitword);
  }
}

void
TCProcessor::parse_readout_map(const std::vector<const appmodel::TCReadoutMap*>& data)
{
  for (auto readout_type : data) {
    TCType tc_type = static_cast<TCType>(
      dunedaq::trgdataformats::string_to_trigger_candidate_type(readout_type->get_tc_type_name()));

    // Throw error if unknown TC type
    if (tc_type == TCType::kUnknown) {
      throw(InvalidConfiguration(ERS_HERE, "Provided an unknown TC type in the TCReadoutMap for the TCProcessor"));
    }

    m_readout_window_map[tc_type] = {
      readout_type->get_time_before(), readout_type->get_time_after()
    };
  }
  return;
}

void
TCProcessor::print_readout_map(std::map<TCType,
                               std::pair<triggeralgs::timestamp_t, triggeralgs::timestamp_t>> map)
{
  TLOG_DEBUG(3) << "MLT TD Readout map:";
  for (auto const& [key, val] : map) {
    TLOG_DEBUG(3) << "Type: " << static_cast<int>(key) << ", before: " << val.first << ", after: " << val.second;
  }
  return;
}

void
TCProcessor::parse_group_links(const nlohmann::json& data)
{
  for (auto group : data) {
    const nlohmann::json& temp_links_data = group["links"];
    std::vector<dfmessages::SourceID> temp_links;
    for (auto link : temp_links_data) {
      temp_links.push_back(
        dfmessages::SourceID{ daqdataformats::SourceID::string_to_subsystem(link["subsystem"]), link["element"] });
    }
    m_group_links.insert({ group["group"], temp_links });
  }
  return;
}

void
TCProcessor::print_group_links() // method name assumed; dumps m_group_links for debugging
{
  TLOG_DEBUG(3) << "MLT Group Links:";
  for (auto const& [key, val] : m_group_links) {
    TLOG_DEBUG(3) << "Group: " << key;
    for (auto const& link : val) {
      TLOG_DEBUG(3) << link;
    }
  }
  TLOG_DEBUG(3) << " ";
  return;
}

dfmessages::ComponentRequest
TCProcessor::create_request_for_link(daqdataformats::SourceID link,
                                     triggeralgs::timestamp_t start,
                                     triggeralgs::timestamp_t end)
{
  dfmessages::ComponentRequest request;
  request.component = link;
  request.window_begin = start;
  request.window_end = end;

  TLOG_DEBUG(10) << "setting request start: " << request.window_begin;
  TLOG_DEBUG(10) << "setting request end: " << request.window_end;

  return request;
}

std::vector<dfmessages::ComponentRequest>
TCProcessor::create_all_decision_requests(std::vector<dfmessages::SourceID> links,
                                          triggeralgs::timestamp_t start,
                                          triggeralgs::timestamp_t end)
{
  std::vector<dfmessages::ComponentRequest> requests;
  for (auto link : links) {
    requests.push_back(create_request_for_link(link, start, end));
  }
  return requests;
}

void
TCProcessor::add_requests_to_decision(dfmessages::TriggerDecision& decision,
                                      std::vector<dfmessages::ComponentRequest> requests)
{
  for (auto request : requests) {
    decision.components.push_back(request);
  }
}

void
TCProcessor::parse_roi_conf(const std::vector<const appmodel::ROIGroupConf*>& data)
{
  int counter = 0;
  float run_sum = 0;
  for (auto group : data) {
    roi_group temp_roi_group;
    temp_roi_group.n_links = group->get_number_of_link_groups();
    temp_roi_group.prob = group->get_probability();
    temp_roi_group.time_window = group->get_time_window();
    temp_roi_group.mode = group->get_groups_selection_mode();
    m_roi_conf.insert({ counter, temp_roi_group });
    m_roi_conf_ids.push_back(counter);
    m_roi_conf_probs.push_back(group->get_probability());
    run_sum += static_cast<float>(group->get_probability());
    m_roi_conf_probs_c.push_back(run_sum);
    counter++;
  }
  return;
}

void
TCProcessor::print_roi_conf(std::map<int, roi_group> roi_conf)
{
  TLOG_DEBUG(3) << "ROI CONF";
  for (const auto& [key, value] : roi_conf) {
    TLOG_DEBUG(3) << "ID: " << key;
    TLOG_DEBUG(3) << "n links: " << value.n_links;
    TLOG_DEBUG(3) << "prob: " << value.prob;
    TLOG_DEBUG(3) << "time: " << value.time_window;
    TLOG_DEBUG(3) << "mode: " << value.mode;
  }
  TLOG_DEBUG(3) << " ";
  return;
}

float
TCProcessor::get_random_num_float(float limit)
{
  float rnd = (double)rand() / RAND_MAX;
  return rnd * (limit);
}

int
TCProcessor::pick_roi_group_conf()
{
  float rnd_num = get_random_num_float(m_roi_conf_probs_c.back());
  for (int i = 0; i < static_cast<int>(m_roi_conf_probs_c.size()); i++) {
    if (rnd_num < m_roi_conf_probs_c[i]) {
      return i;
    }
  }
  return -1;
}

int
TCProcessor::get_random_num_int()
{
  int range = m_total_group_links; // upper bound assumed: the number of configured group links
  int rnd = rand() % range;
  return rnd;
}

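// ROI readout: a group configuration is drawn at random, weighted by the cumulative
// probabilities in m_roi_conf_probs_c. Depending on the group's selection mode, link groups
// are picked at random ("kRandom") or sequentially from ID 0, and requests covering the
// group's time window (starting at the decision timestamp) are appended to the decision.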
void
TCProcessor::roi_readout_make_requests(dfmessages::TriggerDecision& decision)
{
  // Get configuration at random (weighted)
  int group_pick = pick_roi_group_conf();
  if (group_pick != -1) {
    roi_group this_group = m_roi_conf[m_roi_conf_ids[group_pick]];
    std::vector<dfmessages::SourceID> links;

    // If mode is random, pick groups to request at random
    if (this_group.mode == "kRandom") {
      TLOG_DEBUG(10) << "RAND";
      std::set<int> groups;
      while (static_cast<int>(groups.size()) < this_group.n_links) {
        groups.insert(get_random_num_int());
      }
      for (auto r_id : groups) {
        links.insert(links.end(), m_group_links[r_id].begin(), m_group_links[r_id].end());
      }
      // Otherwise, read sequentially by IDs, starting at 0
    } else {
      TLOG_DEBUG(10) << "SEQ";
      int r_id = 0;
      while (r_id < this_group.n_links) {
        links.insert(links.end(), m_group_links[r_id].begin(), m_group_links[r_id].end());
        r_id++;
      }
    }

    TLOG_DEBUG(10) << "TD timestamp: " << decision.trigger_timestamp;
    TLOG_DEBUG(10) << "group window: " << this_group.time_window;

    // Once the components are prepared, create requests and append them to decision
    std::vector<dfmessages::ComponentRequest> requests =
      create_all_decision_requests(links, decision.trigger_timestamp,
                                   decision.trigger_timestamp + this_group.time_window);
    add_requests_to_decision(decision, requests);
    links.clear();
  }
  return;
}

TCProcessor::TDBitset // NB: assumes TDBitset is declared inside TCProcessor
TCProcessor::get_TD_bitword(const PendingTD& ready_td) const
{
  // get only unique types
  std::vector<int> tc_types;
  for (auto tc : ready_td.contributing_tcs) {
    tc_types.push_back(static_cast<int>(tc.type));
  }
  tc_types.erase(std::unique(tc_types.begin(), tc_types.end()), tc_types.end());

  // form TD bitword
  TDBitset td_bitword;
  for (auto tc_type : tc_types) {
    td_bitword.set(tc_type);
  }
  return td_bitword;
}

void
TCProcessor::print_opmon_stats() // method name assumed; prints a summary of the opmon counters
{
  TLOG() << "TCProcessor opmon counters summary:";
  TLOG() << "------------------------------";
  TLOG() << "TDs created: \t\t\t" << m_tds_created_count << " \t(" << m_tds_created_tc_count << " TCs)";
  TLOG() << "TDs sent: \t\t\t" << m_tds_sent_count << " \t(" << m_tds_sent_tc_count << " TCs)";
  TLOG() << "TDs dropped: \t\t\t" << m_tds_dropped_count << " \t(" << m_tds_dropped_tc_count << " TCs)";
  TLOG() << "TDs failed bitword check: \t" << m_tds_failed_bitword_count << " \t(" << m_tds_failed_bitword_tc_count << " TCs)";
  TLOG() << "TDs cleared: \t\t\t" << m_tds_cleared_count << " \t(" << m_tds_cleared_tc_count << " TCs)";
  TLOG() << "------------------------------";
  TLOG() << "TCs received: \t" << m_tc_received_count;
  TLOG() << "TCs ignored: \t" << m_tc_ignored_count;
  TLOG();
}

} // namespace trigger
} // namespace dunedaq