Line data Source code
1 : /**
2 : * @file TCProcessor.hpp TPC TP specific Task based raw processor
3 : *
4 : * This is part of the DUNE DAQ , copyright 2023.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
#include "trigger/TCProcessor.hpp" // NOLINT(build/include)

#include "iomanager/Sender.hpp"
#include "logging/Logging.hpp"

#include "datahandlinglibs/DataHandlingIssues.hpp"
#include "datahandlinglibs/FrameErrorRegistry.hpp"
#include "datahandlinglibs/ReadoutLogging.hpp"
#include "datahandlinglibs/models/IterableQueueModel.hpp"
#include "trigger/TCWrapper.hpp"
#include "triggeralgs/TriggerCandidate.hpp"

#include "appmodel/TCDataProcessor.hpp"
#include "appmodel/TriggerDataHandlerModule.hpp"

#include <algorithm>
#include <numeric>
22 :
23 : using dunedaq::datahandlinglibs::logging::TLVL_BOOKKEEPING;
24 : using dunedaq::datahandlinglibs::logging::TLVL_TAKE_NOTE;
25 :
26 : // THIS SHOULDN'T BE HERE!!!!! But it is necessary.....
27 : DUNE_DAQ_TYPESTRING(dunedaq::trigger::TCWrapper, "TriggerCandidate")
28 :
29 : namespace dunedaq {
30 : namespace trigger {
31 :
32 0 : TCProcessor::TCProcessor(std::unique_ptr<datahandlinglibs::FrameErrorRegistry>& error_registry, bool post_processing_enabled)
33 0 : : datahandlinglibs::TaskRawDataProcessorModel<TCWrapper>(error_registry, post_processing_enabled)
34 : {
35 0 : }
36 :
37 0 : TCProcessor::~TCProcessor()
38 0 : {}
39 :
40 : void
41 0 : TCProcessor::start(const appfwk::DAQModule::CommandData_t& args)
42 : {
43 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Entering start() method";
44 :
45 0 : m_running_flag.store(true);
46 0 : m_send_trigger_decisions_thread = std::thread(&TCProcessor::send_trigger_decisions, this);
47 0 : pthread_setname_np(m_send_trigger_decisions_thread.native_handle(), "mlt-dec"); // TODO: originally mlt-trig-dec
48 :
49 : // Reset stats
50 0 : m_tds_created_count.store(0);
51 0 : m_tds_sent_count.store(0);
52 0 : m_tds_dropped_count.store(0);
53 0 : m_tds_failed_bitword_count.store(0);
54 0 : m_tds_cleared_count.store(0);
55 : // per TC
56 0 : m_tc_received_count.store(0);
57 0 : m_tds_created_tc_count.store(0);
58 0 : m_tds_sent_tc_count.store(0);
59 0 : m_tds_dropped_tc_count.store(0);
60 0 : m_tds_failed_bitword_tc_count.store(0);
61 0 : m_tds_cleared_tc_count.store(0);
62 0 : m_tc_ignored_count.store(0);
63 0 : inherited::start(args);
64 :
65 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Exiting start() method";
66 0 : }
67 :
void
TCProcessor::stop(const appfwk::DAQModule::CommandData_t& args)
{
  TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Entering stop() method";

  // Stop the base class first (no more TCs arrive), then flag the worker.
  inherited::stop(args);
  m_running_flag.store(false);

  // Make sure condition_variable knows we flipped running flag
  // (notify under the mutex so the wake-up cannot be lost against wait_for).
  {
    std::lock_guard<std::mutex> lock(m_td_vector_mutex);
    m_cv.notify_all();
  }

  // Wait for the TD-sending thread to stop
  m_send_trigger_decisions_thread.join();

  // Drop all TDs in vectors at run stage change. Have to do this
  // after joining m_send_trigger_decisions_thread so we don't
  // concurrently access the vectors
  clear_td_vectors();

  // Emit the end-of-run counter summary to the log.
  print_opmon_stats();

  TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Exiting stop() method";
}
94 :
void
TCProcessor::conf(const appmodel::DataHandlerModule* cfg)
{
  TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Entering conf() method";

  // The generic config object must really be a TriggerDataHandlerModule.
  auto mtrg = cfg->cast<appmodel::TriggerDataHandlerModule>();
  if (mtrg == nullptr) {
    throw(InvalidConfiguration(ERS_HERE, "Provided null TriggerDataHandlerModule configuration!"));
  }
  // Grab a sender for the "TriggerDecision" output connection.
  for (auto output : mtrg->get_outputs()) {
    try {
      if (output->get_data_type() == "TriggerDecision") {
        m_td_sink = get_iom_sender<dfmessages::TriggerDecision>(output->UID());
      }
    } catch (const ers::Issue& excpt) {
      ers::error(datahandlinglibs::ResourceQueueError(ERS_HERE, "td", "DefaultRequestHandlerModel", excpt));
    }
  }

  // NOTE(review): proc_conf is dereferenced below without a null check —
  // confirm the cast to TCDataProcessor cannot fail for this module's config.
  auto dp = mtrg->get_module_configuration()->get_data_processor();
  auto proc_conf = dp->cast<appmodel::TCDataProcessor>();

  // Add all Source IDs to mandatory links for now...
  // (both "mandatory" and "enabled" source IDs end up in m_mandatory_links)
  for(auto const& link : mtrg->get_mandatory_source_ids()){
    m_mandatory_links.push_back(
        dfmessages::SourceID{
          daqdataformats::SourceID::string_to_subsystem(link->get_subsystem()),
          link->get_sid()});
  }
  for(auto const& link : mtrg->get_enabled_source_ids()){
    m_mandatory_links.push_back(
        dfmessages::SourceID{
          daqdataformats::SourceID::string_to_subsystem(link->get_subsystem()),
          link->get_sid()});
  }

  // TODO: Group links!
  //m_group_links_data = conf->get_groups_links();
  parse_group_links(m_group_links_data);
  print_group_links();
  m_total_group_links = m_group_links.size();
  TLOG_DEBUG(3) << "Total group links: " << m_total_group_links;

  // TD-forming options: merging / pileup handling and buffering limits.
  m_tc_merging = proc_conf->get_merge_overlapping_tcs();
  m_ignore_tc_pileup = proc_conf->get_ignore_overlapping_tcs();
  m_buffer_timeout = proc_conf->get_buffer_timeout();
  // When overlapping TCs are ignored, timed-out TDs are never sent.
  m_send_timed_out_tds = (m_ignore_tc_pileup) ? false : proc_conf->get_td_out_of_timeout();
  m_td_readout_limit = proc_conf->get_td_readout_limit();
  m_ignored_tc_types = proc_conf->get_ignore_tc();
  m_ignoring_tc_types = !m_ignored_tc_types.empty();

  // Trigger bitwords
  std::vector<const appmodel::TriggerBitword*> bitwords = proc_conf->get_trigger_bitwords();
  m_use_bitwords = !bitwords.empty();
  if(m_use_bitwords){
    set_trigger_bitwords(bitwords);
    print_trigger_bitwords();
  }
  TLOG_DEBUG(3) << "Use bitwords: " << m_use_bitwords;
  TLOG_DEBUG(3) << "Allow merging: " << m_tc_merging;
  TLOG_DEBUG(3) << "Ignore pileup: " << m_ignore_tc_pileup;
  TLOG_DEBUG(3) << "Buffer timeout: " << m_buffer_timeout;
  TLOG_DEBUG(3) << "Should send timed out TDs: " << m_send_timed_out_tds;
  TLOG_DEBUG(3) << "TD readout limit: " << m_td_readout_limit;

  // ROI map
  m_roi_conf_data = proc_conf->get_roi_group_conf();
  m_use_roi_readout = !m_roi_conf_data.empty();
  if (m_use_roi_readout) {
    parse_roi_conf(m_roi_conf_data);
    print_roi_conf(m_roi_conf);
  }
  TLOG_DEBUG(3) << "Use ROI readout?: " << m_use_roi_readout;

  // Custom readout map
  m_readout_window_map_data = proc_conf->get_tc_readout_map();
  m_use_readout_map = !m_readout_window_map_data.empty();
  if (m_use_readout_map) {
    parse_readout_map(m_readout_window_map_data);
    print_readout_map(m_readout_window_map);
  }
  TLOG_DEBUG(3) << "Use readout map: " << m_use_readout_map;

  // Ignoring TC types
  TLOG_DEBUG(3) << "Ignoring TC types: " << m_ignoring_tc_types;
  if(m_ignoring_tc_types){
    TLOG_DEBUG(3) << "TC types to ignore: ";
    for (std::vector<unsigned int>::iterator it = m_ignored_tc_types.begin(); it != m_ignored_tc_types.end();) {
      TLOG_DEBUG(3) << *it;
      ++it;
    }
  }
  m_latency_monitoring.store( dp->get_latency_monitoring() );
  // Register TD-making as the post-processing step for every incoming TC.
  inherited::add_postprocess_task(std::bind(&TCProcessor::make_td, this, std::placeholders::_1));

  inherited::conf(mtrg);

  TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Exiting conf() method";
}
194 :
195 : void
196 0 : TCProcessor::scrap(const appfwk::DAQModule::CommandData_t& args)
197 : {
198 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Entering scrap() method";
199 :
200 0 : m_mandatory_links.clear();
201 0 : m_group_links.clear();
202 0 : m_roi_conf.clear();
203 0 : m_roi_conf_data.clear();
204 0 : m_roi_conf_ids.clear();
205 0 : m_roi_conf_probs.clear();
206 0 : m_roi_conf_probs_c.clear();
207 0 : m_pending_tds.clear();
208 0 : m_readout_window_map_data.clear();
209 0 : m_readout_window_map.clear();
210 0 : m_ignored_tc_types.clear();
211 :
212 0 : m_td_sink.reset();
213 :
214 0 : m_group_links_data.clear();
215 :
216 0 : inherited::scrap(args);
217 :
218 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Exiting scrap() method";
219 0 : }
220 :
221 : void
222 0 : TCProcessor::generate_opmon_data()
223 : {
224 0 : opmon::TCProcessorInfo info;
225 :
226 0 : info.set_tds_created_count( m_tds_created_count.load() );
227 0 : info.set_tds_sent_count( m_tds_sent_count.load() );
228 0 : info.set_tds_dropped_count( m_tds_dropped_count.load() );
229 0 : info.set_tds_failed_bitword_count( m_tds_failed_bitword_count.load() );
230 0 : info.set_tds_cleared_count( m_tds_cleared_count.load() );
231 0 : info.set_tc_received_count( m_tc_received_count.load() );
232 0 : info.set_tc_ignored_count( m_tc_ignored_count.load() );
233 0 : info.set_tds_created_tc_count( m_tds_created_tc_count.load() );
234 0 : info.set_tds_sent_tc_count( m_tds_sent_tc_count.load() );
235 0 : info.set_tds_dropped_tc_count( m_tds_dropped_tc_count.load() );
236 0 : info.set_tds_failed_bitword_tc_count( m_tds_failed_bitword_tc_count.load() );
237 0 : info.set_tds_cleared_tc_count( m_tds_cleared_tc_count.load() );
238 :
239 0 : this->publish(std::move(info));
240 :
241 0 : if ( m_latency_monitoring.load() && m_running_flag.load() ) {
242 0 : opmon::TriggerLatency lat_info;
243 :
244 0 : lat_info.set_latency_in( m_latency_instance.get_latency_in() );
245 0 : lat_info.set_latency_out( m_latency_instance.get_latency_out() );
246 :
247 0 : this->publish(std::move(lat_info));
248 0 : }
249 0 : }
250 :
/**
 * Pipeline Stage 2.: put valid TCs in a vector for grouping and forming of TDs
 *
 * Runs as the post-processing task registered in conf(). Filters out ignored
 * TC types; otherwise buffers the TC (under the TD-vector mutex) and wakes
 * the sender thread.
 * */
