DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
TPProcessor.cpp
Go to the documentation of this file.
1
8#include "trigger/TPProcessor.hpp" // NOLINT(build/include)
9#include "trigger/Issues.hpp" // NOLINT(build/include)
10
11#include "iomanager/Sender.hpp"
12#include "logging/Logging.hpp"
13
18
20
23
26
29
30// NOTE: the original author flags that this registration should not live in this file, but it is required here.
// It invokes DUNE_DAQ_TYPESTRING with the vector-of-TriggerPrimitiveTypeAdapter type and the
// string "TriggerPrimitiveVector".
// NOTE(review): presumably this names the type for the messaging layer -- confirm against the macro's definition.
32DUNE_DAQ_TYPESTRING(std::vector<dunedaq::trigger::TriggerPrimitiveTypeAdapter>, "TriggerPrimitiveVector")
33
34namespace dunedaq {
35namespace trigger {
36
37TPProcessor::TPProcessor(std::unique_ptr<datahandlinglibs::FrameErrorRegistry>& error_registry, bool post_processing_enabled)
38 : TaskRawDataProcessorModel<TriggerPrimitiveTypeAdapter>(error_registry, post_processing_enabled)
39{
40}
41
44
// Start handler. Zeroes the opmon counters shown below, raises m_running_flag,
// and only then forwards `args` to the base-class start().
// NOTE(review): the embedded listing numbers jump from 53 to 55 inside this
// body, so one source line is not shown in this extract. No visible statement
// resets m_ta_failed_sent_count -- confirm against the repository whether the
// hidden line does so.
45void
46TPProcessor::start(const nlohmann::json& args)
47{
48 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TPProcessor: Entering start() method";
49
50 // Reset stats
51 m_tp_received_count.store(0);
52 m_ta_made_count.store(0);
53 m_ta_sent_count.store(0);
55
// The running flag is set before the base class is started.
56 m_running_flag.store(true);
57
58 inherited::start(args);
59
60 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TPProcessor: Exiting start() method";
61}
62
// Stop handler. Forwards `args` to the base-class stop() first and clears
// m_running_flag only after that call returns.
// NOTE(review): the embedded listing numbers jump from 69 to 71, so one source
// line after the flag update is not shown in this extract -- confirm against
// the repository.
63void
64TPProcessor::stop(const nlohmann::json& args)
65{
66 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TPProcessor: Entering stop() method";
67
68 inherited::stop(args);
69 m_running_flag.store(false);
71
72 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TPProcessor: Exiting stop() method";
73}
74
// Configuration handler (its log messages identify it as conf()).
// NOTE(review): the signature line (listing line 76) is missing from this
// extract; the body dereferences a pointer named `conf`. Listing lines 83, 91
// and 112 are also absent, so this view of the function is incomplete --
// confirm against the repository before changing anything here.
75void
77{
78 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TPProcessor: Entering conf() method";
79
// Look through the configured outputs for one whose data type is "TriggerActivity".
// NOTE(review): the statement executed on a match (listing line 83) is not shown.
80 for (auto output : conf->get_outputs()) {
81 try {
82 if (output->get_data_type() == "TriggerActivity") {
84 }
85 } catch (const ers::Issue& excpt) {
// An ers::Issue is reported through ers::error and the loop carries on with the next output.
// NOTE(review): the name string "DefaultRequestHandlerModel" looks carried over from another class -- confirm.
86 ers::error(datahandlinglibs::ResourceQueueError(ERS_HERE, "ta", "DefaultRequestHandlerModel", excpt));
87 }
88 }
89
90 m_sourceid.id = conf->get_source_id();
92
// The algorithm list is filled only when the data processor casts to
// appmodel::TPDataProcessor AND post-processing is enabled; otherwise it stays
// empty and the loop below creates no makers.
// NOTE(review): `dp` itself is dereferenced without a null check.
93 std::vector<const appmodel::TAAlgorithm*> ta_algorithms;
94 auto dp = conf->get_module_configuration()->get_data_processor();
95 auto proc_conf = dp->cast<appmodel::TPDataProcessor>();
96 if (proc_conf != nullptr && m_post_processing_enabled) {
97 ta_algorithms = proc_conf->get_algorithms();
98 }
99
// For each algorithm: build a maker from its class name, configure it with the
// JSON sub-object keyed by the algorithm's UID, register find_ta (bound to
// `this` and the maker) as a post-processing task, and keep the maker in m_tams.
100 for (auto algo : ta_algorithms) {
101 TLOG() << "Selected TA algorithm: " << algo->UID() << " from class " << algo->class_name();
102 std::shared_ptr<triggeralgs::TriggerActivityMaker> maker = make_ta_maker(algo->class_name());
103 nlohmann::json algo_json = algo->to_json(true);
104
105 TLOG() << "Algo config:\n" << algo_json.dump();
106
107 maker->configure(algo_json[algo->UID()]);
108 inherited::add_postprocess_task(std::bind(&TPProcessor::find_ta, this, std::placeholders::_1, maker));
109 m_tams.push_back(maker);
110 }
// Latency monitoring is switched on or off from the data-processor configuration.
111 m_latency_monitoring.store( dp->get_latency_monitoring() );
113
114 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TPProcessor: Exiting conf() method";
115}
116
117void
118TPProcessor::scrap(const nlohmann::json& args)
119{
120 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TPProcessor: Entering scrap() method";
121 m_tams.clear();
122 m_ta_sink.reset();
123 inherited::scrap(args);
124 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TPProcessor: Exiting scrap() method";
125}
126
// Operational-monitoring publisher: copies the four atomic counters into
// `info` and publishes it, then publishes `lat_info` when latency monitoring
// is enabled and the processor is running.
// NOTE(review): the signature (listing line 128), the declaration of `info`
// (line 130) and lines 141-143 are missing from this extract. Presumably this
// is generate_opmon_data() and the hidden lines fill `lat_info` -- confirm
// against the repository.
127void
129{
131
132 info.set_tp_received_count( m_tp_received_count.load() );
133 info.set_ta_made_count( m_ta_made_count.load() );
134 info.set_ta_sent_count( m_ta_sent_count.load() );
135 info.set_ta_failed_sent_count( m_ta_failed_sent_count.load() );
136
137 this->publish(std::move(info));
138
// Latency data is published only while both flags are true.
139 if ( m_latency_monitoring.load() && m_running_flag.load() ) {
140 opmon::TriggerLatency lat_info;
141
144
145 this->publish(std::move(lat_info));
146 }
147}
148
// Handles one trigger primitive: invokes the given maker with the TP and an
// output vector `tas`, then attempts a non-blocking send of every element of
// `tas` through m_ta_sink.
//
// tp  - the trigger primitive adapter; only its `tp` member is read here.
// taa - the TriggerActivityMaker to invoke.
//
// NOTE(review): listing lines 156, 161, 164-165, 167 and 171 are missing from
// this extract, so both branches of the send check appear empty below --
// confirm their contents against the repository.
152void
153TPProcessor::find_ta(const TriggerPrimitiveTypeAdapter* tp, std::shared_ptr<triggeralgs::TriggerActivityMaker> taa)
154{
155 if (m_latency_monitoring.load()) m_latency_instance.update_latency_in( tp->tp.time_start ); // time_start or time_peak ?
157 std::vector<triggeralgs::TriggerActivity> tas;
158 taa->operator()(tp->tp, tas);
159
// The vector is drained from the back, so elements are sent in reverse of
// their order in `tas`; each one is moved into try_send and then popped.
160 while (tas.size()) {
162 if (m_latency_monitoring.load()) m_latency_instance.update_latency_out( tas.back().time_start );
// NOTE(review): m_ta_sink is dereferenced without a null check.
163 if (!m_ta_sink->try_send(std::move(tas.back()), iomanager::Sender::s_no_block)) {
166 } else {
168 }
169 tas.pop_back();
170 }
172 return;
173}
174
// Writes a summary of the four opmon counters to the log via TLOG(), followed
// by one empty log entry.
// NOTE(review): the signature line (listing line 176) is missing from this
// extract, so the function's name and parameters are not visible here --
// confirm against the repository.
175void
177{
178 TLOG() << "TPProcessor opmon counters summary:";
179 TLOG() << "------------------------------";
180 TLOG() << "TPs received: \t\t" << m_tp_received_count;
181 TLOG() << "TAs made: \t\t" << m_ta_made_count;
182 TLOG() << "TAs sent: \t\t" << m_ta_sent_count;
183 TLOG() << "TAs failed to send: \t" << m_ta_failed_sent_count;
184 TLOG();
185}
186
187} // namespace trigger
188} // namespace dunedaq
#define ERS_HERE
#define DUNE_DAQ_TYPESTRING(Type, typestring)
static constexpr timeout_t s_no_block
Definition Sender.hpp:26
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
void update_latency_out(uint64_t latency)
Definition Latency.hpp:43
latency get_latency_in() const
Definition Latency.hpp:48
latency get_latency_out() const
Definition Latency.hpp:53
void update_latency_in(uint64_t latency)
Definition Latency.hpp:38
dunedaq::trigger::Latency m_latency_instance
std::atomic< bool > m_latency_monitoring
void start(const nlohmann::json &args) override
Start operation.
daqdataformats::SourceID m_sourceid
std::vector< std::shared_ptr< triggeralgs::TriggerActivityMaker > > m_tams
std::atomic< metric_counter_type > m_ta_failed_sent_count
std::atomic< metric_counter_type > m_tp_received_count
std::atomic< metric_counter_type > m_ta_sent_count
void scrap(const nlohmann::json &args) override
Unconfigure.
void find_ta(const TriggerPrimitiveTypeAdapter *tp, std::shared_ptr< triggeralgs::TriggerActivityMaker > tam)
std::shared_ptr< iomanager::SenderConcept< triggeralgs::TriggerActivity > > m_ta_sink
std::atomic< metric_counter_type > m_ta_made_count
void stop(const nlohmann::json &args) override
Stop operation.
void conf(const appmodel::DataHandlerModule *conf) override
Set the emulator mode, if active, timestamps of processed packets are overwritten with new ones.
std::atomic< bool > m_running_flag
void generate_opmon_data() override
Base class for any user define issue.
Definition Issue.hpp:69
#define TLVL_ENTER_EXIT_METHODS
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
std::unique_ptr< triggeralgs::TriggerActivityMaker > make_ta_maker(std::string const &plugin_name)
Load a TriggerActivityMaker plugin and return a unique_ptr to the contained class.
Including Qt Headers.
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
void warning(const Issue &issue)
Definition ers.hpp:115
void error(const Issue &issue)
Definition ers.hpp:81
Subsystem subsystem
The general subsystem of the source of the data.
Definition SourceID.hpp:69
ID_t id
Unique identifier of the source of the data.
Definition SourceID.hpp:74
static const constexpr daqdataformats::SourceID::Subsystem subsystem