DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::trigger::TAProcessor Class Reference

#include <TAProcessor.hpp>

Inheritance diagram for dunedaq::trigger::TAProcessor:
[legend]
Collaboration diagram for dunedaq::trigger::TAProcessor:
[legend]

Public Types

using inherited = datahandlinglibs::TaskRawDataProcessorModel<TAWrapper>
 
using taptr = TAWrapper*
 
using consttaptr = const TAWrapper*
 
- Public Types inherited from dunedaq::opmonlib::MonitorableObject
using NodePtr = std::weak_ptr<MonitorableObject>
 
using NewNodePtr = std::shared_ptr<MonitorableObject>
 
using ElementId = std::string
 

Public Member Functions

 TAProcessor (std::unique_ptr< datahandlinglibs::FrameErrorRegistry > &error_registry, bool post_processing_enabled)
 
 ~TAProcessor ()
 
void start (const nlohmann::json &args) override
 Start operation.
 
void stop (const nlohmann::json &args) override
 Stop operation.
 
void conf (const appmodel::DataHandlerModule *conf) override
 Set the emulator mode; if active, timestamps of processed packets are overwritten with new ones.
 
void scrap (const nlohmann::json &args) override
 Unconfigure.
 
void generate_opmon_data () override
 
- Public Member Functions inherited from dunedaq::datahandlinglibs::TaskRawDataProcessorModel< TAWrapper >
 TaskRawDataProcessorModel (std::unique_ptr< FrameErrorRegistry > &error_registry, bool post_processing_enabled)
 
 ~TaskRawDataProcessorModel ()
 
void conf (const appmodel::DataHandlerModule *conf) override
 Set the emulator mode; if active, timestamps of processed packets are overwritten with new ones.
 
void scrap (const nlohmann::json &) override
 Unconfigure.
 
void start (const nlohmann::json &) override
 Start operation.
 
void stop (const nlohmann::json &) override
 Stop operation.
 
void reset_last_daq_time ()
 
std::uint64_t get_last_daq_time () override
 Get newest timestamp of last seen packet.
 
void preprocess_item (TAWrapper *item) override
 Preprocess one element.
 
void postprocess_item (const TAWrapper *item) override
 Postprocess one element.
 
void add_preprocess_task (Task &&task)
 
void add_postprocess_task (Task &&task)
 
void invoke_all_preprocess_functions (TAWrapper *item)
 
void launch_all_preprocess_functions (TAWrapper *item)
 
- Public Member Functions inherited from dunedaq::datahandlinglibs::RawDataProcessorConcept< TAWrapper >
 RawDataProcessorConcept ()
 
 RawDataProcessorConcept (const RawDataProcessorConcept &)=delete
 RawDataProcessorConcept is not copy-constructible.
 
 RawDataProcessorConcept (RawDataProcessorConcept &&)=delete
 RawDataProcessorConcept is not move-constructible.
 
virtual ~RawDataProcessorConcept ()
 
RawDataProcessorConcept & operator= (const RawDataProcessorConcept &)=delete
 RawDataProcessorConcept is not copy-assignable.
 
RawDataProcessorConcept & operator= (RawDataProcessorConcept &&)=delete
 RawDataProcessorConcept is not move-assignable.
 
- Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject (const MonitorableObject &)=delete
 
MonitorableObject & operator= (const MonitorableObject &)=delete
 
 MonitorableObject (MonitorableObject &&)=delete
 
MonitorableObject & operator= (MonitorableObject &&)=delete
 
virtual ~MonitorableObject ()=default
 
auto get_opmon_id () const noexcept
 
auto get_opmon_level () const noexcept
 

Protected Member Functions

void find_tc (const TAWrapper *ta, std::shared_ptr< triggeralgs::TriggerCandidateMaker > tcm)
 
- Protected Member Functions inherited from dunedaq::datahandlinglibs::TaskRawDataProcessorModel< TAWrapper >
void run_post_processing_thread (std::function< void(const TAWrapper *)> &function, folly::ProducerConsumerQueue< const TAWrapper * > &queue)
 
- Protected Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject ()=default
 
void register_node (ElementId name, NewNodePtr)
 
