Line data Source code
1 : /**
2 : * @file TPProcessor.hpp TPC TP specific Task based raw processor
3 : *
4 : * This is part of the DUNE DAQ, copyright 2023.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 : #include "trigger/TPProcessor.hpp" // NOLINT(build/include)
9 : #include "trigger/Issues.hpp" // NOLINT(build/include)
10 :
11 : #include "iomanager/Sender.hpp"
12 : #include "logging/Logging.hpp"
13 :
14 : #include "datahandlinglibs/FrameErrorRegistry.hpp"
15 : #include "datahandlinglibs/DataHandlingIssues.hpp"
16 : #include "datahandlinglibs/ReadoutLogging.hpp"
17 : #include "datahandlinglibs/models/IterableQueueModel.hpp"
18 :
19 : #include "triggeralgs/TriggerActivity.hpp"
20 :
21 : #include "trigger/AlgorithmPlugins.hpp"
22 : #include "triggeralgs/TriggerActivityMaker.hpp"
23 :
24 : #include "appmodel/TPDataProcessor.hpp"
25 : #include "appmodel/TAAlgorithm.hpp"
26 :
27 : using dunedaq::datahandlinglibs::logging::TLVL_BOOKKEEPING;
28 : using dunedaq::datahandlinglibs::logging::TLVL_TAKE_NOTE;
29 :
30 : // THIS SHOULDN'T BE HERE!!!!! But it is necessary.....
31 : DUNE_DAQ_TYPESTRING(dunedaq::trigger::TriggerPrimitiveTypeAdapter, "TriggerPrimitive")
32 : DUNE_DAQ_TYPESTRING(std::vector<dunedaq::trigger::TriggerPrimitiveTypeAdapter>, "TriggerPrimitiveVector")
33 :
34 : namespace dunedaq {
35 : namespace trigger {
36 :
37 0 : TPProcessor::TPProcessor(std::unique_ptr<datahandlinglibs::FrameErrorRegistry>& error_registry, bool post_processing_enabled)
38 0 : : TaskRawDataProcessorModel<TriggerPrimitiveTypeAdapter>(error_registry, post_processing_enabled)
39 : {
40 0 : }
41 :
42 0 : TPProcessor::~TPProcessor()
43 0 : {}
44 :
45 : void
46 0 : TPProcessor::start(const appfwk::DAQModule::CommandData_t& args)
47 : {
48 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TPProcessor: Entering start() method";
49 :
50 : // Reset stats
51 0 : m_tp_received_count.store(0);
52 0 : m_ta_made_count.store(0);
53 0 : m_ta_sent_count.store(0);
54 0 : m_ta_failed_sent_count.store(0);
55 :
56 0 : m_running_flag.store(true);
57 :
58 0 : inherited::start(args);
59 :
60 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TPProcessor: Exiting start() method";
61 0 : }
62 :
63 : void
64 0 : TPProcessor::stop(const appfwk::DAQModule::CommandData_t& args)
65 : {
66 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TPProcessor: Entering stop() method";
67 :
68 0 : inherited::stop(args);
69 0 : m_running_flag.store(false);
70 0 : print_opmon_stats();
71 :
72 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TPProcessor: Exiting stop() method";
73 0 : }
74 :
75 : void
76 0 : TPProcessor::conf(const appmodel::DataHandlerModule* conf)
77 : {
78 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TPProcessor: Entering conf() method";
79 :
80 0 : for (auto output : conf->get_outputs()) {
81 0 : try {
82 0 : if (output->get_data_type() == "TriggerActivity") {
83 0 : m_ta_sink = get_iom_sender<triggeralgs::TriggerActivity>(output->UID());
84 : }
85 0 : } catch (const ers::Issue& excpt) {
86 0 : ers::error(datahandlinglibs::ResourceQueueError(ERS_HERE, "ta", "DefaultRequestHandlerModel", excpt));
87 0 : }
88 : }
89 :
90 0 : m_sourceid.id = conf->get_source_id();
91 0 : m_sourceid.subsystem = TriggerPrimitiveTypeAdapter::subsystem;
92 :
93 0 : std::vector<const appmodel::TAAlgorithm*> ta_algorithms;
94 0 : auto dp = conf->get_module_configuration()->get_data_processor();
95 0 : auto proc_conf = dp->cast<appmodel::TPDataProcessor>();
96 0 : if (proc_conf != nullptr && m_post_processing_enabled) {
97 0 : ta_algorithms = proc_conf->get_algorithms();
98 : }
99 :
100 0 : for (auto algo : ta_algorithms) {
101 0 : TLOG() << "Selected TA algorithm: " << algo->UID() << " from class " << algo->class_name();
102 0 : std::shared_ptr<triggeralgs::TriggerActivityMaker> maker = make_ta_maker(algo->class_name());
103 0 : nlohmann::json algo_json = algo->to_json(true);
104 :
105 0 : TLOG() << "Algo config:\n" << algo_json.dump();
106 :
107 0 : maker->configure(algo_json[algo->UID()]);
108 0 : inherited::add_postprocess_task(std::bind(&TPProcessor::find_ta, this, std::placeholders::_1, maker));
109 0 : m_tams.push_back(maker);
110 0 : }
111 0 : m_latency_monitoring.store( dp->get_latency_monitoring() );
112 0 : inherited::conf(conf);
113 :
114 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TPProcessor: Exiting conf() method";
115 0 : }
116 :
117 : void
118 0 : TPProcessor::scrap(const appfwk::DAQModule::CommandData_t& args)
119 : {
120 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TPProcessor: Entering scrap() method";
121 0 : m_tams.clear();
122 0 : m_ta_sink.reset();
123 0 : inherited::scrap(args);
124 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TPProcessor: Exiting scrap() method";
125 0 : }
126 :
127 : void
128 0 : TPProcessor::generate_opmon_data()
129 : {
130 0 : opmon::TPProcessorInfo info;
131 :
132 0 : info.set_tp_received_count( m_tp_received_count.load() );
133 0 : info.set_ta_made_count( m_ta_made_count.load() );
134 0 : info.set_ta_sent_count( m_ta_sent_count.load() );
135 0 : info.set_ta_failed_sent_count( m_ta_failed_sent_count.load() );
136 :
137 0 : this->publish(std::move(info));
138 :
139 0 : if ( m_latency_monitoring.load() && m_running_flag.load() ) {
140 0 : opmon::TriggerLatency lat_info;
141 :
142 0 : lat_info.set_latency_in( m_latency_instance.get_latency_in() );
143 0 : lat_info.set_latency_out( m_latency_instance.get_latency_out() );
144 :
145 0 : this->publish(std::move(lat_info));
146 0 : }
147 0 : }
148 :
149 : /**
150 : * Pipeline Stage 2.: Do software TPG
151 : * */
152 : void
153 0 : TPProcessor::find_ta(const TriggerPrimitiveTypeAdapter* tp, std::shared_ptr<triggeralgs::TriggerActivityMaker> taa)
154 : {
155 0 : if (m_latency_monitoring.load()) m_latency_instance.update_latency_in( tp->tp.time_start ); // time_start or time_peak ?
156 0 : m_tp_received_count++;
157 0 : std::vector<triggeralgs::TriggerActivity> tas;
158 0 : taa->operator()(tp->tp, tas);
159 :
160 0 : while (tas.size()) {
161 0 : m_ta_made_count++;
162 0 : if (m_latency_monitoring.load()) m_latency_instance.update_latency_out( tas.back().time_start );
163 0 : if (!m_ta_sink->try_send(std::move(tas.back()), iomanager::Sender::s_no_block)) {
164 0 : ers::warning(TADropped(ERS_HERE, tp->tp.time_start, m_sourceid.id));
165 0 : m_ta_failed_sent_count++;
166 : } else {
167 0 : m_ta_sent_count++;
168 : }
169 0 : tas.pop_back();
170 : }
171 0 : m_last_processed_daq_ts = tp->tp.time_start;
172 0 : return;
173 0 : }
174 :
175 : void
176 0 : TPProcessor::print_opmon_stats()
177 : {
178 0 : TLOG() << "TPProcessor opmon counters summary:";
179 0 : TLOG() << "------------------------------";
180 0 : TLOG() << "TPs received: \t\t" << m_tp_received_count;
181 0 : TLOG() << "TAs made: \t\t" << m_ta_made_count;
182 0 : TLOG() << "TAs sent: \t\t" << m_ta_sent_count;
183 0 : TLOG() << "TAs failed to send: \t" << m_ta_failed_sent_count;
184 0 : TLOG();
185 0 : }
186 :
187 : } // namespace trigger
188 : } // namespace dunedaq
|