void
TCProcessor::make_td(const TCWrapper* tcw)
{

  // Note: this copies the candidate out of the wrapper.
  auto tc = tcw->candidate;
  if (m_latency_monitoring.load()) m_latency_instance.update_latency_in( tc.time_start );
  m_tc_received_count++;

  // Debug print, including the effective readout window when a custom map applies.
  if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ) {
    TLOG_DEBUG(3) << "Got TC of type " << static_cast<int>(tc.type) << ", timestamp " << tc.time_candidate
                  << ", start/end " << tc.time_start << "/" << tc.time_end << ", readout start/end "
                  << tc.time_candidate - m_readout_window_map[tc.type].first << "/"
                  << tc.time_candidate + m_readout_window_map[tc.type].second;
  } else {
    TLOG_DEBUG(3) << "Got TC of type " << static_cast<int>(tc.type) << ", timestamp " << tc.time_candidate
                  << ", start/end " << tc.time_start << "/" << tc.time_end;
  }

  // Option to ignore TC types (if given by config)
  if (m_ignoring_tc_types == true && check_trigger_type_ignore(static_cast<unsigned int>(tc.type)) == true) {
    TLOG_DEBUG(3) << " Ignore TC type: " << static_cast<unsigned int>(tc.type);
    m_tc_ignored_count++;

    /*FIXME: comment out this block: if a TC is to be ignored it shall just be ignored!
    if (m_tc_merging) {
      // Still need to check for overlap with existing TD, if overlaps, include in the TD, but don't extend
      // readout
      std::lock_guard<std::mutex> lock(m_td_vector_mutex);
      add_tc_ignored(*tc);
    }
    */
  }
  else {
    // Buffer the TC (merging into an overlapping pending TD when enabled)
    // and wake the sender thread.
    std::lock_guard<std::mutex> lock(m_td_vector_mutex);
    add_tc(tc);
    m_cv.notify_one();
    TLOG_DEBUG(10) << "pending tds size: " << m_pending_tds.size();
  }
  m_last_processed_daq_ts = tc.time_start;
  return;
}
295 :
// Build a TriggerDecision from a ready pending TD: the timestamp comes from
// the earliest contributing TC, the trigger type is the OR of all contributing
// TC-type bits, and component requests are attached either for all configured
// links or via the ROI scheme.
// NOTE(review): m_earliest_tc_index is a member variable, so this is only safe
// while decisions are created from a single thread (the sender thread) — confirm.
dfmessages::TriggerDecision
TCProcessor::create_decision(const PendingTD& pending_td)
{
  m_earliest_tc_index = get_earliest_tc_index(pending_td);
  TLOG_DEBUG(5) << "earliest TC index: " << m_earliest_tc_index;

  if (pending_td.contributing_tcs.size() > 1) {
    TLOG_DEBUG(5) << "!!! TD created from " << pending_td.contributing_tcs.size() << " TCs !!!";
  }

  dfmessages::TriggerDecision decision;
  decision.trigger_number = 0; // filled by MLT
  decision.run_number = 0; // filled by MLT
  decision.trigger_timestamp = pending_td.contributing_tcs[m_earliest_tc_index].time_candidate;
  decision.readout_type = dfmessages::ReadoutType::kLocalized;

  TDBitset td_bitword = get_TD_bitword(pending_td);
  TLOG_DEBUG(5) << "[MLT] TD has bitword: " << td_bitword << " "
                << static_cast<dfmessages::trigger_type_t>(td_bitword.to_ulong());
  decision.trigger_type = static_cast<dfmessages::trigger_type_t>(td_bitword.to_ulong()); // m_trigger_type;

  //decision.trigger_type = 1; // m_trigger_type;

  TLOG_DEBUG(3) << ", TC detid: " << pending_td.contributing_tcs[m_earliest_tc_index].detid
                << ", TC type: " << static_cast<int>(pending_td.contributing_tcs[m_earliest_tc_index].type)
                << ", TC cont number: " << pending_td.contributing_tcs.size()
                << ", DECISION trigger type: " << decision.trigger_type
                << ", DECISION timestamp: " << decision.trigger_timestamp
                << ", request window begin: " << pending_td.readout_start
                << ", request window end: " << pending_td.readout_end;

  // Mandatory links are always requested over the full TD readout window.
  std::vector<dfmessages::ComponentRequest> requests =
    create_all_decision_requests(m_mandatory_links, pending_td.readout_start, pending_td.readout_end);
  add_requests_to_decision(decision, requests);

  if (!m_use_roi_readout) {
    // Request every configured group link over the full readout window.
    for (const auto& [key, value] : m_group_links) {
      std::vector<dfmessages::ComponentRequest> group_requests =
        create_all_decision_requests(value, pending_td.readout_start, pending_td.readout_end);
      add_requests_to_decision(decision, group_requests);
    }
  } else { // using ROI readout
    roi_readout_make_requests(decision);
  }

  m_tds_created_count++;
  m_tds_created_tc_count += pending_td.contributing_tcs.size();

  return decision;
}
346 :
347 :
348 : void
349 0 : TCProcessor::send_trigger_decisions() {
350 : // A unique lock that can be locked and unlocked
351 0 : std::unique_lock<std::mutex> lock(m_td_vector_mutex);
352 :
353 0 : while (m_running_flag) {
354 : // TODO: think about better implementation (notify?, something event driven)
355 0 : m_cv.wait_for(lock, std::chrono::microseconds(100));
356 0 : auto ready_tds = get_ready_tds(m_pending_tds);
357 0 : TLOG_DEBUG(10) << "ready tds: " << ready_tds.size() << ", updated pending tds: " << m_pending_tds.size();
358 :
359 0 : for (std::vector<PendingTD>::iterator it = ready_tds.begin(); it != ready_tds.end();) {
360 0 : call_tc_decision(*it);
361 0 : ++it;
362 : }
363 0 : }
364 0 : }
365 :
// Apply the (optional) trigger-bitword filter to a ready TD, then build the
// TriggerDecision and push it to the TD sink without blocking. Dropped TDs
// are only counted and warned about; nothing is retried.
void
TCProcessor::call_tc_decision(const TCProcessor::PendingTD& pending_td)
{

  if (m_use_bitwords) {
    // Check trigger bitwords
    TDBitset td_bitword = get_TD_bitword(pending_td);
    if (!check_trigger_bitwords(td_bitword)) {
      // Don't process further if the bitword check failed
      m_tds_failed_bitword_count++;
      m_tds_failed_bitword_tc_count += pending_td.contributing_tcs.size();
      return;
    }
  }

  dfmessages::TriggerDecision decision = create_decision(pending_td);
  // Keep these for the warning below: decision is moved into the sink.
  auto tn = decision.trigger_number;
  auto td_ts = decision.trigger_timestamp;

  // Latency-out is measured against the first contributing TC's start time.
  if (m_latency_monitoring.load()) m_latency_instance.update_latency_out( pending_td.contributing_tcs.front().time_start );
  if(!m_td_sink->try_send(std::move(decision), iomanager::Sender::s_no_block)) {
    ers::warning(TDDropped(ERS_HERE, tn, td_ts));
    m_tds_dropped_count++;
    m_tds_dropped_tc_count += pending_td.contributing_tcs.size();
  }
  else {
    m_tds_sent_count++;
    m_tds_sent_tc_count += pending_td.contributing_tcs.size();
  }
}
396 :
397 :
// Merge the TC into an overlapping pending TD (or drop it when pileup-ignoring
// is on); otherwise open a new pending TD for it. Caller must hold
// m_td_vector_mutex (see make_td()).
void
TCProcessor::add_tc(const triggeralgs::TriggerCandidate tc)
{
  bool tc_dealt = false;
  // Wallclock arrival time (ms) used to (re)arm the TD buffering deadline.
  int64_t tc_wallclock_arrived =
    std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();

  if (m_tc_merging || m_ignore_tc_pileup) {

    for (std::vector<PendingTD>::iterator it = m_pending_tds.begin(); it != m_pending_tds.end();) {
      // Don't deal with TC here if there's no overlap
      if (!check_overlap(tc, *it)) {
        it++;
        continue;
      }

      // If overlap and ignoring, we drop the TC and flag it as dealt with.
      if (m_ignore_tc_pileup) {
        m_tds_dropped_tc_count++;
        tc_dealt = true;
        TLOG_DEBUG(3) << "TC overlapping with a previous TD, dropping!";
        break;
      }

      // If we're here, TC merging must be on, in which case we're actually
      // going to merge the TC into the TD.
      it->contributing_tcs.push_back(tc);
      if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ){
        TLOG_DEBUG(3) << "TC with start/end times " << tc.time_candidate - m_readout_window_map[tc.type].first << "/"
                      << tc.time_candidate + m_readout_window_map[tc.type].second
                      << " overlaps with pending TD with start/end times " << it->readout_start << "/"
                      << it->readout_end;
        // Extend the TD readout window to the union of the two windows.
        it->readout_start = ((tc.time_candidate - m_readout_window_map[tc.type].first) >= it->readout_start)
                              ? it->readout_start
                              : (tc.time_candidate - m_readout_window_map[tc.type].first);
        it->readout_end = ((tc.time_candidate + m_readout_window_map[tc.type].second) >= it->readout_end)
                            ? (tc.time_candidate + m_readout_window_map[tc.type].second)
                            : it->readout_end;
      } else {
        TLOG_DEBUG(3) << "TC with start/end times " << tc.time_start << "/" << tc.time_end
                      << " overlaps with pending TD with start/end times " << it->readout_start << "/"
                      << it->readout_end;
        it->readout_start = (tc.time_start >= it->readout_start) ? it->readout_start : tc.time_start;
        it->readout_end = (tc.time_end >= it->readout_end) ? tc.time_end : it->readout_end;
      }
      // Merging re-arms the buffering deadline from the newest TC's arrival.
      it->walltime_expiration = tc_wallclock_arrived + m_buffer_timeout;
      tc_dealt = true;
      break;
    }
  }

  // Don't do anything else if we've already dealt with the TC
  if (tc_dealt) {
    return;
  }

  // Create a new TD out of the TC
  PendingTD td_candidate;
  td_candidate.contributing_tcs.push_back(tc);
  if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ){
    td_candidate.readout_start = tc.time_candidate - m_readout_window_map[tc.type].first;
    td_candidate.readout_end = tc.time_candidate + m_readout_window_map[tc.type].second;
  } else {
    td_candidate.readout_start = tc.time_start;
    td_candidate.readout_end = tc.time_end;
  }
  td_candidate.walltime_expiration = tc_wallclock_arrived + m_buffer_timeout;
  m_pending_tds.push_back(td_candidate);
}
467 :
// Attach an ignored TC to the first overlapping pending TD WITHOUT extending
// that TD's readout window. Caller must hold m_td_vector_mutex.
// NOTE(review): currently unreferenced — the only call site in make_td() is
// commented out (see the FIXME there).
void
TCProcessor::add_tc_ignored(const triggeralgs::TriggerCandidate tc)
{
  for (std::vector<PendingTD>::iterator it = m_pending_tds.begin(); it != m_pending_tds.end();) {
    if (check_overlap(tc, *it)) {
      if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ) {
        TLOG_DEBUG(3) << "!Ignored! TC with start/end times " << tc.time_candidate - m_readout_window_map[tc.type].first
                      << "/" << tc.time_candidate + m_readout_window_map[tc.type].second
                      << " overlaps with pending TD with start/end times " << it->readout_start << "/"
                      << it->readout_end;
      } else {
        TLOG_DEBUG(3) << "!Ignored! TC with start/end times " << tc.time_start << "/" << tc.time_end
                      << " overlaps with pending TD with start/end times " << it->readout_start << "/"
                      << it->readout_end;
      }
      it->contributing_tcs.push_back(tc);
      break;
    }
    ++it;
  }
  return;
}
490 :
491 : bool
492 0 : TCProcessor::check_overlap(const triggeralgs::TriggerCandidate& tc, const PendingTD& pending_td)
493 : {
494 0 : if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ) {
495 0 : return !(((tc.time_candidate + m_readout_window_map[tc.type].second) < pending_td.readout_start) ||
496 0 : ((tc.time_candidate - m_readout_window_map[tc.type].first > pending_td.readout_end)));
497 : } else {
498 0 : return !((tc.time_end < pending_td.readout_start) || (tc.time_start > pending_td.readout_end));
499 : }
500 : }
501 :
502 : std::vector<TCProcessor::PendingTD>
503 0 : TCProcessor::get_ready_tds(std::vector<PendingTD>& pending_tds)
504 : {
505 0 : std::vector<PendingTD> return_tds;
506 0 : for (std::vector<PendingTD>::iterator it = pending_tds.begin(); it != pending_tds.end();) {
507 0 : auto timestamp_now =
508 0 : std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch())
509 0 : .count();
510 0 : if (timestamp_now >= it->walltime_expiration) {
511 0 : return_tds.push_back(*it);
512 0 : it = pending_tds.erase(it);
513 0 : } else if (check_td_readout_length(*it)) { // Also pass on TDs with (too) long readout window
514 0 : return_tds.push_back(*it);
515 0 : it = pending_tds.erase(it);
516 : } else {
517 0 : ++it;
518 : }
519 : }
520 0 : return return_tds;
521 0 : }
522 :
523 : int
524 0 : TCProcessor::get_earliest_tc_index(const PendingTD& pending_td)
525 : {
526 0 : int earliest_tc_index = -1;
527 0 : triggeralgs::timestamp_t earliest_tc_time;
528 0 : for (int i = 0; i < static_cast<int>(pending_td.contributing_tcs.size()); i++) {
529 0 : if (earliest_tc_index == -1) {
530 0 : earliest_tc_time = pending_td.contributing_tcs[i].time_candidate;
531 0 : earliest_tc_index = i;
532 : } else {
533 0 : if (pending_td.contributing_tcs[i].time_candidate < earliest_tc_time) {
534 0 : earliest_tc_time = pending_td.contributing_tcs[i].time_candidate;
535 0 : earliest_tc_index = i;
536 : }
537 : }
538 : }
539 0 : return earliest_tc_index;
540 : }
541 :
542 0 : bool TCProcessor::check_td_readout_length(const PendingTD& pending_td)
543 : {
544 0 : bool td_too_long = false;
545 0 : if (static_cast<int64_t>(pending_td.readout_end - pending_td.readout_start) >= m_td_readout_limit) {
546 0 : td_too_long = true;
547 0 : TLOG_DEBUG(3) << "Too long readout window: " << (pending_td.readout_end - pending_td.readout_start)
548 0 : << ", sending immediate TD!";
549 : }
550 0 : return td_too_long;
551 : }
552 :
553 : void
554 0 : TCProcessor::clear_td_vectors()
555 : {
556 0 : std::lock_guard<std::mutex> lock(m_td_vector_mutex);
557 0 : m_tds_cleared_count += m_pending_tds.size();
558 : // Use std::accumulate to sum up the sizes of all contributing_tcs vectors
559 0 : size_t tds_cleared_tc_count = std::accumulate(
560 : m_pending_tds.begin(), m_pending_tds.end(), 0,
561 0 : [](size_t sum, const PendingTD& ptd) {
562 0 : return sum + ptd.contributing_tcs.size();
563 : }
564 0 : );
565 0 : m_tds_cleared_tc_count += tds_cleared_tc_count;
566 0 : m_pending_tds.clear();
567 0 : }
568 : bool
569 0 : TCProcessor::check_trigger_type_ignore(unsigned int tc_type)
570 : {
571 0 : bool ignore = false;
572 0 : for (std::vector<unsigned int>::iterator it = m_ignored_tc_types.begin(); it != m_ignored_tc_types.end();) {
573 0 : if (tc_type == *it) {
574 : ignore = true;
575 : break;
576 : }
577 0 : ++it;
578 : }
579 0 : return ignore;
580 : }
581 :
582 : void
583 0 : TCProcessor::print_trigger_bitwords()
584 : {
585 0 : TLOG_DEBUG(3) << "Configured trigger words:";
586 0 : for (const auto& bitword : m_trigger_bitwords) {
587 0 : TLOG_DEBUG(3) << bitword;
588 : }
589 0 : }
590 :
591 : bool
592 0 : TCProcessor::check_trigger_bitwords(const TDBitset& td_bitword) const
593 : {
594 0 : bool trigger_check = false;
595 0 : for (const auto& bitword : m_trigger_bitwords) {
596 0 : TLOG_DEBUG(15) << "TD word: " << td_bitword << ", bitword: " << bitword;
597 0 : trigger_check = ((td_bitword & bitword) == bitword);
598 0 : TLOG_DEBUG(15) << "&: " << (td_bitword & bitword);
599 0 : TLOG_DEBUG(15) << "trigger?: " << trigger_check;
600 0 : if (trigger_check == true)
601 : break;
602 : }
603 0 : return trigger_check;
604 : }
605 :
606 : void
607 0 : TCProcessor::set_trigger_bitwords(const std::vector<const appmodel::TriggerBitword*>& _bitwords)
608 : {
609 0 : for (const appmodel::TriggerBitword* bitword : _bitwords) {
610 0 : TDBitset temp_bitword;
611 :
612 0 : for (const std::string& tctype_str: bitword->get_bitword()) {
613 0 : TCType tc_type = static_cast<TCType>(dunedaq::trgdataformats::string_to_trigger_candidate_type(tctype_str));
614 :
615 0 : if (tc_type == TCType::kUnknown) {
616 0 : throw(InvalidConfiguration(ERS_HERE, "Provided an unknown/non-existent TC type as a trigger bitword!"));
617 : }
618 :
619 0 : temp_bitword.set(static_cast<uint64_t>(tc_type));
620 : }
621 :
622 0 : m_trigger_bitwords.push_back(temp_bitword);
623 : }
624 0 : }
625 :
626 : void
627 0 : TCProcessor::parse_readout_map(const std::vector<const appmodel::TCReadoutMap*>& data)
628 : {
629 0 : for (auto readout_type : data) {
630 0 : TCType tc_type = static_cast<TCType>(
631 0 : dunedaq::trgdataformats::string_to_trigger_candidate_type(readout_type->get_tc_type_name()));
632 :
633 : // Throw error if unknown TC type
634 0 : if (tc_type == TCType::kUnknown) {
635 0 : throw(InvalidConfiguration(ERS_HERE, "Provided an unknown TC type in the TCReadoutMap for the TCProcessor"));
636 : }
637 :
638 0 : m_readout_window_map[tc_type] = {
639 0 : readout_type->get_time_before(), readout_type->get_time_after()
640 0 : };
641 : }
642 0 : return;
643 : }
644 : void
645 0 : TCProcessor::print_readout_map(std::map<TCType,
646 : std::pair<triggeralgs::timestamp_t, triggeralgs::timestamp_t>> map)
647 : {
648 0 : TLOG_DEBUG(3) << "MLT TD Readout map:";
649 0 : for (auto const& [key, val] : map) {
650 0 : TLOG_DEBUG(3) << "Type: " << static_cast<int>(key) << ", before: " << val.first << ", after: " << val.second;
651 : }
652 0 : return;
653 : }
654 :
655 : void
656 0 : TCProcessor::parse_group_links(const nlohmann::json& data)
657 : {
658 0 : for (auto group : data) {
659 0 : const nlohmann::json& temp_links_data = group["links"];
660 0 : std::vector<dfmessages::SourceID> temp_links;
661 0 : for (auto link : temp_links_data) {
662 0 : temp_links.push_back(
663 0 : dfmessages::SourceID{ daqdataformats::SourceID::string_to_subsystem(link["subsystem"]), link["element"] });
664 0 : }
665 0 : m_group_links.insert({ group["group"], temp_links });
666 0 : }
667 0 : return;
668 : }
669 :
670 : void
671 0 : TCProcessor::print_group_links()
672 : {
673 0 : TLOG_DEBUG(3) << "MLT Group Links:";
674 0 : for (auto const& [key, val] : m_group_links) {
675 0 : TLOG_DEBUG(3) << "Group: " << key;
676 0 : for (auto const& link : val) {
677 0 : TLOG_DEBUG(3) << link;
678 : }
679 : }
680 0 : TLOG_DEBUG(3) << " ";
681 0 : return;
682 : }
683 : dfmessages::ComponentRequest
684 0 : TCProcessor::create_request_for_link(dfmessages::SourceID link,
685 : triggeralgs::timestamp_t start,
686 : triggeralgs::timestamp_t end)
687 : {
688 0 : dfmessages::ComponentRequest request;
689 0 : request.component = link;
690 0 : request.window_begin = start;
691 0 : request.window_end = end;
692 :
693 0 : TLOG_DEBUG(10) << "setting request start: " << request.window_begin;
694 0 : TLOG_DEBUG(10) << "setting request end: " << request.window_end;
695 :
696 0 : return request;
697 : }
698 :
699 : std::vector<dfmessages::ComponentRequest>
700 0 : TCProcessor::create_all_decision_requests(std::vector<dfmessages::SourceID> links,
701 : triggeralgs::timestamp_t start,
702 : triggeralgs::timestamp_t end)
703 : {
704 0 : std::vector<dfmessages::ComponentRequest> requests;
705 0 : for (auto link : links) {
706 0 : requests.push_back(create_request_for_link(link, start, end));
707 : }
708 0 : return requests;
709 0 : }
710 :
711 : void
712 0 : TCProcessor::add_requests_to_decision(dfmessages::TriggerDecision& decision,
713 : std::vector<dfmessages::ComponentRequest> requests)
714 : {
715 0 : for (auto request : requests) {
716 0 : decision.components.push_back(request);
717 : }
718 0 : }
719 :
720 : void
721 0 : TCProcessor::parse_roi_conf(const std::vector<const appmodel::ROIGroupConf*>& data)
722 : {
723 0 : int counter = 0;
724 0 : float run_sum = 0;
725 0 : for (auto group : data) {
726 0 : roi_group temp_roi_group;
727 0 : temp_roi_group.n_links = group->get_number_of_link_groups();
728 0 : temp_roi_group.prob = group->get_probability();
729 0 : temp_roi_group.time_window = group->get_time_window();
730 0 : temp_roi_group.mode = group->get_groups_selection_mode();
731 0 : m_roi_conf.insert({ counter, temp_roi_group });
732 0 : m_roi_conf_ids.push_back(counter);
733 0 : m_roi_conf_probs.push_back(group->get_probability());
734 0 : run_sum += static_cast<float>(group->get_probability());
735 0 : m_roi_conf_probs_c.push_back(run_sum);
736 0 : counter++;
737 0 : }
738 0 : return;
739 : }
740 :
741 : void
742 0 : TCProcessor::print_roi_conf(std::map<int, roi_group> roi_conf)
743 : {
744 0 : TLOG_DEBUG(3) << "ROI CONF";
745 0 : for (const auto& [key, value] : roi_conf) {
746 0 : TLOG_DEBUG(3) << "ID: " << key;
747 0 : TLOG_DEBUG(3) << "n links: " << value.n_links;
748 0 : TLOG_DEBUG(3) << "prob: " << value.prob;
749 0 : TLOG_DEBUG(3) << "time: " << value.time_window;
750 0 : TLOG_DEBUG(3) << "mode: " << value.mode;
751 : }
752 0 : TLOG_DEBUG(3) << " ";
753 0 : return;
754 : }
755 :
756 : float
757 0 : TCProcessor::get_random_num_float(float limit)
758 : {
759 0 : float rnd = (double)rand() / RAND_MAX;
760 0 : return rnd * (limit);
761 : }
762 :
763 : int
764 0 : TCProcessor::pick_roi_group_conf()
765 : {
766 0 : float rnd_num = get_random_num_float(m_roi_conf_probs_c.back());
767 0 : for (int i = 0; i < static_cast<int>(m_roi_conf_probs_c.size()); i++) {
768 0 : if (rnd_num < m_roi_conf_probs_c[i]) {
769 0 : return i;
770 : }
771 : }
772 : return -1;
773 : }
774 :
775 : int
776 0 : TCProcessor::get_random_num_int()
777 : {
778 0 : int range = m_total_group_links;
779 0 : int rnd = rand() % range;
780 0 : return rnd;
781 : }
// Attach component requests for a randomly chosen ROI group configuration:
// pick a config (weighted by probability), select n_links group-link sets
// either at random or sequentially from id 0, and request each of them over
// [trigger_timestamp - time_window, trigger_timestamp + time_window].
void
TCProcessor::roi_readout_make_requests(dfmessages::TriggerDecision& decision)
{
  // Get configuration at random (weighted)
  int group_pick = pick_roi_group_conf();
  if (group_pick != -1) {
    roi_group this_group = m_roi_conf[m_roi_conf_ids[group_pick]];
    std::vector<dfmessages::SourceID> links;

    // If mode is random, pick groups to request at random
    if (this_group.mode == "kRandom") {
      TLOG_DEBUG(10) << "RAND";
      // A set guarantees the same group id is not requested twice.
      std::set<int> groups;
      while (static_cast<int>(groups.size()) < this_group.n_links) {
        groups.insert(get_random_num_int());
      }
      for (auto r_id : groups) {
        links.insert(links.end(), m_group_links[r_id].begin(), m_group_links[r_id].end());
      }
      // Otherwise, read sequntially by IDs, starting at 0
    } else {
      TLOG_DEBUG(10) << "SEQ";
      int r_id = 0;
      while (r_id < this_group.n_links) {
        links.insert(links.end(), m_group_links[r_id].begin(), m_group_links[r_id].end());
        r_id++;
      }
    }

    TLOG_DEBUG(10) << "TD timestamp: " << decision.trigger_timestamp;
    TLOG_DEBUG(10) << "group window: " << this_group.time_window;

    // Once the components are prepared, create requests and append them to decision
    std::vector<dfmessages::ComponentRequest> requests =
      create_all_decision_requests(links, decision.trigger_timestamp - this_group.time_window,
                                   decision.trigger_timestamp + this_group.time_window);
    add_requests_to_decision(decision, requests);
    links.clear();
  }
  return;
}
823 :
824 : TCProcessor::TDBitset
825 0 : TCProcessor::get_TD_bitword(const PendingTD& ready_td) const
826 : {
827 : // get only unique types
828 0 : std::vector<int> tc_types;
829 0 : for (auto tc : ready_td.contributing_tcs) {
830 0 : tc_types.push_back(static_cast<int>(tc.type));
831 0 : }
832 0 : tc_types.erase(std::unique(tc_types.begin(), tc_types.end()), tc_types.end());
833 :
834 : // form TD bitword
835 0 : TDBitset td_bitword;
836 0 : for (auto tc_type : tc_types) {
837 0 : td_bitword.set(tc_type);
838 : }
839 0 : return td_bitword;
840 0 : }
841 :
842 : void
843 0 : TCProcessor::print_opmon_stats()
844 : {
845 0 : TLOG() << "TCProcessor opmon counters summary:";
846 0 : TLOG() << "------------------------------";
847 0 : TLOG() << "TDs created: \t\t\t" << m_tds_created_count << " \t(" << m_tds_created_tc_count << " TCs)";
848 0 : TLOG() << "TDs sent: \t\t\t" << m_tds_sent_count << " \t(" << m_tds_sent_tc_count << " TCs)";
849 0 : TLOG() << "TDs dropped: \t\t\t" << m_tds_dropped_count << " \t(" << m_tds_dropped_tc_count << " TCs)";
850 0 : TLOG() << "TDs failed bitword check: \t" << m_tds_failed_bitword_count << " \t(" << m_tds_failed_bitword_tc_count << " TCs)";
851 0 : TLOG() << "TDs cleared: \t\t\t" << m_tds_cleared_count << " \t(" << m_tds_cleared_tc_count << " TCs)";
852 0 : TLOG() << "------------------------------";
853 0 : TLOG() << "TCs received: \t" << m_tc_received_count;
854 0 : TLOG() << "TCs ignored: \t" << m_tc_ignored_count;
855 0 : TLOG();
856 0 : }
857 :
} // namespace trigger
} // namespace dunedaq
|