void publish (google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
 

Protected Attributes

dunedaq::daqdataformats::timestamp_t m_previous_ts = 0
 
dunedaq::daqdataformats::timestamp_t m_current_ts = 0
 
- Protected Attributes inherited from dunedaq::datahandlinglibs::TaskRawDataProcessorModel< TAWrapper >
std::vector< std::function< void(TAWrapper *)> > m_preprocess_functions
 
std::unique_ptr< FrameErrorRegistry > & m_error_registry
 
bool m_post_processing_enabled
 
std::atomic< bool > m_run_marker
 
std::vector< std::function< void(const TAWrapper *)> > m_post_process_functions
 
std::vector< std::unique_ptr< folly::ProducerConsumerQueue< const TAWrapper * > > > m_items_to_postprocess_queues
 
std::vector< std::unique_ptr< utilities::ReusableThread > > m_post_process_threads
 
size_t m_postprocess_queue_sizes
 
daqdataformats::SourceID m_sourceid
 
std::atomic< uint64_t > m_last_processed_daq_ts
 

Private Types

using metric_counter_type = uint64_t
 

Private Member Functions

void print_opmon_stats ()
 

Private Attributes

std::vector< std::shared_ptr< triggeralgs::TriggerCandidateMaker > > m_tcms
 
std::shared_ptr< iomanager::SenderConcept< triggeralgs::TriggerCandidate > > m_tc_sink
 
daqdataformats::SourceID m_sourceid
 
std::atomic< metric_counter_type > m_ta_received_count { 0 }
 
std::atomic< metric_counter_type > m_tc_made_count { 0 }
 
std::atomic< metric_counter_type > m_tc_sent_count { 0 }
 
std::atomic< metric_counter_type > m_tc_failed_sent_count { 0 }
 
std::atomic< bool > m_running_flag { false }
 
std::atomic< bool > m_latency_monitoring { false }
 
dunedaq::trigger::Latency m_latency_instance
 
std::atomic< metric_counter_type > m_latency_in { 0 }
 
std::atomic< metric_counter_type > m_latency_out { 0 }
 

Additional Inherited Members

- Static Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
static bool publishable_metric (OpMonLevel entry, OpMonLevel system) noexcept
 

Detailed Description

Definition at line 32 of file TAProcessor.hpp.

Member Typedef Documentation

◆ consttaptr

Definition at line 38 of file TAProcessor.hpp.

◆ inherited

◆ metric_counter_type

Definition at line 74 of file TAProcessor.hpp.

◆ taptr

Definition at line 37 of file TAProcessor.hpp.

Constructor & Destructor Documentation

◆ TAProcessor()

dunedaq::trigger::TAProcessor::TAProcessor ( std::unique_ptr< datahandlinglibs::FrameErrorRegistry > & error_registry,
bool post_processing_enabled )
explicit

Definition at line 39 of file TAProcessor.cpp.

40 : datahandlinglibs::TaskRawDataProcessorModel<TAWrapper>(error_registry, post_processing_enabled)
41{
42}

◆ ~TAProcessor()

dunedaq::trigger::TAProcessor::~TAProcessor ( )

Definition at line 44 of file TAProcessor.cpp.

45{}

Member Function Documentation

◆ conf()

void dunedaq::trigger::TAProcessor::conf ( const appmodel::DataHandlerModule * conf)
overridevirtual

Set the emulator mode; if active, timestamps of processed packets are overwritten with new ones.

Implements dunedaq::datahandlinglibs::RawDataProcessorConcept< TAWrapper >.

Definition at line 78 of file TAProcessor.cpp.

79{
80 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TAProcessor: Entering conf() method";
81
82 for (auto output : conf->get_outputs()) {
83 try {
84 if (output->get_data_type() == "TriggerCandidate") {
86 }
87 } catch (const ers::Issue& excpt) {
88 ers::error(datahandlinglibs::ResourceQueueError(ERS_HERE, "tc", "DefaultRequestHandlerModel", excpt));
89 }
90 }
91
92 m_sourceid.id = conf->get_source_id();
94 std::vector<const appmodel::TCAlgorithm*> tc_algorithms;
95 auto dp = conf->get_module_configuration()->get_data_processor();
96 auto proc_conf = dp->cast<appmodel::TADataProcessor>();
97 if (proc_conf != nullptr && m_post_processing_enabled ) {
98 tc_algorithms = proc_conf->get_algorithms();
99 }
100
101 for (auto algo : tc_algorithms) {
102 TLOG() << "Selected TC algorithm: " << algo->UID();
103 std::shared_ptr<triggeralgs::TriggerCandidateMaker> maker = make_tc_maker(algo->class_name());
104 nlohmann::json algo_json = algo->to_json(true);
105 maker->configure(algo_json[algo->UID()]);
106 inherited::add_postprocess_task(std::bind(&TAProcessor::find_tc, this, std::placeholders::_1, maker));
107 m_tcms.push_back(maker);
108 }
109 m_latency_monitoring.store( dp->get_latency_monitoring() );
111
112 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TAProcessor: Exiting conf() method";
113}
#define ERS_HERE
void conf(const appmodel::DataHandlerModule *conf) override
void find_tc(const TAWrapper *ta, std::shared_ptr< triggeralgs::TriggerCandidateMaker > tcm)
std::vector< std::shared_ptr< triggeralgs::TriggerCandidateMaker > > m_tcms
daqdataformats::SourceID m_sourceid
std::atomic< bool > m_latency_monitoring
std::shared_ptr< iomanager::SenderConcept< triggeralgs::TriggerCandidate > > m_tc_sink
void conf(const appmodel::DataHandlerModule *conf) override
Set the emulator mode; if active, timestamps of processed packets are overwritten with new ones.
Base class for any user-defined issue.
Definition Issue.hpp:69
#define TLVL_ENTER_EXIT_METHODS
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
std::unique_ptr< triggeralgs::TriggerCandidateMaker > make_tc_maker(std::string const &plugin_name)
Load a TriggerCandidateMaker plugin and return a unique_ptr to the contained class.
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
void error(const Issue &issue)
Definition ers.hpp:81
Subsystem subsystem
The general subsystem of the source of the data.
Definition SourceID.hpp:69
ID_t id
Unique identifier of the source of the data.
Definition SourceID.hpp:74
static const constexpr daqdataformats::SourceID::Subsystem subsystem
Definition TAWrapper.hpp:81

◆ find_tc()

void dunedaq::trigger::TAProcessor::find_tc ( const TAWrapper * ta,
std::shared_ptr< triggeralgs::TriggerCandidateMaker > tca )
protected

Pipeline Stage 2.: Do TC finding

Pipeline Stage 2.: Do software TPG

Definition at line 151 of file TAProcessor.cpp.

152{
153 //time_activity gave 0 :/
154 if (m_latency_monitoring.load()) m_latency_instance.update_latency_in( ta->activity.time_start );
156 std::vector<triggeralgs::TriggerCandidate> tcs;
157 tca->operator()(ta->activity, tcs);
158 for (auto tc : tcs) {
160 if (m_latency_monitoring.load()) m_latency_instance.update_latency_out( tc.time_candidate );
161 if(!m_tc_sink->try_send(std::move(tc), iomanager::Sender::s_no_block)) {
162 ers::warning(TCDropped(ERS_HERE, tc.time_start, m_sourceid.id));
164 } else {
166 }
167 }
168 m_last_processed_daq_ts = ta->activity.time_start;
169 return;
170}
static constexpr timeout_t s_no_block
Definition Sender.hpp:26
void update_latency_out(uint64_t latency)
Definition Latency.hpp:43
void update_latency_in(uint64_t latency)
Definition Latency.hpp:38
std::atomic< metric_counter_type > m_ta_received_count
dunedaq::trigger::Latency m_latency_instance
std::atomic< metric_counter_type > m_tc_failed_sent_count
std::atomic< metric_counter_type > m_tc_made_count
std::atomic< metric_counter_type > m_tc_sent_count
void warning(const Issue &issue)
Definition ers.hpp:115

◆ generate_opmon_data()

void dunedaq::trigger::TAProcessor::generate_opmon_data ( )
overridevirtual

Hook for customisable publication. The function can throw; exceptions will be caught by the monitoring thread.

Reimplemented from dunedaq::datahandlinglibs::TaskRawDataProcessorModel< TAWrapper >.

Definition at line 126 of file TAProcessor.cpp.

127{
128 opmon::TAProcessorInfo info;
129
130 info.set_ta_received_count( m_ta_received_count.load() );
131 info.set_tc_made_count( m_tc_made_count.load() );
132 info.set_tc_sent_count( m_tc_sent_count.load() );
133 info.set_tc_failed_sent_count( m_tc_failed_sent_count.load() );
134
135 this->publish(std::move(info));
136
137 if ( m_latency_monitoring.load() && m_running_flag.load() ) {
138 opmon::TriggerLatency lat_info;
139
140 lat_info.set_latency_in( m_latency_instance.get_latency_in() );
141 lat_info.set_latency_out( m_latency_instance.get_latency_out() );
142
143 this->publish(std::move(lat_info));
144 }
145}
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
latency get_latency_in() const
Definition Latency.hpp:48
latency get_latency_out() const
Definition Latency.hpp:53
std::atomic< bool > m_running_flag

◆ print_opmon_stats()

void dunedaq::trigger::TAProcessor::print_opmon_stats ( )
private

Definition at line 173 of file TAProcessor.cpp.

174{
175 TLOG() << "TAProcessor opmon counters summary:";
176 TLOG() << "------------------------------";
177 TLOG() << "TAs received: \t\t" << m_ta_received_count;
178 TLOG() << "TCs made: \t\t" << m_tc_made_count;
179 TLOG() << "TCs sent: \t\t" << m_tc_sent_count;
180 TLOG() << "TCs failed to send: \t" << m_tc_failed_sent_count;
181 TLOG();
182}

◆ scrap()

void dunedaq::trigger::TAProcessor::scrap ( const nlohmann::json & args)
overridevirtual

Unconfigure.

Implements dunedaq::datahandlinglibs::RawDataProcessorConcept< TAWrapper >.

Definition at line 116 of file TAProcessor.cpp.

117{
118 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TAProcessor: Entering scrap() method";
119 m_tcms.clear();
120 m_tc_sink.reset();
121 inherited::scrap(args);
122 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TAProcessor: Exiting scrap() method";
123}

◆ start()

void dunedaq::trigger::TAProcessor::start ( const nlohmann::json & args)
overridevirtual

Start operation.

Implements dunedaq::datahandlinglibs::RawDataProcessorConcept< TAWrapper >.

Definition at line 48 of file TAProcessor.cpp.

49{
50 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TAProcessor: Entering start() method";
51
52 // Reset stats
53 m_ta_received_count.store(0);
54 m_tc_made_count.store(0);
55 m_tc_sent_count.store(0);
57
58 m_running_flag.store(true);
59
60 inherited::start(args);
61
62 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TAProcessor: Exiting start() method";
63}

◆ stop()

void dunedaq::trigger::TAProcessor::stop ( const nlohmann::json & args)
overridevirtual

Stop operation.

Implements dunedaq::datahandlinglibs::RawDataProcessorConcept< TAWrapper >.

Definition at line 66 of file TAProcessor.cpp.

67{
68 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TAProcessor: Entering stop() method";
69
70 inherited::stop(args);
71 m_running_flag.store(false);
73
74 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TAProcessor: Exiting stop() method";
75}

Member Data Documentation

◆ m_current_ts

dunedaq::daqdataformats::timestamp_t dunedaq::trigger::TAProcessor::m_current_ts = 0
protected

Definition at line 58 of file TAProcessor.hpp.

◆ m_latency_in

std::atomic<metric_counter_type> dunedaq::trigger::TAProcessor::m_latency_in { 0 }
private

Definition at line 85 of file TAProcessor.hpp.

85{ 0 };

◆ m_latency_instance

dunedaq::trigger::Latency dunedaq::trigger::TAProcessor::m_latency_instance
private

Definition at line 84 of file TAProcessor.hpp.

◆ m_latency_monitoring

std::atomic<bool> dunedaq::trigger::TAProcessor::m_latency_monitoring { false }
private

Definition at line 83 of file TAProcessor.hpp.

83{ false };

◆ m_latency_out

std::atomic<metric_counter_type> dunedaq::trigger::TAProcessor::m_latency_out { 0 }
private

Definition at line 86 of file TAProcessor.hpp.

86{ 0 };

◆ m_previous_ts

dunedaq::daqdataformats::timestamp_t dunedaq::trigger::TAProcessor::m_previous_ts = 0
protected

Definition at line 57 of file TAProcessor.hpp.

◆ m_running_flag

std::atomic<bool> dunedaq::trigger::TAProcessor::m_running_flag { false }
private

Definition at line 82 of file TAProcessor.hpp.

82{ false };

◆ m_sourceid

daqdataformats::SourceID dunedaq::trigger::TAProcessor::m_sourceid
private

Definition at line 72 of file TAProcessor.hpp.

◆ m_ta_received_count

std::atomic<metric_counter_type> dunedaq::trigger::TAProcessor::m_ta_received_count { 0 }
private

Definition at line 75 of file TAProcessor.hpp.

75{ 0 }; // NOLINT(build/unsigned)

◆ m_tc_failed_sent_count

std::atomic<metric_counter_type> dunedaq::trigger::TAProcessor::m_tc_failed_sent_count { 0 }
private

Definition at line 78 of file TAProcessor.hpp.

78{ 0 };

◆ m_tc_made_count

std::atomic<metric_counter_type> dunedaq::trigger::TAProcessor::m_tc_made_count { 0 }
private

Definition at line 76 of file TAProcessor.hpp.

76{ 0 };

◆ m_tc_sent_count

std::atomic<metric_counter_type> dunedaq::trigger::TAProcessor::m_tc_sent_count { 0 }
private

Definition at line 77 of file TAProcessor.hpp.

77{ 0 };

◆ m_tc_sink

std::shared_ptr<iomanager::SenderConcept<triggeralgs::TriggerCandidate> > dunedaq::trigger::TAProcessor::m_tc_sink
private

Definition at line 70 of file TAProcessor.hpp.

◆ m_tcms

std::vector<std::shared_ptr<triggeralgs::TriggerCandidateMaker> > dunedaq::trigger::TAProcessor::m_tcms
private

Definition at line 68 of file TAProcessor.hpp.


The documentation for this class was generated from the following files: