DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::trigger::TCProcessor Class Reference

#include <TCProcessor.hpp>

Inheritance diagram for dunedaq::trigger::TCProcessor:
[legend]
Collaboration diagram for dunedaq::trigger::TCProcessor:
[legend]

Classes

struct  PendingTD
 
struct  roi_group
 

Public Types

using inherited = datahandlinglibs::TaskRawDataProcessorModel<TCWrapper>
 
using tcptr = TCWrapper*
 
using consttcptr = const TCWrapper*
 
using TCType = triggeralgs::TriggerCandidate::Type
 
using TDBitset = std::bitset<64>
 
- Public Types inherited from dunedaq::opmonlib::MonitorableObject
using NodePtr = std::weak_ptr<MonitorableObject>
 
using NewNodePtr = std::shared_ptr<MonitorableObject>
 
using ElementId = std::string
 

Public Member Functions

 TCProcessor (std::unique_ptr< datahandlinglibs::FrameErrorRegistry > &error_registry, bool post_processing_enabled)
 
 ~TCProcessor ()
 
void start (const nlohmann::json &args) override
 Start operation.
 
void stop (const nlohmann::json &args) override
 Stop operation.
 
void conf (const appmodel::DataHandlerModule *conf) override
 Set the emulator mode; if active, timestamps of processed packets are overwritten with new ones.
 
void scrap (const nlohmann::json &args) override
 Unconfigure.
 
void generate_opmon_data () override
 
- Public Member Functions inherited from dunedaq::datahandlinglibs::TaskRawDataProcessorModel< TCWrapper >
 TaskRawDataProcessorModel (std::unique_ptr< FrameErrorRegistry > &error_registry, bool post_processing_enabled)
 
 ~TaskRawDataProcessorModel ()
 
void reset_last_daq_time ()
 
std::uint64_t get_last_daq_time () override
 Get newest timestamp of last seen packet.
 
void preprocess_item (TCWrapper *item) override
 Preprocess one element.
 
void postprocess_item (const TCWrapper *item) override
 Postprocess one element.
 
void add_preprocess_task (Task &&task)
 
void add_postprocess_task (Task &&task)
 
void invoke_all_preprocess_functions (TCWrapper *item)
 
void launch_all_preprocess_functions (TCWrapper *item)
 
- Public Member Functions inherited from dunedaq::datahandlinglibs::RawDataProcessorConcept< TCWrapper >
 RawDataProcessorConcept ()
 
 RawDataProcessorConcept (const RawDataProcessorConcept &)=delete
 RawDataProcessorConcept is not copy-constructible.
 
 RawDataProcessorConcept (RawDataProcessorConcept &&)=delete
 RawDataProcessorConcept is not move-constructible.
 
virtual ~RawDataProcessorConcept ()
 
RawDataProcessorConcept & operator= (const RawDataProcessorConcept &)=delete
 RawDataProcessorConcept is not copy-assignable.
 
RawDataProcessorConcept & operator= (RawDataProcessorConcept &&)=delete
 RawDataProcessorConcept is not move-assignable.
 
- Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject (const MonitorableObject &)=delete
 
MonitorableObject & operator= (const MonitorableObject &)=delete
 
 MonitorableObject (MonitorableObject &&)=delete
 
MonitorableObject & operator= (MonitorableObject &&)=delete
 
virtual ~MonitorableObject ()=default
 
auto get_opmon_id () const noexcept
 
auto get_opmon_level () const noexcept
 

Protected Member Functions

void make_td (const TCWrapper *tc)
 
- Protected Member Functions inherited from dunedaq::datahandlinglibs::TaskRawDataProcessorModel< TCWrapper >
void run_post_processing_thread (std::function< void(const TCWrapper *)> &function, folly::ProducerConsumerQueue< const TCWrapper * > &queue)
 
- Protected Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject ()=default
 
void register_node (ElementId name, NewNodePtr)
 
void publish (google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
 

Private Types

using metric_counter_type = uint64_t
 

Private Member Functions

void send_trigger_decisions ()
 
void parse_group_links (const nlohmann::json &data)
 
void print_group_links ()
 
dfmessages::ComponentRequest create_request_for_link (daqdataformats::SourceID link, triggeralgs::timestamp_t start, triggeralgs::timestamp_t end)
 
std::vector< dfmessages::ComponentRequest > create_all_decision_requests (std::vector< daqdataformats::SourceID > links, triggeralgs::timestamp_t start, triggeralgs::timestamp_t end)
 
void add_requests_to_decision (dfmessages::TriggerDecision &decision, std::vector< dfmessages::ComponentRequest > requests)
 
void parse_roi_conf (const std::vector< const appmodel::ROIGroupConf * > &data)
 
void print_roi_conf (std::map< int, roi_group > roi_conf)
 
float get_random_num_float (float limit)
 
int get_random_num_int ()
 
int pick_roi_group_conf ()
 
void roi_readout_make_requests (dfmessages::TriggerDecision &decision)
 
void add_tc (const triggeralgs::TriggerCandidate tc)
 
void add_tc_ignored (const triggeralgs::TriggerCandidate tc)
 
void call_tc_decision (const PendingTD &pending_td)
 
bool check_overlap (const triggeralgs::TriggerCandidate &tc, const PendingTD &pending_td)
 
bool check_td_readout_length (const PendingTD &)
 
void clear_td_vectors ()
 
std::vector< PendingTD > get_ready_tds (std::vector< PendingTD > &pending_tds)
 
int get_earliest_tc_index (const PendingTD &pending_td)
 
TDBitset get_TD_bitword (const PendingTD &ready_td) const
 
void print_trigger_bitwords ()
 
bool check_trigger_bitwords (const TDBitset &td_bitword) const
 
void set_trigger_bitwords (const std::vector< const appmodel::TriggerBitword * > &_bitwords)
 
void parse_readout_map (const std::vector< const appmodel::TCReadoutMap * > &data)
 
void print_readout_map (std::map< TCType, std::pair< triggeralgs::timestamp_t, triggeralgs::timestamp_t > > map)
 
dfmessages::TriggerDecision create_decision (const PendingTD &pending_td)
 
bool check_trigger_type_ignore (unsigned int tc_type)
 
void print_opmon_stats ()
 

Private Attributes

std::thread m_send_trigger_decisions_thread
 
std::vector< daqdataformats::SourceID > m_mandatory_links
 
std::map< int, std::vector< daqdataformats::SourceID > > m_group_links
 
nlohmann::json m_group_links_data
 
int m_total_group_links
 
bool m_use_roi_readout
 
std::map< int, roi_group > m_roi_conf
 
std::vector< const appmodel::ROIGroupConf * > m_roi_conf_data
 
std::vector< int > m_roi_conf_ids
 
std::vector< float > m_roi_conf_probs
 
std::vector< float > m_roi_conf_probs_c
 
int m_repeat_trigger_count { 1 }
 
std::atomic< bool > m_tc_merging
 
bool m_ignore_tc_pileup
 Ignore TCs that overlap with already made TD.
 
dfmessages::trigger_number_t m_last_trigger_number
 
dfmessages::run_number_t m_run_number
 
std::atomic< bool > m_running_flag { false }
 
std::vector< PendingTD > m_pending_tds
 
std::mutex m_td_vector_mutex
 
std::condition_variable m_cv
 
int64_t m_buffer_timeout
 
int64_t m_td_readout_limit
 
std::atomic< bool > m_send_timed_out_tds
 
int m_earliest_tc_index
 
bool m_use_bitwords
 
std::vector< TDBitset > m_trigger_bitwords
 
bool m_use_readout_map
 
std::vector< const appmodel::TCReadoutMap * > m_readout_window_map_data
 
std::map< TCType, std::pair< triggeralgs::timestamp_t, triggeralgs::timestamp_t > > m_readout_window_map
 
dfmessages::trigger_type_t m_trigger_type_shifted
 
std::vector< unsigned int > m_ignored_tc_types
 
bool m_ignoring_tc_types
 
std::shared_ptr< iomanager::SenderConcept< dfmessages::TriggerDecision > > m_td_sink
 
std::atomic< metric_counter_type > m_tds_created_count { 0 }
 
std::atomic< metric_counter_type > m_tds_sent_count { 0 }
 
std::atomic< metric_counter_type > m_tds_dropped_count { 0 }
 
std::atomic< metric_counter_type > m_tds_failed_bitword_count { 0 }
 
std::atomic< metric_counter_type > m_tds_cleared_count { 0 }
 
std::atomic< metric_counter_type > m_tc_received_count { 0 }
 
std::atomic< metric_counter_type > m_tds_created_tc_count { 0 }
 
std::atomic< metric_counter_type > m_tds_sent_tc_count { 0 }
 
std::atomic< metric_counter_type > m_tds_dropped_tc_count { 0 }
 
std::atomic< metric_counter_type > m_tds_failed_bitword_tc_count { 0 }
 
std::atomic< metric_counter_type > m_tds_cleared_tc_count { 0 }
 
std::atomic< metric_counter_type > m_tc_ignored_count { 0 }
 
std::atomic< bool > m_latency_monitoring { false }
 
dunedaq::trigger::Latency m_latency_instance
 
std::atomic< metric_counter_type > m_latency_in { 0 }
 
std::atomic< metric_counter_type > m_latency_out { 0 }
 

Additional Inherited Members

- Static Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
static bool publishable_metric (OpMonLevel entry, OpMonLevel system) noexcept
 
- Protected Attributes inherited from dunedaq::datahandlinglibs::TaskRawDataProcessorModel< TCWrapper >
std::vector< std::function< void(TCWrapper *)> > m_preprocess_functions
 
std::unique_ptr< FrameErrorRegistry > & m_error_registry
 
bool m_post_processing_enabled
 
std::atomic< bool > m_run_marker
 
std::vector< std::function< void(const TCWrapper *)> > m_post_process_functions
 
std::vector< std::unique_ptr< folly::ProducerConsumerQueue< const TCWrapper * > > > m_items_to_postprocess_queues
 
std::vector< std::unique_ptr< utilities::ReusableThread > > m_post_process_threads
 
size_t m_postprocess_queue_sizes
 
daqdataformats::SourceID m_sourceid
 
std::atomic< uint64_t > m_last_processed_daq_ts
 

Detailed Description

Definition at line 40 of file TCProcessor.hpp.

Member Typedef Documentation

◆ consttcptr

Definition at line 46 of file TCProcessor.hpp.

◆ inherited

◆ metric_counter_type

Definition at line 179 of file TCProcessor.hpp.

◆ tcptr

Definition at line 45 of file TCProcessor.hpp.

◆ TCType

◆ TDBitset

using dunedaq::trigger::TCProcessor::TDBitset = std::bitset<64>

Definition at line 48 of file TCProcessor.hpp.

Constructor & Destructor Documentation

◆ TCProcessor()

dunedaq::trigger::TCProcessor::TCProcessor ( std::unique_ptr< datahandlinglibs::FrameErrorRegistry > & error_registry,
bool post_processing_enabled )
explicit

Definition at line 32 of file TCProcessor.cpp.

33 : datahandlinglibs::TaskRawDataProcessorModel<TCWrapper>(error_registry, post_processing_enabled)
34{
35}

◆ ~TCProcessor()

dunedaq::trigger::TCProcessor::~TCProcessor ( )

Definition at line 37 of file TCProcessor.cpp.

38{}

Member Function Documentation

◆ add_requests_to_decision()

void dunedaq::trigger::TCProcessor::add_requests_to_decision ( dfmessages::TriggerDecision & decision,
std::vector< dfmessages::ComponentRequest > requests )
private

Definition at line 714 of file TCProcessor.cpp.

716{
717 for (auto request : requests) {
718 decision.components.push_back(request);
719 }
720}

◆ add_tc()

void dunedaq::trigger::TCProcessor::add_tc ( const triggeralgs::TriggerCandidate tc)
private

Definition at line 401 of file TCProcessor.cpp.

402{
403 bool tc_dealt = false;
404 int64_t tc_wallclock_arrived =
405 std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
406
408
409 for (std::vector<PendingTD>::iterator it = m_pending_tds.begin(); it != m_pending_tds.end();) {
410 // Don't deal with TC here if there's no overlap
411 if (!check_overlap(tc, *it)) {
412 it++;
413 continue;
414 }
415
416 // If overlap and ignoring, we drop the TC and flag it as dealt with.
417 if (m_ignore_tc_pileup) {
419 tc_dealt = true;
420 TLOG_DEBUG(3) << "TC overlapping with a previous TD, dropping!";
421 break;
422 }
423
424 // If we're here, TC merging must be on, in which case we're actually
425 // going to merge the TC into the TD.
426 it->contributing_tcs.push_back(tc);
427 if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ){
428 TLOG_DEBUG(3) << "TC with start/end times " << tc.time_candidate - m_readout_window_map[tc.type].first << "/"
429 << tc.time_candidate + m_readout_window_map[tc.type].second
430 << " overlaps with pending TD with start/end times " << it->readout_start << "/"
431 << it->readout_end;
432 it->readout_start = ((tc.time_candidate - m_readout_window_map[tc.type].first) >= it->readout_start)
433 ? it->readout_start
434 : (tc.time_candidate - m_readout_window_map[tc.type].first);
435 it->readout_end = ((tc.time_candidate + m_readout_window_map[tc.type].second) >= it->readout_end)
436 ? (tc.time_candidate + m_readout_window_map[tc.type].second)
437 : it->readout_end;
438 } else {
439 TLOG_DEBUG(3) << "TC with start/end times " << tc.time_start << "/" << tc.time_end
440 << " overlaps with pending TD with start/end times " << it->readout_start << "/"
441 << it->readout_end;
442 it->readout_start = (tc.time_start >= it->readout_start) ? it->readout_start : tc.time_start;
443 it->readout_end = (tc.time_end >= it->readout_end) ? tc.time_end : it->readout_end;
444 }
445 it->walltime_expiration = tc_wallclock_arrived + m_buffer_timeout;
446 tc_dealt = true;
447 break;
448 }
449 }
450
451 // Don't do anything else if we've already dealt with the TC
452 if (tc_dealt) {
453 return;
454 }
455
456 // Create a new TD out of the TC
457 PendingTD td_candidate;
458 td_candidate.contributing_tcs.push_back(tc);
459 if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ){
460 td_candidate.readout_start = tc.time_candidate - m_readout_window_map[tc.type].first;
461 td_candidate.readout_end = tc.time_candidate + m_readout_window_map[tc.type].second;
462 } else {
463 td_candidate.readout_start = tc.time_start;
464 td_candidate.readout_end = tc.time_end;
465 }
466 td_candidate.walltime_expiration = tc_wallclock_arrived + m_buffer_timeout;
467 m_pending_tds.push_back(td_candidate);
468}
std::atomic< bool > m_tc_merging
std::atomic< metric_counter_type > m_tds_dropped_tc_count
std::vector< PendingTD > m_pending_tds
bool m_ignore_tc_pileup
Ignore TCs that overlap with already made TD.
bool check_overlap(const triggeralgs::TriggerCandidate &tc, const PendingTD &pending_td)
std::map< TCType, std::pair< triggeralgs::timestamp_t, triggeralgs::timestamp_t > > m_readout_window_map
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112

◆ add_tc_ignored()

void dunedaq::trigger::TCProcessor::add_tc_ignored ( const triggeralgs::TriggerCandidate tc)
private

Definition at line 471 of file TCProcessor.cpp.

472{
473 for (std::vector<PendingTD>::iterator it = m_pending_tds.begin(); it != m_pending_tds.end();) {
474 if (check_overlap(tc, *it)) {
475 if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ) {
476 TLOG_DEBUG(3) << "!Ignored! TC with start/end times " << tc.time_candidate - m_readout_window_map[tc.type].first
477 << "/" << tc.time_candidate + m_readout_window_map[tc.type].second
478 << " overlaps with pending TD with start/end times " << it->readout_start << "/"
479 << it->readout_end;
480 } else {
481 TLOG_DEBUG(3) << "!Ignored! TC with start/end times " << tc.time_start << "/" << tc.time_end
482 << " overlaps with pending TD with start/end times " << it->readout_start << "/"
483 << it->readout_end;
484 }
485 it->contributing_tcs.push_back(tc);
486 break;
487 }
488 ++it;
489 }
490 return;
491}

◆ call_tc_decision()

void dunedaq::trigger::TCProcessor::call_tc_decision ( const PendingTD & pending_td)
private

Definition at line 369 of file TCProcessor.cpp.

370{
371
372 if (m_use_bitwords) {
373 // Check trigger bitwords
374 TDBitset td_bitword = get_TD_bitword(pending_td);
375 if (!check_trigger_bitwords(td_bitword)) {
376 // Don't process further if the bitword check failed
378 m_tds_failed_bitword_tc_count += pending_td.contributing_tcs.size();
379 return;
380 }
381 }
382
383 dfmessages::TriggerDecision decision = create_decision(pending_td);
384 auto tn = decision.trigger_number;
385 auto td_ts = decision.trigger_timestamp;
386
387 if (m_latency_monitoring.load()) m_latency_instance.update_latency_out( pending_td.contributing_tcs.front().time_start );
388 if(!m_td_sink->try_send(std::move(decision), iomanager::Sender::s_no_block)) {
389 ers::warning(TDDropped(ERS_HERE, tn, td_ts));
391 m_tds_dropped_tc_count += pending_td.contributing_tcs.size();
392 }
393 else {
395 m_tds_sent_tc_count += pending_td.contributing_tcs.size();
396 }
397}
#define ERS_HERE
static constexpr timeout_t s_no_block
Definition Sender.hpp:26
void update_latency_out(uint64_t latency)
Definition Latency.hpp:43
std::atomic< metric_counter_type > m_tds_dropped_count
dunedaq::trigger::Latency m_latency_instance
TDBitset get_TD_bitword(const PendingTD &ready_td) const
std::atomic< metric_counter_type > m_tds_failed_bitword_tc_count
std::shared_ptr< iomanager::SenderConcept< dfmessages::TriggerDecision > > m_td_sink
dfmessages::TriggerDecision create_decision(const PendingTD &pending_td)
bool check_trigger_bitwords(const TDBitset &td_bitword) const
std::atomic< metric_counter_type > m_tds_sent_tc_count
std::atomic< metric_counter_type > m_tds_failed_bitword_count
std::atomic< metric_counter_type > m_tds_sent_count
std::atomic< bool > m_latency_monitoring
void warning(const Issue &issue)
Definition ers.hpp:115

◆ check_overlap()

bool dunedaq::trigger::TCProcessor::check_overlap ( const triggeralgs::TriggerCandidate & tc,
const PendingTD & pending_td )
private

Definition at line 494 of file TCProcessor.cpp.

495{
496 if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ) {
497 return !(((tc.time_candidate + m_readout_window_map[tc.type].second) < pending_td.readout_start) ||
498 ((tc.time_candidate - m_readout_window_map[tc.type].first > pending_td.readout_end)));
499 } else {
500 return !((tc.time_end < pending_td.readout_start) || (tc.time_start > pending_td.readout_end));
501 }
502}

◆ check_td_readout_length()

bool dunedaq::trigger::TCProcessor::check_td_readout_length ( const PendingTD & pending_td)
private

Definition at line 544 of file TCProcessor.cpp.

545{
546 bool td_too_long = false;
547 if (static_cast<int64_t>(pending_td.readout_end - pending_td.readout_start) >= m_td_readout_limit) {
548 td_too_long = true;
549 TLOG_DEBUG(3) << "Too long readout window: " << (pending_td.readout_end - pending_td.readout_start)
550 << ", sending immediate TD!";
551 }
552 return td_too_long;
553}

◆ check_trigger_bitwords()

bool dunedaq::trigger::TCProcessor::check_trigger_bitwords ( const TDBitset & td_bitword) const
private

Definition at line 594 of file TCProcessor.cpp.

595{
596 bool trigger_check = false;
597 for (const auto& bitword : m_trigger_bitwords) {
598 TLOG_DEBUG(15) << "TD word: " << td_bitword << ", bitword: " << bitword;
599 trigger_check = ((td_bitword & bitword) == bitword);
600 TLOG_DEBUG(15) << "&: " << (td_bitword & bitword);
601 TLOG_DEBUG(15) << "trigger?: " << trigger_check;
602 if (trigger_check == true)
603 break;
604 }
605 return trigger_check;
606}
std::vector< TDBitset > m_trigger_bitwords

◆ check_trigger_type_ignore()

bool dunedaq::trigger::TCProcessor::check_trigger_type_ignore ( unsigned int tc_type)
private

Definition at line 571 of file TCProcessor.cpp.

572{
573 bool ignore = false;
574 for (std::vector<unsigned int>::iterator it = m_ignored_tc_types.begin(); it != m_ignored_tc_types.end();) {
575 if (tc_type == *it) {
576 ignore = true;
577 break;
578 }
579 ++it;
580 }
581 return ignore;
582}
std::vector< unsigned int > m_ignored_tc_types

◆ clear_td_vectors()

void dunedaq::trigger::TCProcessor::clear_td_vectors ( )
private

Definition at line 556 of file TCProcessor.cpp.

557{
558 std::lock_guard<std::mutex> lock(m_td_vector_mutex);
560 // Use std::accumulate to sum up the sizes of all contributing_tcs vectors
561 size_t tds_cleared_tc_count = std::accumulate(
562 m_pending_tds.begin(), m_pending_tds.end(), 0,
563 [](size_t sum, const PendingTD& ptd) {
564 return sum + ptd.contributing_tcs.size();
565 }
566 );
567 m_tds_cleared_tc_count += tds_cleared_tc_count;
568 m_pending_tds.clear();
569}
std::atomic< metric_counter_type > m_tds_cleared_count
std::atomic< metric_counter_type > m_tds_cleared_tc_count

◆ conf()

void dunedaq::trigger::TCProcessor::conf ( const appmodel::DataHandlerModule * conf)
overridevirtual

Set the emulator mode; if active, timestamps of processed packets are overwritten with new ones.

Reimplemented from dunedaq::datahandlinglibs::TaskRawDataProcessorModel< TCWrapper >.

Definition at line 96 of file TCProcessor.cpp.

