Line data Source code
1 : /**
2 : * @file MLTModule.cpp MLTModule class
3 : * implementation
4 : *
5 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
6 : * Licensing/copyright details are in the COPYING file that you should have
7 : * received with this code.
8 : */
9 :
10 : /**
11 : * TODO: get_group_links
12 : * TODO: get_mandatory_links
13 : */
14 :
15 : #include "MLTModule.hpp"
16 :
17 : #include "trigger/Issues.hpp"
18 : #include "trigger/LivetimeCounter.hpp"
19 :
20 : #include "daqdataformats/ComponentRequest.hpp"
21 : #include "dfmessages/TriggerDecision.hpp"
22 : #include "dfmessages/TriggerInhibit.hpp"
23 : #include "dfmessages/Types.hpp"
24 : #include "iomanager/IOManager.hpp"
25 : #include "logging/Logging.hpp"
26 : #include "trgdataformats/Types.hpp"
27 :
28 : namespace dunedaq {
29 : namespace trigger {
30 :
31 0 : MLTModule::MLTModule(const std::string& name)
32 : : DAQModule(name)
33 0 : , m_last_trigger_number(1)
34 0 : , m_run_number(0)
35 : {
36 : // clang-format off
37 0 : register_command("conf", &MLTModule::do_configure);
38 0 : register_command("start", &MLTModule::do_start);
39 0 : register_command("stop", &MLTModule::do_stop);
40 0 : register_command("disable_triggers", &MLTModule::do_pause);
41 0 : register_command("enable_triggers", &MLTModule::do_resume);
42 0 : register_command("scrap", &MLTModule::do_scrap);
43 : // clang-format on
44 0 : }
45 :
46 : std::map<std::string, int>
47 0 : MLTModule::decode_geoid(uint64_t _geoid_int)
48 : {
49 :
50 0 : std::map<std::string, int> geoid;
51 :
52 : // Extract stream_id (stored in the top 16 bits)
53 0 : geoid["stream_id"] = (_geoid_int >> 48) & 0xFFFF;
54 :
55 : // Extract slot_id (stored in the next 16 bits)
56 0 : geoid["slot_id"] = (_geoid_int >> 32) & 0xFFFF;
57 :
58 : // Extract crate_id (stored in the next 16 bits)
59 0 : geoid["crate_id"] = (_geoid_int >> 16) & 0xFFFF;
60 :
61 : // Extract det_id (stored in the lowest 16 bits)
62 0 : geoid["detector_id"] = _geoid_int & 0xFFFF;
63 :
64 0 : return geoid;
65 0 : }
66 :
67 : void
68 0 : MLTModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
69 : {
70 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
71 :
72 0 : m_mtrg = mcfg->get_dal<appmodel::MLTModule>(get_name());
73 : // Get the session to access the detector configuration
74 0 : m_session = mcfg->get_session();
75 :
76 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
77 0 : }
78 :
79 : void
80 0 : MLTModule::do_configure(const CommandData_t& /*obj*/)
81 : {
82 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering conf() method";
83 :
84 : // Get the inputs
85 0 : for (auto con : m_mtrg->get_inputs()) {
86 0 : if (con->get_data_type() == datatype_to_string<dfmessages::TriggerDecision>()) {
87 0 : m_decision_input = get_iom_receiver<dfmessages::TriggerDecision>(con->UID());
88 0 : } else if (con->get_data_type() == datatype_to_string<dfmessages::TriggerInhibit>()) {
89 0 : m_inhibit_input = get_iom_receiver<dfmessages::TriggerInhibit>(con->UID());
90 : }
91 : }
92 :
93 : // Get the outputs
94 0 : for (auto con : m_mtrg->get_outputs()) {
95 0 : if (con->get_data_type() == datatype_to_string<dfmessages::TriggerDecision>())
96 0 : m_decision_output = get_iom_sender<dfmessages::TriggerDecision>(con->UID());
97 : }
98 :
99 0 : hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t geoidmap =
100 0 : hdf5libs::HDF5SourceIDHandler::make_source_id_geo_id_map(m_session);
101 : // Fill the SourceID -- Subdetector map
102 0 : for (auto const& [sourceid, geoids] : geoidmap) {
103 0 : TLOG() << "SourceID: " << sourceid;
104 0 : for (auto const& geoid : geoids) {
105 0 : std::map<std::string, int> gidmap = decode_geoid(geoid);
106 0 : SubdetectorID detid = static_cast<SubdetectorID>(gidmap["detector_id"]);
107 0 : if (m_srcid_detid_map.contains(sourceid) && !(m_srcid_detid_map[sourceid] == detid)) {
108 0 : throw MLTConfigurationProblem(
109 0 : ERS_HERE, get_name(), "Multiple subdetector types for one SourceID not supported in trigger system!");
110 : }
111 0 : m_srcid_detid_map[sourceid] = detid;
112 0 : }
113 0 : TLOG() << " * Subdetector type: " << m_srcid_detid_map[sourceid];
114 : }
115 :
116 0 : for (auto subdet_readout_window : m_mtrg->get_configuration()->get_subdetector_readout_map()) {
117 0 : std::string subdetector_name = subdet_readout_window->get_subdetector();
118 0 : SubdetectorID detid = dunedaq::detdataformats::DetID::string_to_subdetector(subdetector_name);
119 0 : if (detid == detdataformats::DetID::Subdetector::kUnknown) {
120 0 : throw MLTConfigurationProblem(
121 0 : ERS_HERE, get_name(), "Unknown Subdetector supplied to MLT subdetector-readout window map");
122 : }
123 :
124 0 : if (m_subdetector_readout_window_map.count(detid)) {
125 0 : throw MLTConfigurationProblem(
126 0 : ERS_HERE,
127 0 : get_name(),
128 0 : "Supplied more than one of the same Subdetector name to MLT subdetector-readout window map");
129 : }
130 :
131 0 : m_subdetector_readout_window_map[detid] =
132 0 : std::make_pair(subdet_readout_window->get_time_before(), subdet_readout_window->get_time_after());
133 :
134 0 : TLOG() << "[MLT] Custom readout map for subdetector: " << detid
135 0 : << " time_start: " << subdet_readout_window->get_time_before()
136 0 : << " time_after: " << subdet_readout_window->get_time_after();
137 0 : }
138 :
139 : // Latency related
140 0 : m_latency_monitoring.store(m_mtrg->get_configuration()->get_latency_monitoring());
141 :
142 : // Now do the configuration: dummy for now
143 0 : m_configured_flag.store(true);
144 :
145 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting conf() method";
146 0 : }
147 :
148 : void
149 0 : MLTModule::do_scrap(const CommandData_t& /*obj*/)
150 : {
151 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering scrap() method";
152 :
153 0 : m_decision_input.reset();
154 0 : m_decision_output.reset();
155 0 : m_inhibit_input.reset();
156 :
157 0 : m_srcid_detid_map.clear();
158 0 : m_subdetector_readout_window_map.clear();
159 0 : m_trigger_counters.clear();
160 :
161 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting scrap() method";
162 0 : }
163 :
164 : void
165 0 : MLTModule::generate_opmon_data()
166 : {
167 0 : opmon::ModuleLevelTriggerInfo info;
168 :
169 0 : info.set_td_msg_received_count(m_td_msg_received_count.load());
170 0 : info.set_td_sent_count(m_td_sent_count.load());
171 0 : info.set_td_inhibited_count(m_td_inhibited_count.load());
172 0 : info.set_td_paused_count(m_td_paused_count.load());
173 0 : info.set_td_queue_timeout_expired_err_count(m_td_queue_timeout_expired_err_count.load());
174 0 : info.set_td_total_count(m_td_total_count.load());
175 :
176 0 : if (m_lc_started) {
177 0 : info.set_lc_klive(m_livetime_counter->get_time(LivetimeCounter::State::kLive));
178 0 : info.set_lc_kpaused(m_livetime_counter->get_time(LivetimeCounter::State::kPaused));
179 0 : info.set_lc_kdead(m_livetime_counter->get_time(LivetimeCounter::State::kDead));
180 : } else {
181 0 : info.set_lc_klive(m_lc_kLive);
182 0 : info.set_lc_kpaused(m_lc_kPaused);
183 0 : info.set_lc_kdead(m_lc_kDead);
184 : }
185 :
186 0 : this->publish(std::move(info));
187 :
188 : // per TC type
189 0 : std::lock_guard<std::mutex> guard(m_trigger_mutex);
190 0 : for (auto& [type, counts] : m_trigger_counters) {
191 0 : auto name = dunedaq::trgdataformats::get_trigger_candidate_type_names()[type];
192 0 : opmon::TriggerDecisionInfo td_info;
193 0 : td_info.set_received(counts.received.exchange(0));
194 0 : td_info.set_sent(counts.sent.exchange(0));
195 0 : td_info.set_failed_send(counts.failed_send.exchange(0));
196 0 : td_info.set_paused(counts.paused.exchange(0));
197 0 : td_info.set_inhibited(counts.inhibited.exchange(0));
198 0 : this->publish(std::move(td_info), { { "type", name } });
199 0 : }
200 :
201 : // latency
202 0 : if (m_latency_monitoring.load() && m_running_flag.load()) {
203 : // TC in, TD out
204 0 : opmon::TriggerLatency lat_info;
205 0 : lat_info.set_latency_in(m_latency_instance.get_latency_in());
206 0 : lat_info.set_latency_out(m_latency_instance.get_latency_out());
207 0 : this->publish(std::move(lat_info));
208 :
209 : // vs readout window requests
210 0 : opmon::ModuleLevelTriggerRequestLatency lat_request_info;
211 0 : lat_request_info.set_latency_window_start(m_latency_requests_instance.get_latency_in());
212 0 : lat_request_info.set_latency_window_end(m_latency_requests_instance.get_latency_out());
213 0 : this->publish(std::move(lat_request_info));
214 0 : }
215 0 : }
216 :
217 : void
218 0 : MLTModule::do_start(const CommandData_t& startobj)
219 : {
220 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering start() method";
221 :
222 0 : m_run_number = startobj.value<dunedaq::daqdataformats::run_number_t>("run", 0);
223 : // We get here at start of run, so reset the trigger number
224 0 : m_last_trigger_number = 1;
225 :
226 : // OpMon.
227 0 : m_td_msg_received_count.store(0);
228 0 : m_td_sent_count.store(0);
229 0 : m_td_total_count.store(0);
230 : // OpMon DFO
231 0 : m_td_inhibited_count.store(0);
232 0 : m_td_paused_count.store(0);
233 0 : m_td_queue_timeout_expired_err_count.store(0);
234 : // OpMon Livetime counter
235 0 : m_lc_kLive.store(0);
236 0 : m_lc_kPaused.store(0);
237 0 : m_lc_kDead.store(0);
238 :
239 0 : m_paused.store(true);
240 0 : m_running_flag.store(true);
241 0 : m_dfo_is_busy.store(false);
242 :
243 0 : m_livetime_counter.reset(new LivetimeCounter(LivetimeCounter::State::kPaused));
244 0 : m_lc_started = true;
245 :
246 0 : m_inhibit_input->add_callback(std::bind(&MLTModule::dfo_busy_callback, this, std::placeholders::_1));
247 0 : m_decision_input->add_callback(std::bind(&MLTModule::trigger_decisions_callback, this, std::placeholders::_1));
248 :
249 0 : ers::info(TriggerStartOfRun(ERS_HERE, m_run_number));
250 :
251 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting start() method";
252 0 : }
253 :
254 : void
255 0 : MLTModule::do_stop(const CommandData_t& /*stopobj*/)
256 : {
257 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering stop() method";
258 :
259 0 : m_running_flag.store(false);
260 0 : m_decision_input->remove_callback();
261 0 : m_inhibit_input->remove_callback();
262 : // m_send_trigger_decisions_thread.join();
263 0 : m_lc_kLive_count = m_livetime_counter->get_time(LivetimeCounter::State::kLive);
264 0 : m_lc_kPaused_count = m_livetime_counter->get_time(LivetimeCounter::State::kPaused);
265 0 : m_lc_kDead_count = m_livetime_counter->get_time(LivetimeCounter::State::kDead);
266 0 : m_lc_kLive = m_lc_kLive_count;
267 0 : m_lc_kPaused = m_lc_kPaused_count;
268 0 : m_lc_kDead = m_lc_kDead_count;
269 :
270 0 : m_lc_deadtime = m_livetime_counter->get_time(LivetimeCounter::State::kDead) +
271 0 : m_livetime_counter->get_time(LivetimeCounter::State::kPaused);
272 :
273 0 : TLOG(3) << "LivetimeCounter - total deadtime+paused: " << m_lc_deadtime << std::endl;
274 0 : m_livetime_counter.reset(); // Calls LivetimeCounter dtor?
275 0 : m_lc_started = false;
276 :
277 0 : print_opmon_stats();
278 :
279 0 : ers::info(TriggerEndOfRun(ERS_HERE, m_run_number));
280 :
281 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting stop() method";
282 0 : }
283 :
284 : void
285 0 : MLTModule::do_pause(const CommandData_t& /*pauseobj*/)
286 : {
287 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering pause() method";
288 :
289 0 : m_paused.store(true);
290 0 : m_livetime_counter->set_state(LivetimeCounter::State::kPaused);
291 0 : TLOG() << "******* Triggers PAUSED! in run " << m_run_number << " *********";
292 0 : ers::info(TriggerPaused(ERS_HERE));
293 0 : TLOG_DEBUG(5) << "TS End: "
294 0 : << std::chrono::duration_cast<std::chrono::microseconds>(
295 0 : std::chrono::system_clock::now().time_since_epoch())
296 0 : .count();
297 :
298 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting pause() method";
299 0 : }
300 :
301 : void
302 0 : MLTModule::do_resume(const CommandData_t& /*resumeobj*/)
303 : {
304 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering resume() method";
305 :
306 0 : ers::info(TriggerActive(ERS_HERE));
307 0 : TLOG() << "******* Triggers RESUMED! in run " << m_run_number << " *********";
308 0 : m_livetime_counter->set_state(LivetimeCounter::State::kLive);
309 0 : m_lc_started = true;
310 0 : m_paused.store(false);
311 0 : TLOG_DEBUG(5) << "TS Start: "
312 0 : << std::chrono::duration_cast<std::chrono::microseconds>(
313 0 : std::chrono::system_clock::now().time_since_epoch())
314 0 : .count();
315 :
316 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting resume() method";
317 0 : }
318 :
319 : void
320 0 : MLTModule::trigger_decisions_callback(dfmessages::TriggerDecision& decision)
321 : {
322 0 : m_td_msg_received_count++;
323 0 : if (m_latency_monitoring.load())
324 0 : m_latency_instance.update_latency_in(decision.trigger_timestamp);
325 :
326 0 : auto trigger_types = unpack_types(decision.trigger_type);
327 0 : for (const auto t : trigger_types) {
328 0 : ++get_trigger_counter(t).received;
329 : }
330 :
331 0 : auto ts = decision.trigger_timestamp;
332 0 : auto tt = decision.trigger_type;
333 0 : decision.run_number = m_run_number;
334 0 : decision.trigger_number = m_last_trigger_number;
335 :
336 : // Overwrite the component's readout window if we have custom
337 : // subdetector--readout window map
338 0 : for (const auto& [subdetectorid, window] : m_subdetector_readout_window_map) {
339 0 : for (auto& request : decision.components) {
340 0 : if (request.component.subsystem != daqdataformats::SourceID::Subsystem::kDetectorReadout) {
341 0 : continue;
342 : }
343 :
344 0 : if (subdetectorid != m_srcid_detid_map[request.component]) {
345 0 : continue;
346 : }
347 :
348 0 : request.window_begin = decision.trigger_timestamp - window.first;
349 0 : request.window_end = decision.trigger_timestamp + window.second;
350 : }
351 : }
352 :
353 0 : TLOG_DEBUG(2) << "Received decision with timestamp " << decision.trigger_timestamp;
354 :
355 0 : if ((!m_paused.load() && !m_dfo_is_busy.load())) {
356 0 : TLOG_DEBUG(1) << "Sending a decision with triggernumber " << decision.trigger_number << " timestamp "
357 0 : << decision.trigger_timestamp << " start " << decision.components.front().window_begin << " end "
358 0 : << decision.components.front().window_end << " number of links " << decision.components.size();
359 :
360 : // readout window latency update
361 : // TODO: The latency will be different for different components, since they might have different readout windows
362 0 : if (m_latency_monitoring.load()) {
363 0 : m_latency_requests_instance.update_latency_in(decision.components.front().window_begin);
364 0 : m_latency_requests_instance.update_latency_out(decision.components.front().window_end);
365 : }
366 :
367 0 : try {
368 0 : m_decision_output->send(std::move(decision), std::chrono::milliseconds(1));
369 0 : m_td_sent_count++;
370 :
371 0 : for (const auto t : trigger_types) {
372 0 : ++get_trigger_counter(t).sent;
373 : }
374 :
375 0 : m_last_trigger_number++;
376 : // add_td(pending_td);
377 0 : } catch (const ers::Issue& e) {
378 0 : ers::error(e);
379 0 : TLOG_DEBUG(1) << "The network is misbehaving: TD send failed for " << m_last_trigger_number;
380 0 : m_td_queue_timeout_expired_err_count++;
381 :
382 0 : for (const auto t : trigger_types) {
383 0 : ++get_trigger_counter(t).failed_send;
384 : }
385 0 : }
386 :
387 0 : } else if (m_paused.load()) {
388 0 : ++m_td_paused_count;
389 0 : for (const auto t : trigger_types) {
390 0 : ++get_trigger_counter(t).paused;
391 : }
392 :
393 0 : TLOG_DEBUG(1) << "Triggers are paused. Not sending a TriggerDecision for TD with timestamp and type " << ts << "/"
394 0 : << tt;
395 : } else {
396 0 : ers::warning(TriggerInhibited(ERS_HERE, m_run_number));
397 0 : TLOG_DEBUG(1) << "The DFO is busy. Not sending a TriggerDecision with timestamp and type " << ts << "/" << tt;
398 0 : m_td_inhibited_count++;
399 0 : for (const auto t : trigger_types) {
400 0 : ++get_trigger_counter(t).inhibited;
401 : }
402 : }
403 0 : if (m_latency_monitoring.load())
404 0 : m_latency_instance.update_latency_out(decision.trigger_timestamp);
405 0 : m_td_total_count++;
406 0 : }
407 :
408 : void
409 0 : MLTModule::dfo_busy_callback(dfmessages::TriggerInhibit& inhibit)
410 : {
411 0 : TLOG_DEBUG(17) << "Received inhibit message with busy status " << inhibit.busy << " and run number "
412 0 : << inhibit.run_number;
413 0 : if (inhibit.run_number == m_run_number) {
414 0 : TLOG_DEBUG(18) << "Changing our flag for the DFO busy state from " << m_dfo_is_busy.load() << " to "
415 0 : << inhibit.busy;
416 0 : m_dfo_is_busy = inhibit.busy;
417 0 : LivetimeCounter::State state = (inhibit.busy) ? LivetimeCounter::State::kDead : LivetimeCounter::State::kLive;
418 0 : m_livetime_counter->set_state(state);
419 : }
420 0 : }
421 :
422 : void
423 0 : MLTModule::print_opmon_stats()
424 : {
425 0 : TLOG() << "MLT opmon counters summary:";
426 0 : TLOG() << "------------------------------";
427 0 : TLOG() << "Received TD messages: \t\t" << m_td_msg_received_count;
428 0 : TLOG() << "Sent TDs: \t\t\t" << m_td_sent_count;
429 0 : TLOG() << "Inhibited TDs: \t\t" << m_td_inhibited_count;
430 0 : TLOG() << "Paused TDs: \t\t\t" << m_td_paused_count;
431 0 : TLOG() << "Queue timeout TDs: \t\t" << m_td_queue_timeout_expired_err_count;
432 0 : TLOG() << "Total TDs: \t\t\t" << m_td_total_count;
433 0 : TLOG() << "------------------------------";
434 0 : TLOG() << "Livetime::Live: \t" << m_lc_kLive;
435 0 : TLOG() << "Livetime::Paused: \t" << m_lc_kPaused;
436 0 : TLOG() << "Livetime::Dead: \t" << m_lc_kDead;
437 0 : TLOG();
438 0 : }
439 :
440 : } // namespace trigger
441 : } // namespace dunedaq
442 :
// NOTE(review): presumably registers MLTModule with the DAQ module plugin
// mechanism so it can be created by name — confirm against the macro's definition.
443 0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::trigger::MLTModule)
444 :
445 : // Local Variables:
446 : // c-basic-offset: 2
447 : // End:
|