Line data Source code
1 : /**
2 : * @file TAProcessor.cpp TriggerActivity (TA) specific Task based raw processor
3 : *
4 : * This is part of the DUNE DAQ , copyright 2023.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 : #include "trigger/TAProcessor.hpp" // NOLINT(build/include)
9 :
10 : //#include "appfwk/DAQModuleHelper.hpp"
11 : #include "iomanager/Sender.hpp"
12 : #include "logging/Logging.hpp"
13 :
14 : #include "datahandlinglibs/FrameErrorRegistry.hpp"
15 : #include "datahandlinglibs/DataHandlingIssues.hpp"
16 : #include "datahandlinglibs/ReadoutLogging.hpp"
17 : #include "datahandlinglibs/models/IterableQueueModel.hpp"
18 :
19 : //#include "detchannelmaps/TPCChannelMap.hpp"
20 :
21 : #include "trigger/TAWrapper.hpp"
22 : #include "triggeralgs/TriggerActivity.hpp"
23 :
24 : #include "trigger/AlgorithmPlugins.hpp"
25 : #include "triggeralgs/TriggerCandidateMaker.hpp"
26 :
27 : #include "appmodel/TADataProcessor.hpp"
28 : #include "appmodel/TCAlgorithm.hpp"
29 :
30 : using dunedaq::datahandlinglibs::logging::TLVL_BOOKKEEPING;
31 : using dunedaq::datahandlinglibs::logging::TLVL_TAKE_NOTE;
32 :
33 : // THIS SHOULDN'T BE HERE!!!!! But it is necessary.....
34 : DUNE_DAQ_TYPESTRING(dunedaq::trigger::TAWrapper, "TriggerActivity")
35 :
36 : namespace dunedaq {
37 : namespace trigger {
38 :
39 0 : TAProcessor::TAProcessor(std::unique_ptr<datahandlinglibs::FrameErrorRegistry>& error_registry, bool post_processing_enabled)
40 0 : : datahandlinglibs::TaskRawDataProcessorModel<TAWrapper>(error_registry, post_processing_enabled)
41 : {
42 0 : }
43 :
44 0 : TAProcessor::~TAProcessor()
45 0 : {}
46 :
47 : void
48 0 : TAProcessor::start(const appfwk::DAQModule::CommandData_t& args)
49 : {
50 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TAProcessor: Entering start() method";
51 :
52 : // Reset stats
53 0 : m_ta_received_count.store(0);
54 0 : m_tc_made_count.store(0);
55 0 : m_tc_sent_count.store(0);
56 0 : m_tc_failed_sent_count.store(0);
57 :
58 0 : m_running_flag.store(true);
59 :
60 0 : inherited::start(args);
61 :
62 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TAProcessor: Exiting start() method";
63 0 : }
64 :
65 : void
66 0 : TAProcessor::stop(const appfwk::DAQModule::CommandData_t& args)
67 : {
68 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TAProcessor: Entering stop() method";
69 :
70 0 : inherited::stop(args);
71 0 : m_running_flag.store(false);
72 0 : print_opmon_stats();
73 :
74 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TAProcessor: Exiting stop() method";
75 0 : }
76 :
77 : void
78 0 : TAProcessor::conf(const appmodel::DataHandlerModule* conf)
79 : {
80 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TAProcessor: Entering conf() method";
81 :
82 0 : for (auto output : conf->get_outputs()) {
83 0 : try {
84 0 : if (output->get_data_type() == "TriggerCandidate") {
85 0 : m_tc_sink = get_iom_sender<triggeralgs::TriggerCandidate>(output->UID());
86 : }
87 0 : } catch (const ers::Issue& excpt) {
88 0 : ers::error(datahandlinglibs::ResourceQueueError(ERS_HERE, "tc", "DefaultRequestHandlerModel", excpt));
89 0 : }
90 : }
91 :
92 0 : m_sourceid.id = conf->get_source_id();
93 0 : m_sourceid.subsystem = trigger::TAWrapper::subsystem;
94 0 : std::vector<const appmodel::TCAlgorithm*> tc_algorithms;
95 0 : auto dp = conf->get_module_configuration()->get_data_processor();
96 0 : auto proc_conf = dp->cast<appmodel::TADataProcessor>();
97 0 : if (proc_conf != nullptr && m_post_processing_enabled ) {
98 0 : tc_algorithms = proc_conf->get_algorithms();
99 : }
100 :
101 0 : for (auto algo : tc_algorithms) {
102 0 : TLOG() << "Selected TC algorithm: " << algo->UID();
103 0 : std::shared_ptr<triggeralgs::TriggerCandidateMaker> maker = make_tc_maker(algo->class_name());
104 0 : nlohmann::json algo_json = algo->to_json(true);
105 0 : maker->configure(algo_json[algo->UID()]);
106 0 : inherited::add_postprocess_task(std::bind(&TAProcessor::find_tc, this, std::placeholders::_1, maker));
107 0 : m_tcms.push_back(maker);
108 0 : }
109 0 : m_latency_monitoring.store( dp->get_latency_monitoring() );
110 0 : inherited::conf(conf);
111 :
112 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TAProcessor: Exiting conf() method";
113 0 : }
114 :
115 : void
116 0 : TAProcessor::scrap(const appfwk::DAQModule::CommandData_t& args)
117 : {
118 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TAProcessor: Entering scrap() method";
119 0 : m_tcms.clear();
120 0 : m_tc_sink.reset();
121 0 : inherited::scrap(args);
122 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TAProcessor: Exiting scrap() method";
123 0 : }
124 :
125 : void
126 0 : TAProcessor::generate_opmon_data()
127 : {
128 0 : opmon::TAProcessorInfo info;
129 :
130 0 : info.set_ta_received_count( m_ta_received_count.load() );
131 0 : info.set_tc_made_count( m_tc_made_count.load() );
132 0 : info.set_tc_sent_count( m_tc_sent_count.load() );
133 0 : info.set_tc_failed_sent_count( m_tc_failed_sent_count.load() );
134 :
135 0 : this->publish(std::move(info));
136 :
137 0 : if ( m_latency_monitoring.load() && m_running_flag.load() ) {
138 0 : opmon::TriggerLatency lat_info;
139 :
140 0 : lat_info.set_latency_in( m_latency_instance.get_latency_in() );
141 0 : lat_info.set_latency_out( m_latency_instance.get_latency_out() );
142 :
143 0 : this->publish(std::move(lat_info));
144 0 : }
145 0 : }
146 :
147 : /**
148 : * Pipeline Stage 2.: Do software TPG
149 : * */
150 : void
151 0 : TAProcessor::find_tc(const TAWrapper* ta, std::shared_ptr<triggeralgs::TriggerCandidateMaker> tca)
152 : {
153 : //time_activity gave 0 :/
154 0 : if (m_latency_monitoring.load()) m_latency_instance.update_latency_in( ta->activity.time_start );
155 0 : m_ta_received_count++;
156 0 : std::vector<triggeralgs::TriggerCandidate> tcs;
157 0 : tca->operator()(ta->activity, tcs);
158 0 : for (auto tc : tcs) {
159 0 : m_tc_made_count++;
160 0 : if (m_latency_monitoring.load()) m_latency_instance.update_latency_out( tc.time_candidate );
161 0 : if(!m_tc_sink->try_send(std::move(tc), iomanager::Sender::s_no_block)) {
162 0 : ers::warning(TCDropped(ERS_HERE, tc.time_start, m_sourceid.id));
163 0 : m_tc_failed_sent_count++;
164 : } else {
165 0 : m_tc_sent_count++;
166 : }
167 0 : }
168 0 : m_last_processed_daq_ts = ta->activity.time_start;
169 0 : return;
170 0 : }
171 :
172 : void
173 0 : TAProcessor::print_opmon_stats()
174 : {
175 0 : TLOG() << "TAProcessor opmon counters summary:";
176 0 : TLOG() << "------------------------------";
177 0 : TLOG() << "TAs received: \t\t" << m_ta_received_count;
178 0 : TLOG() << "TCs made: \t\t" << m_tc_made_count;
179 0 : TLOG() << "TCs sent: \t\t" << m_tc_sent_count;
180 0 : TLOG() << "TCs failed to send: \t" << m_tc_failed_sent_count;
181 0 : TLOG();
182 0 : }
183 :
184 : } // namespace trigger
185 : } // namespace dunedaq
|