97{
98 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Entering conf() method";
99
100 auto mtrg = cfg->cast<appmodel::TriggerDataHandlerModule>();
101 if (mtrg == nullptr) {
102 throw(InvalidConfiguration(ERS_HERE, "Provided null TriggerDataHandlerModule configuration!"));
103 }
104 for (auto output : mtrg->get_outputs()) {
105 try {
106 if (output->get_data_type() == "TriggerDecision") {
108 }
109 } catch (const ers::Issue& excpt) {
110 ers::error(datahandlinglibs::ResourceQueueError(ERS_HERE, "td", "DefaultRequestHandlerModel", excpt));
111 }
112 }
113
114 auto dp = mtrg->get_module_configuration()->get_data_processor();
115 auto proc_conf = dp->cast<appmodel::TCDataProcessor>();
116
117 // Add all Source IDs to mandatoy links for now...
118 for(auto const& link : mtrg->get_mandatory_source_ids()){
119 m_mandatory_links.push_back(
122 link->get_sid()});
123 }
124 for(auto const& link : mtrg->get_enabled_source_ids()){
125 m_mandatory_links.push_back(
128 link->get_sid()});
129 }
130
131 // TODO: Group links!
132 //m_group_links_data = conf->get_groups_links();
136 TLOG_DEBUG(3) << "Total group links: " << m_total_group_links;
137
138 m_tc_merging = proc_conf->get_merge_overlapping_tcs();
139 m_ignore_tc_pileup = proc_conf->get_ignore_overlapping_tcs();
140 m_buffer_timeout = proc_conf->get_buffer_timeout();
141 m_send_timed_out_tds = (m_ignore_tc_pileup) ? false : proc_conf->get_td_out_of_timeout();
142 m_td_readout_limit = proc_conf->get_td_readout_limit();
143 m_ignored_tc_types = proc_conf->get_ignore_tc();
145
146 // Trigger bitwords
147 std::vector<const appmodel::TriggerBitword*> bitwords = proc_conf->get_trigger_bitwords();
148 m_use_bitwords = !bitwords.empty();
149 if(m_use_bitwords){
150 set_trigger_bitwords(bitwords);
152 }
153 TLOG_DEBUG(3) << "Use bitwords: " << m_use_bitwords;
154 TLOG_DEBUG(3) << "Allow merging: " << m_tc_merging;
155 TLOG_DEBUG(3) << "Ignore pileup: " << m_ignore_tc_pileup;
156 TLOG_DEBUG(3) << "Buffer timeout: " << m_buffer_timeout;
157 TLOG_DEBUG(3) << "Should send timed out TDs: " << m_send_timed_out_tds;
158 TLOG_DEBUG(3) << "TD readout limit: " << m_td_readout_limit;
159
160 // ROI map
161 m_roi_conf_data = proc_conf->get_roi_group_conf();
163 if (m_use_roi_readout) {
166 }
167 TLOG_DEBUG(3) << "Use ROI readout?: " << m_use_roi_readout;
168
169 // Custom readout map
170 m_readout_window_map_data = proc_conf->get_tc_readout_map();
172 if (m_use_readout_map) {
175 }
176 TLOG_DEBUG(3) << "Use readout map: " << m_use_readout_map;
177
178 // Ignoring TC types
179 TLOG_DEBUG(3) << "Ignoring TC types: " << m_ignoring_tc_types;
181 TLOG_DEBUG(3) << "TC types to ignore: ";
182 for (std::vector<unsigned int>::iterator it = m_ignored_tc_types.begin(); it != m_ignored_tc_types.end();) {
183 TLOG_DEBUG(3) << *it;
184 ++it;
185 }
186 }
187 m_latency_monitoring.store( dp->get_latency_monitoring() );
188 inherited::add_postprocess_task(std::bind(&TCProcessor::make_td, this, std::placeholders::_1));
189
190 inherited::conf(mtrg);
191
192 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Exiting conf() method";
193}
void conf(const appmodel::DataHandlerModule *conf) override
std::vector< const appmodel::TCReadoutMap * > m_readout_window_map_data
std::map< int, std::vector< daqdataformats::SourceID > > m_group_links
void print_roi_conf(std::map< int, roi_group > roi_conf)
void parse_roi_conf(const std::vector< const appmodel::ROIGroupConf * > &data)
std::map< int, roi_group > m_roi_conf
void print_readout_map(std::map< TCType, std::pair< triggeralgs::timestamp_t, triggeralgs::timestamp_t > > map)
void make_td(const TCWrapper *tc)
void set_trigger_bitwords(const std::vector< const appmodel::TriggerBitword * > &_bitwords)
std::vector< const appmodel::ROIGroupConf * > m_roi_conf_data
void parse_readout_map(const std::vector< const appmodel::TCReadoutMap * > &data)
void parse_group_links(const nlohmann::json &data)
std::atomic< bool > m_send_timed_out_tds
std::vector< daqdataformats::SourceID > m_mandatory_links
Base class for any user define issue.
Definition Issue.hpp:69
#define TLVL_ENTER_EXIT_METHODS
daqdataformats::SourceID SourceID
Copy daqdataformats::SourceID.
Definition Types.hpp:32
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
void error(const Issue &issue)
Definition ers.hpp:81
static Subsystem string_to_subsystem(const std::string &typestring)
Definition SourceID.hxx:106
Factory couldn t std::string alg_name InvalidConfiguration
Definition Issues.hpp:33

◆ create_all_decision_requests()

std::vector< dfmessages::ComponentRequest > dunedaq::trigger::TCProcessor::create_all_decision_requests ( std::vector< daqdataformats::SourceID > links,
triggeralgs::timestamp_t start,
triggeralgs::timestamp_t end )
private

Definition at line 702 of file TCProcessor.cpp.

705{
706 std::vector<dfmessages::ComponentRequest> requests;
707 for (auto link : links) {
708 requests.push_back(create_request_for_link(link, start, end));
709 }
710 return requests;
711}
void start(const nlohmann::json &args) override
Start operation.
dfmessages::ComponentRequest create_request_for_link(daqdataformats::SourceID link, triggeralgs::timestamp_t start, triggeralgs::timestamp_t end)

◆ create_decision()

dfmessages::TriggerDecision dunedaq::trigger::TCProcessor::create_decision ( const PendingTD & pending_td)
private

Definition at line 297 of file TCProcessor.cpp.

298{
300 TLOG_DEBUG(5) << "earliest TC index: " << m_earliest_tc_index;
301
302 if (pending_td.contributing_tcs.size() > 1) {
303 TLOG_DEBUG(5) << "!!! TD created from " << pending_td.contributing_tcs.size() << " TCs !!!";
304 }
305
306 dfmessages::TriggerDecision decision;
307 decision.trigger_number = 0; // filled by MLT
308 decision.run_number = 0; // filled by MLT
309 decision.trigger_timestamp = pending_td.contributing_tcs[m_earliest_tc_index].time_candidate;
310 decision.readout_type = dfmessages::ReadoutType::kLocalized;
311
312 TDBitset td_bitword = get_TD_bitword(pending_td);
313 TLOG_DEBUG(5) << "[MLT] TD has bitword: " << td_bitword << " "
314 << static_cast<dfmessages::trigger_type_t>(td_bitword.to_ulong());
315 decision.trigger_type = static_cast<dfmessages::trigger_type_t>(td_bitword.to_ulong()); // m_trigger_type;
316
317 //decision.trigger_type = 1; // m_trigger_type;
318
319 TLOG_DEBUG(3) << ", TC detid: " << pending_td.contributing_tcs[m_earliest_tc_index].detid
320 << ", TC type: " << static_cast<int>(pending_td.contributing_tcs[m_earliest_tc_index].type)
321 << ", TC cont number: " << pending_td.contributing_tcs.size()
322 << ", DECISION trigger type: " << decision.trigger_type
323 << ", DECISION timestamp: " << decision.trigger_timestamp
324 << ", request window begin: " << pending_td.readout_start
325 << ", request window end: " << pending_td.readout_end;
326
327 std::vector<dfmessages::ComponentRequest> requests =
328 create_all_decision_requests(m_mandatory_links, pending_td.readout_start, pending_td.readout_end);
329 add_requests_to_decision(decision, requests);
330
331 if (!m_use_roi_readout) {
332 for (const auto& [key, value] : m_group_links) {
333 std::vector<dfmessages::ComponentRequest> group_requests =
334 create_all_decision_requests(value, pending_td.readout_start, pending_td.readout_end);
335 add_requests_to_decision(decision, group_requests);
336 }
337 } else { // using ROI readout
339 }
340
342 m_tds_created_tc_count += pending_td.contributing_tcs.size();
343
344 return decision;
345}
void add_requests_to_decision(dfmessages::TriggerDecision &decision, std::vector< dfmessages::ComponentRequest > requests)
std::vector< dfmessages::ComponentRequest > create_all_decision_requests(std::vector< daqdataformats::SourceID > links, triggeralgs::timestamp_t start, triggeralgs::timestamp_t end)
void roi_readout_make_requests(dfmessages::TriggerDecision &decision)
std::atomic< metric_counter_type > m_tds_created_tc_count
std::atomic< metric_counter_type > m_tds_created_count
int get_earliest_tc_index(const PendingTD &pending_td)
@ kLocalized
Local readout, send Fragments to dataflow.
daqdataformats::trigger_type_t trigger_type_t
Copy daqdataformats::trigger_type_t.
Definition Types.hpp:45

◆ create_request_for_link()

dfmessages::ComponentRequest dunedaq::trigger::TCProcessor::create_request_for_link ( daqdataformats::SourceID link,
triggeralgs::timestamp_t start,
triggeralgs::timestamp_t end )
private

Definition at line 686 of file TCProcessor.cpp.

689{
691 request.component = link;
692 request.window_begin = start;
693 request.window_end = end;
694
695 TLOG_DEBUG(10) << "setting request start: " << request.window_begin;
696 TLOG_DEBUG(10) << "setting request end: " << request.window_end;
697
698 return request;
699}
daqdataformats::ComponentRequest ComponentRequest
Copy daqdataformats::ComponentRequest.
Definition Types.hpp:33
SourceID component
The Requested Component.

◆ generate_opmon_data()

void dunedaq::trigger::TCProcessor::generate_opmon_data ( )
overridevirtual

Hook for customisable publication. The function can throw; any exception will be caught by the monitoring thread.

Reimplemented from dunedaq::datahandlinglibs::TaskRawDataProcessorModel< TCWrapper >.

Definition at line 222 of file TCProcessor.cpp.

223{
224 opmon::TCProcessorInfo info;
225
226 info.set_tds_created_count( m_tds_created_count.load() );
227 info.set_tds_sent_count( m_tds_sent_count.load() );
228 info.set_tds_dropped_count( m_tds_dropped_count.load() );
229 info.set_tds_failed_bitword_count( m_tds_failed_bitword_count.load() );
230 info.set_tds_cleared_count( m_tds_cleared_count.load() );
231 info.set_tc_received_count( m_tc_received_count.load() );
232 info.set_tc_ignored_count( m_tc_ignored_count.load() );
233 info.set_tds_created_tc_count( m_tds_created_tc_count.load() );
234 info.set_tds_sent_tc_count( m_tds_sent_tc_count.load() );
235 info.set_tds_dropped_tc_count( m_tds_dropped_tc_count.load() );
236 info.set_tds_failed_bitword_tc_count( m_tds_failed_bitword_tc_count.load() );
237 info.set_tds_cleared_tc_count( m_tds_cleared_tc_count.load() );
238
239 this->publish(std::move(info));
240
241 if ( m_latency_monitoring.load() && m_running_flag.load() ) {
242 opmon::TriggerLatency lat_info;
243
244 lat_info.set_latency_in( m_latency_instance.get_latency_in() );
245 lat_info.set_latency_out( m_latency_instance.get_latency_out() );
246
247 this->publish(std::move(lat_info));
248 }
249}
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
latency get_latency_in() const
Definition Latency.hpp:48
latency get_latency_out() const
Definition Latency.hpp:53
std::atomic< metric_counter_type > m_tc_ignored_count
std::atomic< metric_counter_type > m_tc_received_count
std::atomic< bool > m_running_flag

◆ get_earliest_tc_index()

int dunedaq::trigger::TCProcessor::get_earliest_tc_index ( const PendingTD & pending_td)
private

Definition at line 526 of file TCProcessor.cpp.

527{
528 int earliest_tc_index = -1;
529 triggeralgs::timestamp_t earliest_tc_time;
530 for (int i = 0; i < static_cast<int>(pending_td.contributing_tcs.size()); i++) {
531 if (earliest_tc_index == -1) {
532 earliest_tc_time = pending_td.contributing_tcs[i].time_candidate;
533 earliest_tc_index = i;
534 } else {
535 if (pending_td.contributing_tcs[i].time_candidate < earliest_tc_time) {
536 earliest_tc_time = pending_td.contributing_tcs[i].time_candidate;
537 earliest_tc_index = i;
538 }
539 }
540 }
541 return earliest_tc_index;
542}
dunedaq::trgdataformats::timestamp_t timestamp_t
Definition Types.hpp:16

◆ get_random_num_float()

float dunedaq::trigger::TCProcessor::get_random_num_float ( float limit)
private

Definition at line 759 of file TCProcessor.cpp.

760{
761 float rnd = (double)rand() / RAND_MAX;
762 return rnd * (limit);
763}

◆ get_random_num_int()

int dunedaq::trigger::TCProcessor::get_random_num_int ( )
private

Definition at line 778 of file TCProcessor.cpp.

779{
781 int rnd = rand() % range;
782 return rnd;
783}
DAC value out of range
Message.
Definition DACNode.hpp:32

◆ get_ready_tds()

std::vector< TCProcessor::PendingTD > dunedaq::trigger::TCProcessor::get_ready_tds ( std::vector< PendingTD > & pending_tds)
private

Definition at line 505 of file TCProcessor.cpp.

506{
507 std::vector<PendingTD> return_tds;
508 for (std::vector<PendingTD>::iterator it = pending_tds.begin(); it != pending_tds.end();) {
509 auto timestamp_now =
510 std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch())
511 .count();
512 if (timestamp_now >= it->walltime_expiration) {
513 return_tds.push_back(*it);
514 it = pending_tds.erase(it);
515 } else if (check_td_readout_length(*it)) { // Also pass on TDs with (too) long readout window
516 return_tds.push_back(*it);
517 it = pending_tds.erase(it);
518 } else {
519 ++it;
520 }
521 }
522 return return_tds;
523}
bool check_td_readout_length(const PendingTD &)

◆ get_TD_bitword()

TCProcessor::TDBitset dunedaq::trigger::TCProcessor::get_TD_bitword ( const PendingTD & ready_td) const
private

Definition at line 827 of file TCProcessor.cpp.

828{
829 // get only unique types
830 std::vector<int> tc_types;
831 for (auto tc : ready_td.contributing_tcs) {
832 tc_types.push_back(static_cast<int>(tc.type));
833 }
834 tc_types.erase(std::unique(tc_types.begin(), tc_types.end()), tc_types.end());
835
836 // form TD bitword
837 TDBitset td_bitword;
838 for (auto tc_type : tc_types) {
839 td_bitword.set(tc_type);
840 }
841 return td_bitword;
842}

◆ make_td()

void dunedaq::trigger::TCProcessor::make_td ( const TCWrapper * tcw)
protected

Pipeline stage 2: put valid TCs into a vector so they can be grouped and formed into TDs.

Definition at line 255 of file TCProcessor.cpp.

256{
257
258 auto tc = tcw->candidate;
259 if (m_latency_monitoring.load()) m_latency_instance.update_latency_in( tc.time_start );
261
262 if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ) {
263 TLOG_DEBUG(3) << "Got TC of type " << static_cast<int>(tc.type) << ", timestamp " << tc.time_candidate
264 << ", start/end " << tc.time_start << "/" << tc.time_end << ", readout start/end "
265 << tc.time_candidate - m_readout_window_map[tc.type].first << "/"
266 << tc.time_candidate + m_readout_window_map[tc.type].second;
267 } else {
268 TLOG_DEBUG(3) << "Got TC of type " << static_cast<int>(tc.type) << ", timestamp " << tc.time_candidate
269 << ", start/end " << tc.time_start << "/" << tc.time_end;
270 }
271
272 // Option to ignore TC types (if given by config)
273 if (m_ignoring_tc_types == true && check_trigger_type_ignore(static_cast<unsigned int>(tc.type)) == true) {
274 TLOG_DEBUG(3) << " Ignore TC type: " << static_cast<unsigned int>(tc.type);
276
277 /*FIXME: comment out this block: if a TC is to be ignored it shall just be ignored!
278 if (m_tc_merging) {
279 // Still need to check for overlap with existing TD, if overlaps, include in the TD, but don't extend
280 // readout
281 std::lock_guard<std::mutex> lock(m_td_vector_mutex);
282 add_tc_ignored(*tc);
283 }
284 */
285 }
286 else {
287 std::lock_guard<std::mutex> lock(m_td_vector_mutex);
288 add_tc(tc);
289 m_cv.notify_one();
290 TLOG_DEBUG(10) << "pending tds size: " << m_pending_tds.size();
291 }
292 m_last_processed_daq_ts = tc.time_start;
293 return;
294}
void update_latency_in(uint64_t latency)
Definition Latency.hpp:38
bool check_trigger_type_ignore(unsigned int tc_type)
std::condition_variable m_cv
void add_tc(const triggeralgs::TriggerCandidate tc)

◆ parse_group_links()

void dunedaq::trigger::TCProcessor::parse_group_links ( const nlohmann::json & data)
private

Definition at line 658 of file TCProcessor.cpp.

659{
660 for (auto group : data) {
661 const nlohmann::json& temp_links_data = group["links"];
662 std::vector<dfmessages::SourceID> temp_links;
663 for (auto link : temp_links_data) {
664 temp_links.push_back(
665 dfmessages::SourceID{ daqdataformats::SourceID::string_to_subsystem(link["subsystem"]), link["element"] });
666 }
667 m_group_links.insert({ group["group"], temp_links });
668 }
669 return;
670}

◆ parse_readout_map()

void dunedaq::trigger::TCProcessor::parse_readout_map ( const std::vector< const appmodel::TCReadoutMap * > & data)
private

Definition at line 629 of file TCProcessor.cpp.

630{
631 for (auto readout_type : data) {
632 TCType tc_type = static_cast<TCType>(
633 dunedaq::trgdataformats::string_to_trigger_candidate_type(readout_type->get_tc_type_name()));
634
635 // Throw error if unknown TC type
636 if (tc_type == TCType::kUnknown) {
637 throw(InvalidConfiguration(ERS_HERE, "Provided an unknown TC type in the TCReadoutMap for the TCProcessor"));
638 }
639
640 m_readout_window_map[tc_type] = {
641 readout_type->get_time_before(), readout_type->get_time_after()
642 };
643 }
644 return;
645}
triggeralgs::TriggerCandidate::Type TCType
int string_to_trigger_candidate_type(const std::string &name)

◆ parse_roi_conf()

void dunedaq::trigger::TCProcessor::parse_roi_conf ( const std::vector< const appmodel::ROIGroupConf * > & data)
private

Definition at line 723 of file TCProcessor.cpp.

724{
725 int counter = 0;
726 float run_sum = 0;
727 for (auto group : data) {
728 roi_group temp_roi_group;
729 temp_roi_group.n_links = group->get_number_of_link_groups();
730 temp_roi_group.prob = group->get_probability();
731 temp_roi_group.time_window = group->get_time_window();
732 temp_roi_group.mode = group->get_groups_selection_mode();
733 m_roi_conf.insert({ counter, temp_roi_group });
734 m_roi_conf_ids.push_back(counter);
735 m_roi_conf_probs.push_back(group->get_probability());
736 run_sum += static_cast<float>(group->get_probability());
737 m_roi_conf_probs_c.push_back(run_sum);
738 counter++;
739 }
740 return;
741}
std::vector< int > m_roi_conf_ids
std::vector< float > m_roi_conf_probs_c
std::vector< float > m_roi_conf_probs

◆ pick_roi_group_conf()

int dunedaq::trigger::TCProcessor::pick_roi_group_conf ( )
private

Definition at line 766 of file TCProcessor.cpp.

767{
768 float rnd_num = get_random_num_float(m_roi_conf_probs_c.back());
769 for (int i = 0; i < static_cast<int>(m_roi_conf_probs_c.size()); i++) {
770 if (rnd_num < m_roi_conf_probs_c[i]) {
771 return i;
772 }
773 }
774 return -1;
775}
float get_random_num_float(float limit)

◆ print_group_links()

void dunedaq::trigger::TCProcessor::print_group_links ( )
private

Definition at line 673 of file TCProcessor.cpp.

674{
675 TLOG_DEBUG(3) << "MLT Group Links:";
676 for (auto const& [key, val] : m_group_links) {
677 TLOG_DEBUG(3) << "Group: " << key;
678 for (auto const& link : val) {
679 TLOG_DEBUG(3) << link;
680 }
681 }
682 TLOG_DEBUG(3) << " ";
683 return;
684}

◆ print_opmon_stats()

void dunedaq::trigger::TCProcessor::print_opmon_stats ( )
private

Definition at line 845 of file TCProcessor.cpp.

846{
847 TLOG() << "TCProcessor opmon counters summary:";
848 TLOG() << "------------------------------";
849 TLOG() << "TDs created: \t\t\t" << m_tds_created_count << " \t(" << m_tds_created_tc_count << " TCs)";
850 TLOG() << "TDs sent: \t\t\t" << m_tds_sent_count << " \t(" << m_tds_sent_tc_count << " TCs)";
851 TLOG() << "TDs dropped: \t\t\t" << m_tds_dropped_count << " \t(" << m_tds_dropped_tc_count << " TCs)";
852 TLOG() << "TDs failed bitword check: \t" << m_tds_failed_bitword_count << " \t(" << m_tds_failed_bitword_tc_count << " TCs)";
853 TLOG() << "TDs cleared: \t\t\t" << m_tds_cleared_count << " \t(" << m_tds_cleared_tc_count << " TCs)";
854 TLOG() << "------------------------------";
855 TLOG() << "TCs received: \t" << m_tc_received_count;
856 TLOG() << "TCs ignored: \t" << m_tc_ignored_count;
857 TLOG();
858}
#define TLOG(...)
Definition macro.hpp:22

◆ print_readout_map()

void dunedaq::trigger::TCProcessor::print_readout_map ( std::map< TCType, std::pair< triggeralgs::timestamp_t, triggeralgs::timestamp_t > > map)
private

Definition at line 647 of file TCProcessor.cpp.

649{
650 TLOG_DEBUG(3) << "MLT TD Readout map:";
651 for (auto const& [key, val] : map) {
652 TLOG_DEBUG(3) << "Type: " << static_cast<int>(key) << ", before: " << val.first << ", after: " << val.second;
653 }
654 return;
655}

◆ print_roi_conf()

void dunedaq::trigger::TCProcessor::print_roi_conf ( std::map< int, roi_group > roi_conf)
private

Definition at line 744 of file TCProcessor.cpp.

745{
746 TLOG_DEBUG(3) << "ROI CONF";
747 for (const auto& [key, value] : roi_conf) {
748 TLOG_DEBUG(3) << "ID: " << key;
749 TLOG_DEBUG(3) << "n links: " << value.n_links;
750 TLOG_DEBUG(3) << "prob: " << value.prob;
751 TLOG_DEBUG(3) << "time: " << value.time_window;
752 TLOG_DEBUG(3) << "mode: " << value.mode;
753 }
754 TLOG_DEBUG(3) << " ";
755 return;
756}

◆ print_trigger_bitwords()

void dunedaq::trigger::TCProcessor::print_trigger_bitwords ( )
private

Definition at line 585 of file TCProcessor.cpp.

586{
587 TLOG_DEBUG(3) << "Configured trigger words:";
588 for (const auto& bitword : m_trigger_bitwords) {
589 TLOG_DEBUG(3) << bitword;
590 }
591}

◆ roi_readout_make_requests()

void dunedaq::trigger::TCProcessor::roi_readout_make_requests ( dfmessages::TriggerDecision & decision)
private

Definition at line 785 of file TCProcessor.cpp.

786{
787 // Get configuration at random (weighted)
788 int group_pick = pick_roi_group_conf();
789 if (group_pick != -1) {
790 roi_group this_group = m_roi_conf[m_roi_conf_ids[group_pick]];
791 std::vector<dfmessages::SourceID> links;
792
793 // If mode is random, pick groups to request at random
794 if (this_group.mode == "kRandom") {
795 TLOG_DEBUG(10) << "RAND";
796 std::set<int> groups;
797 while (static_cast<int>(groups.size()) < this_group.n_links) {
798 groups.insert(get_random_num_int());
799 }
800 for (auto r_id : groups) {
801 links.insert(links.end(), m_group_links[r_id].begin(), m_group_links[r_id].end());
802 }
803 // Otherwise, read sequentially by IDs, starting at 0
804 } else {
805 TLOG_DEBUG(10) << "SEQ";
806 int r_id = 0;
807 while (r_id < this_group.n_links) {
808 links.insert(links.end(), m_group_links[r_id].begin(), m_group_links[r_id].end());
809 r_id++;
810 }
811 }
812
813 TLOG_DEBUG(10) << "TD timestamp: " << decision.trigger_timestamp;
814 TLOG_DEBUG(10) << "group window: " << this_group.time_window;
815
816 // Once the components are prepared, create requests and append them to decision
817 std::vector<dfmessages::ComponentRequest> requests =
818 create_all_decision_requests(links, decision.trigger_timestamp - this_group.time_window,
819 decision.trigger_timestamp + this_group.time_window);
820 add_requests_to_decision(decision, requests);
821 links.clear();
822 }
823 return;
824}

◆ scrap()

void dunedaq::trigger::TCProcessor::scrap ( const nlohmann::json & )
overridevirtual

Unconfigure.

Reimplemented from dunedaq::datahandlinglibs::TaskRawDataProcessorModel< TCWrapper >.

Definition at line 196 of file TCProcessor.cpp.

197{
198 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Entering scrap() method";
199
200 m_mandatory_links.clear();
201 m_group_links.clear();
202 m_roi_conf.clear();
203 m_roi_conf_data.clear();
204 m_roi_conf_ids.clear();
205 m_roi_conf_probs.clear();
206 m_roi_conf_probs_c.clear();
207 m_pending_tds.clear();
209 m_readout_window_map.clear();
210 m_ignored_tc_types.clear();
211
212 m_td_sink.reset();
213
214 m_group_links_data.clear();
215
216 inherited::scrap(args);
217
218 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Exiting scrap() method";
219}

◆ send_trigger_decisions()

void dunedaq::trigger::TCProcessor::send_trigger_decisions ( )
private

Definition at line 349 of file TCProcessor.cpp.

349 {
350 // A unique lock that can be locked and unlocked
351 std::unique_lock<std::mutex> lock(m_td_vector_mutex);
352
353 while (m_running_flag) {
354 // Either there are pending TDs, or wait for a bit
355 m_cv.wait(lock, [this] {
356 return !m_pending_tds.empty() || !m_running_flag;
357 });
358 auto ready_tds = get_ready_tds(m_pending_tds);
359 TLOG_DEBUG(10) << "ready tds: " << ready_tds.size() << ", updated pending tds: " << m_pending_tds.size();
360
361 for (std::vector<PendingTD>::iterator it = ready_tds.begin(); it != ready_tds.end();) {
362 call_tc_decision(*it);
363 ++it;
364 }
365 }
366}
void call_tc_decision(const PendingTD &pending_td)
std::vector< PendingTD > get_ready_tds(std::vector< PendingTD > &pending_tds)

◆ set_trigger_bitwords()

void dunedaq::trigger::TCProcessor::set_trigger_bitwords ( const std::vector< const appmodel::TriggerBitword * > & _bitwords)
private

Definition at line 609 of file TCProcessor.cpp.

610{
611 for (const appmodel::TriggerBitword* bitword : _bitwords) {
612 TDBitset temp_bitword;
613
614 for (const std::string& tctype_str: bitword->get_bitword()) {
616
617 if (tc_type == TCType::kUnknown) {
618 throw(InvalidConfiguration(ERS_HERE, "Provided an unknown/non-existent TC type as a trigger bitword!"));
619 }
620
621 temp_bitword.set(static_cast<uint64_t>(tc_type));
622 }
623
624 m_trigger_bitwords.push_back(temp_bitword);
625 }
626}

◆ start()

void dunedaq::trigger::TCProcessor::start ( const nlohmann::json & )
overridevirtual

Start operation.

Reimplemented from dunedaq::datahandlinglibs::TaskRawDataProcessorModel< TCWrapper >.

Definition at line 41 of file TCProcessor.cpp.

42{
43 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Entering start() method";
44
45 m_running_flag.store(true);
47 pthread_setname_np(m_send_trigger_decisions_thread.native_handle(), "mlt-dec"); // TODO: originally mlt-trig-dec
48
49 // Reset stats
50 m_tds_created_count.store(0);
51 m_tds_sent_count.store(0);
52 m_tds_dropped_count.store(0);
54 m_tds_cleared_count.store(0);
55 // per TC
56 m_tc_received_count.store(0);
58 m_tds_sent_tc_count.store(0);
62 m_tc_ignored_count.store(0);
63 inherited::start(args);
64
65 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Exiting start() method";
66}
std::thread m_send_trigger_decisions_thread

◆ stop()

void dunedaq::trigger::TCProcessor::stop ( const nlohmann::json & )
overridevirtual

Stop operation.

Reimplemented from dunedaq::datahandlinglibs::TaskRawDataProcessorModel< TCWrapper >.

Definition at line 69 of file TCProcessor.cpp.

70{
71 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Entering stop() method";
72
73 inherited::stop(args);
74 m_running_flag.store(false);
75
76 // Make sure condition_variable knows we flipped running flag
77 {
78 std::lock_guard<std::mutex> lock(m_td_vector_mutex);
79 m_cv.notify_all();
80 }
81
82 // Wait for the TD-sending thread to stop
84
85 // Drop all TDs in vectors at run stage change. Have to do this
86 // after joining m_send_trigger_decisions_thread so we don't
87 // concurrently access the vectors
89
91
92 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Exiting stop() method";
93}

Member Data Documentation

◆ m_buffer_timeout

int64_t dunedaq::trigger::TCProcessor::m_buffer_timeout
private

Definition at line 142 of file TCProcessor.hpp.

◆ m_cv

std::condition_variable dunedaq::trigger::TCProcessor::m_cv
private

Definition at line 132 of file TCProcessor.hpp.

◆ m_earliest_tc_index

int dunedaq::trigger::TCProcessor::m_earliest_tc_index
private

Definition at line 145 of file TCProcessor.hpp.

◆ m_group_links

std::map<int, std::vector<daqdataformats::SourceID> > dunedaq::trigger::TCProcessor::m_group_links
private

Definition at line 74 of file TCProcessor.hpp.

◆ m_group_links_data

nlohmann::json dunedaq::trigger::TCProcessor::m_group_links_data
private

Definition at line 75 of file TCProcessor.hpp.

◆ m_ignore_tc_pileup

bool dunedaq::trigger::TCProcessor::m_ignore_tc_pileup
private

Ignore TCs that overlap with an already-made TD.

Definition at line 114 of file TCProcessor.hpp.

◆ m_ignored_tc_types

std::vector<unsigned int> dunedaq::trigger::TCProcessor::m_ignored_tc_types
private

Definition at line 170 of file TCProcessor.hpp.

◆ m_ignoring_tc_types

bool dunedaq::trigger::TCProcessor::m_ignoring_tc_types
private

Definition at line 171 of file TCProcessor.hpp.

◆ m_last_trigger_number

dfmessages::trigger_number_t dunedaq::trigger::TCProcessor::m_last_trigger_number
private

Definition at line 116 of file TCProcessor.hpp.

◆ m_latency_in

std::atomic<metric_counter_type> dunedaq::trigger::TCProcessor::m_latency_in { 0 }
private

Definition at line 198 of file TCProcessor.hpp.

198{ 0 };

◆ m_latency_instance

dunedaq::trigger::Latency dunedaq::trigger::TCProcessor::m_latency_instance
private

Definition at line 197 of file TCProcessor.hpp.

◆ m_latency_monitoring

std::atomic<bool> dunedaq::trigger::TCProcessor::m_latency_monitoring { false }
private

Definition at line 196 of file TCProcessor.hpp.

196{ false };

◆ m_latency_out

std::atomic<metric_counter_type> dunedaq::trigger::TCProcessor::m_latency_out { 0 }
private

Definition at line 199 of file TCProcessor.hpp.

199{ 0 };

◆ m_mandatory_links

std::vector<daqdataformats::SourceID> dunedaq::trigger::TCProcessor::m_mandatory_links
private

Definition at line 73 of file TCProcessor.hpp.

◆ m_pending_tds

std::vector<PendingTD> dunedaq::trigger::TCProcessor::m_pending_tds
private

Definition at line 130 of file TCProcessor.hpp.

◆ m_readout_window_map

std::map<TCType, std::pair<triggeralgs::timestamp_t, triggeralgs::timestamp_t> > dunedaq::trigger::TCProcessor::m_readout_window_map
private

Definition at line 160 of file TCProcessor.hpp.

◆ m_readout_window_map_data

std::vector<const appmodel::TCReadoutMap*> dunedaq::trigger::TCProcessor::m_readout_window_map_data
private

Definition at line 158 of file TCProcessor.hpp.

◆ m_repeat_trigger_count

int dunedaq::trigger::TCProcessor::m_repeat_trigger_count { 1 }
private

Definition at line 110 of file TCProcessor.hpp.

110{ 1 };

◆ m_roi_conf

std::map<int, roi_group> dunedaq::trigger::TCProcessor::m_roi_conf
private

Definition at line 98 of file TCProcessor.hpp.

◆ m_roi_conf_data

std::vector<const appmodel::ROIGroupConf*> dunedaq::trigger::TCProcessor::m_roi_conf_data
private

Definition at line 99 of file TCProcessor.hpp.

◆ m_roi_conf_ids

std::vector<int> dunedaq::trigger::TCProcessor::m_roi_conf_ids
private

Definition at line 102 of file TCProcessor.hpp.

◆ m_roi_conf_probs

std::vector<float> dunedaq::trigger::TCProcessor::m_roi_conf_probs
private

Definition at line 103 of file TCProcessor.hpp.

◆ m_roi_conf_probs_c

std::vector<float> dunedaq::trigger::TCProcessor::m_roi_conf_probs_c
private

Definition at line 104 of file TCProcessor.hpp.

◆ m_run_number

dfmessages::run_number_t dunedaq::trigger::TCProcessor::m_run_number
private

Definition at line 118 of file TCProcessor.hpp.

◆ m_running_flag

std::atomic<bool> dunedaq::trigger::TCProcessor::m_running_flag { false }
private

Definition at line 120 of file TCProcessor.hpp.

120{ false };

◆ m_send_timed_out_tds

std::atomic<bool> dunedaq::trigger::TCProcessor::m_send_timed_out_tds
private

Definition at line 144 of file TCProcessor.hpp.

◆ m_send_trigger_decisions_thread

std::thread dunedaq::trigger::TCProcessor::m_send_trigger_decisions_thread
private

Definition at line 70 of file TCProcessor.hpp.

◆ m_tc_ignored_count

std::atomic<metric_counter_type> dunedaq::trigger::TCProcessor::m_tc_ignored_count { 0 }
private

Definition at line 193 of file TCProcessor.hpp.

193{ 0 };

◆ m_tc_merging

std::atomic<bool> dunedaq::trigger::TCProcessor::m_tc_merging
private

Definition at line 111 of file TCProcessor.hpp.

◆ m_tc_received_count

std::atomic<metric_counter_type> dunedaq::trigger::TCProcessor::m_tc_received_count { 0 }
private

Definition at line 187 of file TCProcessor.hpp.

187{ 0 };

◆ m_td_readout_limit

int64_t dunedaq::trigger::TCProcessor::m_td_readout_limit
private

Definition at line 143 of file TCProcessor.hpp.

◆ m_td_sink

std::shared_ptr<iomanager::SenderConcept<dfmessages::TriggerDecision> > dunedaq::trigger::TCProcessor::m_td_sink
private

Definition at line 176 of file TCProcessor.hpp.

◆ m_td_vector_mutex

std::mutex dunedaq::trigger::TCProcessor::m_td_vector_mutex
private

Definition at line 131 of file TCProcessor.hpp.

◆ m_tds_cleared_count

std::atomic<metric_counter_type> dunedaq::trigger::TCProcessor::m_tds_cleared_count { 0 }
private

Definition at line 184 of file TCProcessor.hpp.

184{ 0 };

◆ m_tds_cleared_tc_count

std::atomic<metric_counter_type> dunedaq::trigger::TCProcessor::m_tds_cleared_tc_count { 0 }
private

Definition at line 192 of file TCProcessor.hpp.

192{ 0 };

◆ m_tds_created_count

std::atomic<metric_counter_type> dunedaq::trigger::TCProcessor::m_tds_created_count { 0 }
private

Definition at line 180 of file TCProcessor.hpp.

180{ 0 }; // NOLINT(build/unsigned)

◆ m_tds_created_tc_count

std::atomic<metric_counter_type> dunedaq::trigger::TCProcessor::m_tds_created_tc_count { 0 }
private

Definition at line 188 of file TCProcessor.hpp.

188{ 0 };

◆ m_tds_dropped_count

std::atomic<metric_counter_type> dunedaq::trigger::TCProcessor::m_tds_dropped_count { 0 }
private

Definition at line 182 of file TCProcessor.hpp.

182{ 0 };

◆ m_tds_dropped_tc_count

std::atomic<metric_counter_type> dunedaq::trigger::TCProcessor::m_tds_dropped_tc_count { 0 }
private

Definition at line 190 of file TCProcessor.hpp.

190{ 0 };

◆ m_tds_failed_bitword_count

std::atomic<metric_counter_type> dunedaq::trigger::TCProcessor::m_tds_failed_bitword_count { 0 }
private

Definition at line 183 of file TCProcessor.hpp.

183{ 0 };

◆ m_tds_failed_bitword_tc_count

std::atomic<metric_counter_type> dunedaq::trigger::TCProcessor::m_tds_failed_bitword_tc_count { 0 }
private

Definition at line 191 of file TCProcessor.hpp.

191{ 0 };

◆ m_tds_sent_count

std::atomic<metric_counter_type> dunedaq::trigger::TCProcessor::m_tds_sent_count { 0 }
private

Definition at line 181 of file TCProcessor.hpp.

181{ 0 };

◆ m_tds_sent_tc_count

std::atomic<metric_counter_type> dunedaq::trigger::TCProcessor::m_tds_sent_tc_count { 0 }
private

Definition at line 189 of file TCProcessor.hpp.

189{ 0 };

◆ m_total_group_links

int dunedaq::trigger::TCProcessor::m_total_group_links
private

Definition at line 76 of file TCProcessor.hpp.

◆ m_trigger_bitwords

std::vector<TDBitset> dunedaq::trigger::TCProcessor::m_trigger_bitwords
private

Definition at line 150 of file TCProcessor.hpp.

◆ m_trigger_type_shifted

dfmessages::trigger_type_t dunedaq::trigger::TCProcessor::m_trigger_type_shifted
private

Definition at line 167 of file TCProcessor.hpp.

◆ m_use_bitwords

bool dunedaq::trigger::TCProcessor::m_use_bitwords
private

Definition at line 149 of file TCProcessor.hpp.

◆ m_use_readout_map

bool dunedaq::trigger::TCProcessor::m_use_readout_map
private

Definition at line 157 of file TCProcessor.hpp.

◆ m_use_roi_readout

bool dunedaq::trigger::TCProcessor::m_use_roi_readout
private

Definition at line 90 of file TCProcessor.hpp.


The documentation for this class was generated from the following files: