Line data Source code
1 : /**
2 : * @file TRBModule.cpp TRBModule class implementation
3 : *
4 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "TRBModule.hpp"
10 : #include "dfmodules/CommonIssues.hpp"
11 :
12 : #include "appmodel/NetworkConnectionDescriptor.hpp"
13 : #include "appmodel/NetworkConnectionRule.hpp"
14 : #include "appmodel/ReadoutApplication.hpp"
15 : #include "appmodel/SourceIDConf.hpp"
16 : #include "appmodel/SourceIDToNetworkConnection.hpp"
17 : #include "appmodel/TRBModule.hpp"
18 : #include "appmodel/TriggerApplication.hpp"
19 : #include "confmodel/Application.hpp"
20 : #include "confmodel/Connection.hpp"
21 : #include "confmodel/DetectorStream.hpp"
22 : #include "confmodel/DetectorToDaqConnection.hpp"
23 : #include "confmodel/NetworkConnection.hpp"
24 : #include "confmodel/Session.hpp"
25 : #include "dfmessages/TriggerRecord_serialization.hpp"
26 : #include "logging/Logging.hpp"
27 :
28 : #include "iomanager/IOManager.hpp"
29 :
30 : #include <algorithm>
31 : #include <chrono>
32 : #include <cmath>
33 : #include <cstdlib>
34 : #include <limits>
35 : #include <memory>
36 : #include <numeric>
37 : #include <string>
38 : #include <thread>
39 : #include <utility>
40 : #include <vector>
41 :
/**
 * @brief TRACE debug levels used in this source file.
 *
 * Each enumerator is the numeric level handed to TLOG_DEBUG() for one
 * category of diagnostic output produced by TRBModule.
 */
enum
{
  TLVL_ENTER_EXIT_METHODS = 5, ///< "Entering/Exiting ... method" messages
  TLVL_INIT = 8,               ///< init-time messages (not referenced in the code of this file)
  TLVL_WORK_STEPS = 10,        ///< per-trigger-decision processing steps
  TLVL_BOOKKEEPING = 15,       ///< periodic status of the open trigger records
  TLVL_DISPATCH_DATAREQ = 21,  ///< DataRequest dispatching
  TLVL_FRAGMENT_RECEIVE = 22   ///< reception of individual fragments
};
54 :
55 : namespace dunedaq {
56 : namespace dfmodules {
57 :
58 : using daqdataformats::TriggerRecordErrorBits;
59 :
60 0 : TRBModule::TRBModule(const std::string& name)
61 : : dunedaq::appfwk::DAQModule(name)
62 0 : , m_stop_requested(true)
63 0 : , m_tr_queue_timeout(100)
64 0 : , m_dreq_queue_timeout(100)
65 : {
66 :
67 0 : register_command("conf", &TRBModule::do_conf);
68 0 : register_command("scrap", &TRBModule::do_scrap);
69 0 : register_command("start", &TRBModule::do_start);
70 0 : register_command("stop", &TRBModule::do_stop);
71 0 : }
72 :
73 : void
74 0 : TRBModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
75 : {
76 :
77 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
78 :
79 : //--------------------------------
80 : // Get single queues
81 : //---------------------------------
82 :
83 0 : auto mdal = mcfg->get_dal<appmodel::TRBModule>(get_name());
84 0 : if (!mdal) {
85 0 : throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve configuration object");
86 : }
87 :
88 0 : auto iom = iomanager::IOManager::get();
89 0 : for (auto con : mdal->get_inputs()) {
90 0 : if (con->get_data_type() == datatype_to_string<dfmessages::TriggerDecision>()) {
91 0 : m_trigger_decision_input = iom->get_receiver<dfmessages::TriggerDecision>(con->UID());
92 : }
93 0 : if (con->get_data_type() == datatype_to_string<std::unique_ptr<daqdataformats::Fragment>>()) {
94 0 : m_fragment_input = iom->get_receiver<std::unique_ptr<daqdataformats::Fragment>>(con->UID());
95 :
96 : // save the data fragment receiver global connection name for later, when it gets
97 : // copied into the DataRequests so that data producers know where to send their fragments
98 0 : m_reply_connection = con->UID();
99 : }
100 0 : if (con->get_data_type() == datatype_to_string<dfmessages::TRMonRequest>()) {
101 0 : m_mon_receiver = iom->get_receiver<dfmessages::TRMonRequest>(con->UID());
102 : }
103 : }
104 :
105 0 : if (m_trigger_decision_input == nullptr) {
106 0 : throw InvalidQueueFatalError(ERS_HERE, get_name(), "TriggerDecision Input queue");
107 : }
108 0 : if (m_fragment_input == nullptr) {
109 0 : throw InvalidQueueFatalError(ERS_HERE, get_name(), "Fragment Input queue");
110 : }
111 :
112 0 : m_trigger_record_output = iom->get_sender<std::unique_ptr<daqdataformats::TriggerRecord>>(mdal->get_trigger_record_output()->UID());
113 :
114 0 : for (auto con : mdal->get_request_connections()) {
115 0 : for (auto source_id : con->get_source_ids()) {
116 :
117 : // find the queue for sourceid_req in the map
118 0 : std::unique_lock<std::mutex> lk(m_map_sourceid_connections_mutex);
119 0 : daqdataformats::SourceID sid;
120 0 : sid.subsystem = daqdataformats::SourceID::string_to_subsystem(source_id->get_subsystem());
121 0 : sid.id = source_id->get_sid();
122 0 : auto it_req = m_map_sourceid_connections.find(sid);
123 0 : if (it_req == m_map_sourceid_connections.end() || it_req->second == nullptr) {
124 0 : m_map_sourceid_connections[sid] = get_iom_sender<dfmessages::DataRequest>(con->get_netconn()->UID());
125 : }
126 0 : lk.unlock();
127 0 : }
128 : }
129 :
130 0 : m_trb_conf = mdal->get_configuration();
131 :
132 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
133 0 : }
134 :
135 : void
136 0 : TRBModule::generate_opmon_data()
137 : {
138 :
139 0 : opmon::TRBInfo i;
140 :
141 : // status metrics
142 0 : i.set_pending_trigger_decisions(m_trigger_decisions_counter.load());
143 0 : i.set_fragments_in_the_book(m_fragment_counter.load());
144 0 : i.set_pending_fragments(m_pending_fragment_counter.load());
145 :
146 : // operation metrics
147 0 : i.set_received_trigger_decisions(m_received_trigger_decisions.exchange(0));
148 0 : i.set_generated_trigger_records(m_generated_trigger_records.exchange(0));
149 0 : i.set_generated_data_requests(m_generated_data_requests.exchange(0));
150 0 : i.set_received_fragments(m_received_fragments.exchange(0));
151 0 : i.set_data_waiting_time(m_data_waiting_time.exchange(0));
152 0 : i.set_data_request_width(m_data_request_width.exchange(0));
153 0 : i.set_trigger_decision_width(m_trigger_decision_width.exchange(0));
154 0 : i.set_received_trmon_requests(m_trmon_request_counter.exchange(0));
155 0 : i.set_sent_trmon(m_trmon_sent_counter.exchange(0));
156 :
157 0 : i.set_td_processing_us(m_td_processing_us.exchange(0));
158 0 : i.set_fragment_processing_us(m_fragment_processing_us.exchange(0));
159 :
160 0 : publish(std::move(i));
161 :
162 0 : opmon::TRBErrors err;
163 : // error counters
164 0 : err.set_timed_out_trigger_records(m_timed_out_trigger_records.load());
165 0 : err.set_abandoned_trigger_records(m_abandoned_trigger_records.load());
166 0 : err.set_unexpected_fragments(m_unexpected_fragments.load());
167 0 : err.set_unexpected_trigger_decisions(m_unexpected_trigger_decisions.load());
168 0 : err.set_lost_fragments(m_lost_fragments.load());
169 0 : err.set_invalid_requests(m_invalid_requests.load());
170 0 : err.set_duplicated_trigger_ids(m_duplicated_trigger_ids.load());
171 :
172 0 : publish(std::move(err));
173 0 : }
174 :
175 : void
176 0 : TRBModule::do_conf(const CommandData_t&)
177 : {
178 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_conf() method";
179 :
180 0 : m_trigger_timeout = std::chrono::milliseconds(m_trb_conf->get_trigger_record_timeout_ms());
181 :
182 0 : m_tr_queue_timeout = std::chrono::milliseconds(m_trb_conf->get_tr_queue_timeout());
183 0 : m_dreq_queue_timeout = std::chrono::milliseconds(m_trb_conf->get_request_queue_timeout());
184 :
185 0 : TLOG() << get_name() << ": timeouts (ms): TR = " << m_tr_queue_timeout.count()
186 0 : << ", DReq = " << m_dreq_queue_timeout.count();
187 0 : m_max_sequence_length = m_trb_conf->get_max_sequence_length_ticks();
188 0 : TLOG() << get_name() << ": Max time window is " << m_max_sequence_length;
189 :
190 0 : m_this_trb_source_id.subsystem = daqdataformats::SourceID::Subsystem::kTRBuilder;
191 0 : m_this_trb_source_id.id = m_trb_conf->get_source_id();
192 :
193 0 : m_max_open_trigger_records = m_trb_conf->get_maximum_open_trigger_records();
194 :
195 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_conf() method";
196 0 : }
197 :
198 : void
199 0 : TRBModule::do_scrap(const CommandData_t& /*args*/)
200 : {
201 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_scrap() method";
202 :
203 0 : TLOG() << get_name() << " successfully scrapped";
204 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_scrap() method";
205 0 : }
206 :
207 : void
208 0 : TRBModule::do_start(const CommandData_t& args)
209 : {
210 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
211 :
212 : // clean books from possible previous memory
213 0 : m_trigger_records.clear();
214 0 : m_trigger_decisions_counter.store(0);
215 0 : m_unexpected_trigger_decisions.store(0);
216 0 : m_pending_fragment_counter.store(0);
217 0 : m_generated_trigger_records.store(0);
218 0 : m_fragment_counter.store(0);
219 0 : m_timed_out_trigger_records.store(0);
220 0 : m_abandoned_trigger_records.store(0);
221 0 : m_unexpected_fragments.store(0);
222 0 : m_lost_fragments.store(0);
223 0 : m_invalid_requests.store(0);
224 0 : m_duplicated_trigger_ids.store(0);
225 :
226 : // 19-Dec-2024, KAB: check that DataRequest senders are ready to send. This is done so
227 : // that the IOManager infrastructure fetches the necessary connection details from
228 : // the ConnectivityService at 'start' time, instead of the first time that the sender
229 : // is used to send a message. This avoids delays in the sending of the first request in
230 : // the first data-taking run in a DAQ session. Such delays can lead to undesirable
231 : // system behavior like trigger inhibits.
232 0 : {
233 0 : std::unique_lock<std::mutex> lk(m_map_sourceid_connections_mutex);
234 0 : for (const auto& sid_sender : m_map_sourceid_connections) {
235 0 : std::shared_ptr<data_req_sender_t> sender = sid_sender.second;
236 0 : if (sender != nullptr) {
237 0 : bool is_ready = sender->is_ready_for_sending(std::chrono::milliseconds(100));
238 0 : TLOG_DEBUG(0) << "The DataRequest sender for " << sid_sender.first << " " << (is_ready ? "is" : "is not")
239 0 : << " ready.";
240 : }
241 0 : }
242 0 : }
243 :
244 0 : m_run_number.reset(new const daqdataformats::run_number_t(args.at("run").get<daqdataformats::run_number_t>()));
245 0 : m_stop_requested = false;
246 :
247 : // Register the callback to receive monitoring requests
248 0 : if (m_mon_receiver) {
249 0 : m_mon_requests.clear();
250 0 : m_mon_receiver->add_callback(std::bind(&TRBModule::tr_requested, this, std::placeholders::_1));
251 : }
252 :
253 0 : m_fragment_input->add_callback(std::bind(&TRBModule::fragments_callback, this, std::placeholders::_1));
254 0 : m_trigger_decision_input->add_callback(std::bind(&TRBModule::trigger_decision_callback, this, std::placeholders::_1));
255 :
256 0 : TLOG() << get_name() << " successfully started";
257 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
258 0 : }
259 :
260 : void
261 0 : TRBModule::do_stop(const CommandData_t& /*args*/)
262 : {
263 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
264 : // Unregister the monitoring requests callback
265 :
266 0 : if (m_mon_receiver) {
267 0 : m_mon_receiver->remove_callback();
268 : }
269 :
270 0 : m_trigger_decision_input->remove_callback();
271 0 : m_fragment_input->remove_callback();
272 :
273 0 : m_stop_requested = true;
274 :
275 0 : flush_trigger_records();
276 :
277 0 : TLOG() << get_name() << " successfully stopped";
278 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
279 0 : }
280 :
281 : void
282 0 : TRBModule::tr_requested(const dfmessages::TRMonRequest& req)
283 : {
284 0 : ++m_trmon_request_counter;
285 :
286 : // Ignore requests that don't belong to the ongoing run
287 0 : if (req.run_number != *m_run_number)
288 0 : return;
289 :
290 : // Add requests to pending requests
291 : // To be done: choose a concurrent container implementation.
292 0 : const std::lock_guard<std::mutex> lock(m_mon_mutex);
293 0 : m_mon_requests.push_back(req);
294 0 : }
295 :
296 : void
297 0 : TRBModule::flush_trigger_records()
298 : {
299 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Starting draining phase ";
300 0 : std::chrono::steady_clock::time_point t1 = std::chrono::steady_clock::now();
301 :
302 : // //-------------------------------------------------
303 : // // Here we drain what has been left from the running condition
304 : // //--------------------------------------------------
305 :
306 : // create all possible trigger record
307 0 : std::vector<TriggerId> triggers;
308 0 : for (const auto& entry : m_trigger_records) {
309 0 : triggers.push_back(entry.first);
310 : }
311 :
312 : // create the trigger record and send it
313 0 : for (const auto& t : triggers) {
314 0 : send_trigger_record(t);
315 : }
316 :
317 0 : std::chrono::steady_clock::time_point t2 = std::chrono::steady_clock::now();
318 :
319 0 : std::chrono::duration<double> time_span = std::chrono::duration_cast<std::chrono::duration<double>>(t2 - t1);
320 :
321 0 : std::ostringstream oss_summ;
322 0 : oss_summ << ": Exiting the do_work() method, " << m_trigger_records.size() << " remaining Trigger Records"
323 0 : << std::endl
324 0 : << "Draining took : " << time_span.count() << " s";
325 0 : TLOG() << ProgressUpdate(ERS_HERE, get_name(), oss_summ.str());
326 :
327 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_work() method";
328 0 : } // NOLINT(readability/fn_size)
329 :
330 : void
331 0 : TRBModule::fragments_callback(std::unique_ptr<daqdataformats::Fragment>& temp_fragment)
332 : {
333 :
334 0 : auto start_time = std::chrono::steady_clock::now();
335 :
336 0 : TLOG_DEBUG(TLVL_FRAGMENT_RECEIVE) << get_name() << " Received fragment for trigger/sequence_number "
337 0 : << temp_fragment->get_trigger_number() << "."
338 0 : << temp_fragment->get_sequence_number() << " from "
339 0 : << temp_fragment->get_element_id();
340 :
341 0 : TriggerId temp_id(*temp_fragment);
342 0 : std::vector<TriggerId> complete;
343 0 : bool requested = false;
344 :
345 0 : { // Begin mutex block
346 0 : std::lock_guard<std::mutex> lk(m_trigger_records_mutex);
347 0 : auto it = m_trigger_records.find(temp_id);
348 :
349 0 : if (it != m_trigger_records.end()) {
350 :
351 : // check if the fragment has a Source Id that was desired
352 0 : daqdataformats::TriggerRecordHeader& header = it->second.second->get_header_ref();
353 :
354 0 : for (size_t i = 0; i < header.get_num_requested_components(); ++i) {
355 :
356 0 : const daqdataformats::ComponentRequest& request = header[i];
357 0 : if (request.component == temp_fragment->get_element_id()) {
358 : requested = true;
359 : break;
360 : }
361 :
362 : } // request loop
363 :
364 : } // if there is a corresponding trigger ID entry in the boook
365 :
366 0 : if (requested) {
367 0 : it->second.second->add_fragment(std::move(temp_fragment));
368 0 : ++m_fragment_counter;
369 0 : --m_pending_fragment_counter;
370 : } else {
371 0 : ers::error(UnexpectedFragment(
372 0 : ERS_HERE, temp_id, temp_fragment->get_fragment_type_code(), temp_fragment->get_element_id()));
373 0 : ++m_unexpected_fragments;
374 : }
375 :
376 : // Only do bookkeeping periodically
377 0 : if (std::chrono::duration_cast<std::chrono::milliseconds>(clock_type::now() - m_last_bookkeeping).count() > 1000) {
378 : //-------------------------------------------------
379 : // Check if trigger records are complete or timedout
380 : // and create dedicated record
381 : //--------------------------------------------------
382 0 : TLOG_DEBUG(TLVL_BOOKKEEPING) << "Bookeeping status: " << m_trigger_records.size()
383 0 : << " trigger records in progress ";
384 :
385 0 : for (const auto& tr : m_trigger_records) {
386 :
387 0 : auto comp_size = tr.second.second->get_fragments_ref().size();
388 0 : auto requ_size = tr.second.second->get_header_ref().get_num_requested_components();
389 0 : std::ostringstream message;
390 0 : message << tr.first << " with " << comp_size << '/' << requ_size << " components";
391 :
392 0 : if (comp_size == requ_size) {
393 :
394 0 : message << ": complete";
395 0 : complete.push_back(tr.first);
396 : }
397 :
398 0 : TLOG_DEBUG(TLVL_BOOKKEEPING) << message.str();
399 0 : m_last_bookkeeping = clock_type::now();
400 :
401 0 : } // loop over TRs to check if they are complete
402 0 : } else if (requested && it != m_trigger_records.end()) {
403 :
404 0 : auto comp_size = it->second.second->get_fragments_ref().size();
405 0 : auto requ_size = it->second.second->get_header_ref().get_num_requested_components();
406 :
407 0 : if (comp_size == requ_size) {
408 0 : complete.push_back(temp_id);
409 : }
410 : }
411 0 : } // End mutex block
412 : //------------------------------------------------
413 : // Create TriggerRecords and send them
414 : //-----------------------------------------------
415 :
416 0 : for (const auto& id : complete) {
417 :
418 0 : send_trigger_record(id);
419 :
420 : } // loop over compled trigger id
421 :
422 0 : check_stale_requests();
423 :
424 0 : ++m_received_fragments;
425 :
426 0 : auto end_time = std::chrono::steady_clock::now();
427 0 : m_fragment_processing_us += std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count();
428 0 : }
429 :
430 : void
431 0 : TRBModule::trigger_decision_callback(dfmessages::TriggerDecision& td)
432 : {
433 :
434 0 : auto start_time = std::chrono::steady_clock::now();
435 :
436 0 : if (td.run_number != *m_run_number) {
437 0 : ers::error(UnexpectedTriggerDecision(ERS_HERE, td.trigger_number, td.run_number, *m_run_number));
438 0 : ++m_unexpected_trigger_decisions;
439 0 : return;
440 : }
441 :
442 0 : ++m_received_trigger_decisions;
443 :
444 0 : create_trigger_records_and_dispatch(td);
445 : //check_stale_requests();
446 :
447 0 : auto end_time = std::chrono::steady_clock::now();
448 0 : m_td_processing_us += std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count();
449 : }
450 :
451 : TRBModule::trigger_record_ptr_t
452 0 : TRBModule::extract_trigger_record(const TriggerId& id)
453 : {
454 0 : std::unique_lock<std::mutex> lk(m_trigger_records_mutex);
455 0 : auto it = m_trigger_records.extract(id);
456 0 : m_open_trigger_record_cv.notify_one();
457 0 : lk.unlock();
458 :
459 0 : if (it.empty())
460 0 : return nullptr;
461 :
462 0 : trigger_record_ptr_t temp = std::move(it.mapped().second);
463 :
464 0 : auto time = clock_type::now();
465 0 : auto duration = time - it.mapped().first;
466 :
467 0 : m_data_waiting_time += std::chrono::duration_cast<duration_type>(duration).count();
468 :
469 0 : --m_trigger_decisions_counter;
470 0 : m_fragment_counter -= temp->get_fragments_ref().size();
471 :
472 0 : auto missing_fragments = temp->get_header_ref().get_num_requested_components() - temp->get_fragments_ref().size();
473 :
474 0 : if (missing_fragments > 0) {
475 :
476 0 : m_lost_fragments += missing_fragments;
477 0 : m_pending_fragment_counter -= missing_fragments;
478 0 : temp->get_header_ref().set_error_bit(TriggerRecordErrorBits::kIncomplete, true);
479 :
480 0 : TLOG() << get_name() << " sending incomplete TriggerRecord downstream "
481 0 : << (m_stop_requested.load() ? "at Stop time " : "") << "(trigger/run_number=" << id << ", "
482 0 : << temp->get_fragments_ref().size() << " of " << temp->get_header_ref().get_num_requested_components()
483 0 : << " fragments included)";
484 : }
485 :
486 0 : return temp;
487 0 : }
488 :
489 : unsigned int
490 0 : TRBModule::create_trigger_records_and_dispatch(const dfmessages::TriggerDecision& td)
491 : {
492 :
493 0 : unsigned int new_tr_counter = 0;
494 :
495 : // check the whole time window
496 0 : daqdataformats::timestamp_t begin = std::numeric_limits<daqdataformats::timestamp_t>::max();
497 0 : daqdataformats::timestamp_t end = 0;
498 :
499 0 : for (const auto& component : td.components) {
500 0 : if (component.window_begin < begin)
501 0 : begin = component.window_begin;
502 0 : if (component.window_end > end)
503 0 : end = component.window_end;
504 : }
505 :
506 0 : daqdataformats::timestamp_diff_t tot_width = end - begin;
507 0 : daqdataformats::sequence_number_t max_sequence_number =
508 0 : (m_max_sequence_length > 0 && tot_width > 0) ? ((tot_width - 1) / m_max_sequence_length) : 0;
509 :
510 0 : TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": trig_number " << td.trigger_number << ": run_number " << td.run_number
511 0 : << ": trig_timestamp " << td.trigger_timestamp << " will have " << max_sequence_number + 1
512 0 : << " sequences";
513 :
514 0 : m_trigger_decision_width += tot_width;
515 :
516 : // create the trigger records
517 0 : for (daqdataformats::sequence_number_t sequence = 0; sequence <= max_sequence_number; ++sequence) {
518 :
519 0 : daqdataformats::timestamp_t slice_begin = begin + sequence * m_max_sequence_length;
520 0 : daqdataformats::timestamp_t slice_end =
521 0 : m_max_sequence_length > 0 ? std::min(slice_begin + m_max_sequence_length, end) : end;
522 :
523 0 : TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": trig_number " << td.trigger_number << ", sequence " << sequence
524 0 : << " ts=" << slice_begin << ":" << slice_end << " (TR " << begin << ":" << end << ")";
525 :
526 : // create the components cropped in time
527 0 : decltype(td.components) slice_components;
528 0 : for (const auto& component : td.components) {
529 :
530 0 : if (component.window_begin > slice_end)
531 0 : continue;
532 0 : if (component.window_end < slice_begin)
533 0 : continue;
534 :
535 0 : daqdataformats::timestamp_t new_begin = std::max(slice_begin, component.window_begin);
536 0 : daqdataformats::timestamp_t new_end = std::min(slice_end, component.window_end);
537 :
538 0 : daqdataformats::ComponentRequest temp(component.component, new_begin, new_end);
539 0 : slice_components.push_back(temp);
540 :
541 0 : m_data_request_width += new_end - new_begin;
542 :
543 : } // loop over component in trigger decision
544 :
545 : // Pleae note that the system could generate empty sequences
546 : // The code keeps them.
547 :
548 : // create the book entry
549 0 : TriggerId slice_id(td, sequence);
550 0 : {
551 0 : std::unique_lock<std::mutex> lk(m_trigger_records_mutex);
552 0 : m_open_trigger_record_cv.wait(lk, [&] { return m_trigger_records.size() < m_max_open_trigger_records; });
553 0 : auto it = m_trigger_records.find(slice_id);
554 0 : if (it != m_trigger_records.end()) {
555 0 : ers::error(DuplicatedTriggerDecision(ERS_HERE, slice_id));
556 0 : ++m_duplicated_trigger_ids;
557 0 : continue;
558 : }
559 :
560 : // create trigger record for the slice
561 0 : auto& entry = m_trigger_records[slice_id] = std::make_pair(clock_type::now(), trigger_record_ptr_t());
562 0 : trigger_record_ptr_t& trp = entry.second;
563 0 : trp.reset(new daqdataformats::TriggerRecord(slice_components));
564 0 : daqdataformats::TriggerRecord& tr = *trp;
565 :
566 0 : tr.get_header_ref().set_trigger_number(td.trigger_number);
567 0 : tr.get_header_ref().set_sequence_number(sequence);
568 0 : tr.get_header_ref().set_max_sequence_number(max_sequence_number);
569 0 : tr.get_header_ref().set_run_number(td.run_number);
570 0 : tr.get_header_ref().set_trigger_timestamp(td.trigger_timestamp);
571 0 : tr.get_header_ref().set_trigger_type(td.trigger_type);
572 0 : tr.get_header_ref().set_element_id(m_this_trb_source_id);
573 :
574 0 : m_trigger_decisions_counter++;
575 0 : m_pending_fragment_counter += slice_components.size();
576 0 : ++new_tr_counter;
577 0 : }
578 :
579 : // create and send the requests
580 0 : TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": Trigger Decision components: " << td.components.size()
581 0 : << ", slice components: " << slice_components.size();
582 0 : for (const auto& component : slice_components) {
583 :
584 0 : dfmessages::DataRequest dataReq;
585 0 : dataReq.trigger_number = td.trigger_number;
586 0 : dataReq.sequence_number = sequence;
587 0 : dataReq.run_number = td.run_number;
588 0 : dataReq.trigger_timestamp = td.trigger_timestamp;
589 0 : dataReq.readout_type = td.readout_type;
590 0 : dataReq.request_information = component;
591 0 : dataReq.data_destination = m_reply_connection;
592 0 : TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": TR " << slice_id << ": trig_timestamp "
593 0 : << dataReq.trigger_timestamp << ": SourceID " << component.component << ": window ["
594 0 : << dataReq.request_information.window_begin << ", "
595 0 : << dataReq.request_information.window_end << ']';
596 :
597 0 : dispatch_data_requests(std::move(dataReq), component.component);
598 :
599 0 : } // loop loop over component in the slice
600 :
601 0 : } // sequence loop
602 :
603 0 : return new_tr_counter;
604 : }
605 :
606 : bool
607 0 : TRBModule::dispatch_data_requests(dfmessages::DataRequest dr, const daqdataformats::SourceID& sid)
608 :
609 : {
610 :
611 : // find the queue for sourceid_req in the map
612 0 : std::unique_lock<std::mutex> lk(m_map_sourceid_connections_mutex);
613 0 : std::shared_ptr<data_req_sender_t> sender = nullptr;
614 0 : auto it_req = m_map_sourceid_connections.find(sid);
615 0 : if (it_req == m_map_sourceid_connections.end() || it_req->second == nullptr) {
616 :
617 : // if sourceid request is not valid. then print error and continue
618 0 : ers::error(
619 0 : dunedaq::dfmodules::DRSenderLookupFailed(ERS_HERE, sid, dr.run_number, dr.trigger_number, dr.sequence_number));
620 0 : ++m_invalid_requests;
621 0 : return false; // lk goes out of scope, is destroyed
622 : } else {
623 : // get the queue from map element
624 0 : sender = it_req->second;
625 : }
626 0 : lk.unlock();
627 :
628 0 : if (sender == nullptr) {
629 : // if sender lookup failed, report error and continue
630 0 : ers::error(
631 0 : dunedaq::dfmodules::DRSenderLookupFailed(ERS_HERE, sid, dr.run_number, dr.trigger_number, dr.sequence_number));
632 0 : ++m_invalid_requests;
633 0 : return false;
634 : }
635 :
636 : bool wasSentSuccessfully = false;
637 0 : do {
638 0 : TLOG_DEBUG(TLVL_DISPATCH_DATAREQ) << get_name() << ": Pushing the DataRequest from trigger/sequence number "
639 0 : << dr.trigger_number << "." << dr.sequence_number
640 0 : << " onto connection :" << sender->get_name();
641 :
642 : // send data request into the corresponding connection
643 0 : try {
644 0 : sender->send(std::move(dr), m_dreq_queue_timeout);
645 0 : wasSentSuccessfully = true;
646 0 : ++m_generated_data_requests;
647 0 : } catch (const ers::Issue& excpt) {
648 0 : std::ostringstream oss_warn;
649 0 : oss_warn << "Send to connection \"" << sender->get_name() << "\" failed";
650 0 : ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt));
651 0 : }
652 0 : } while (!wasSentSuccessfully && !m_stop_requested.load());
653 :
654 : return wasSentSuccessfully;
655 0 : }
656 :
657 : bool
658 0 : TRBModule::send_trigger_record(const TriggerId& id)
659 : {
660 :
661 0 : trigger_record_ptr_t temp_record(extract_trigger_record(id));
662 :
663 : // Send to monitoring, if needed
664 :
665 0 : if (m_mon_receiver) {
666 0 : const std::lock_guard<std::mutex> lock(m_mon_mutex);
667 0 : auto it = m_mon_requests.begin();
668 0 : std::set<std::string> sent_destinations;
669 :
670 0 : while (it != m_mon_requests.end()) {
671 : // Only sent TR to each monitor once
672 0 : if (sent_destinations.count(it->data_destination)) {
673 0 : ++it;
674 0 : continue;
675 : }
676 :
677 : // send TR to mon if correct trigger type
678 0 : if ((it->trigger_type_mask & temp_record->get_header_data().trigger_type) != 0) {
679 0 : auto iom = iomanager::IOManager::get();
680 : bool wasSentSuccessfully = false;
681 0 : do {
682 0 : try {
683 : // HACK to copy the trigger record so we can send it off to monitoring
684 0 : auto trigger_record_bytes =
685 0 : serialization::serialize(temp_record, serialization::SerializationType::kMsgPack);
686 0 : trigger_record_ptr_t record_copy = serialization::deserialize<trigger_record_ptr_t>(trigger_record_bytes);
687 0 : iom->get_sender<trigger_record_ptr_t>(it->data_destination)->send(std::move(record_copy), m_tr_queue_timeout);
688 0 : ++m_trmon_sent_counter;
689 0 : wasSentSuccessfully = true;
690 0 : } catch (const ers::Issue& excpt) {
691 0 : std::ostringstream oss_warn;
692 0 : oss_warn << "Sending TR to connection \"" << it->data_destination << "\" failed";
693 0 : ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt));
694 0 : }
695 0 : } while (!m_stop_requested.load() && !wasSentSuccessfully);
696 0 : sent_destinations.insert(it->data_destination);
697 0 : it = m_mon_requests.erase(it);
698 0 : } else {
699 0 : ++it;
700 : }
701 : }
702 0 : } // if m_mon_receiver
703 :
704 : bool wasSentSuccessfully = false;
705 0 : do {
706 0 : try {
707 0 : m_trigger_record_output->send(std::move(temp_record), m_tr_queue_timeout);
708 0 : wasSentSuccessfully = true;
709 0 : ++m_generated_trigger_records;
710 0 : } catch (const ers::Issue& excpt) {
711 0 : ers::warning(excpt);
712 0 : }
713 0 : } while (!m_stop_requested.load() && !wasSentSuccessfully); // push while loop
714 :
715 0 : if (!wasSentSuccessfully) {
716 0 : ++m_abandoned_trigger_records;
717 0 : m_lost_fragments += temp_record->get_fragments_ref().size();
718 0 : ers::error(dunedaq::dfmodules::AbandonedTriggerDecision(ERS_HERE, id));
719 : }
720 :
721 0 : return wasSentSuccessfully;
722 0 : }
723 :
724 : bool
725 0 : TRBModule::check_stale_requests()
726 : {
727 :
728 0 : bool book_updates = false;
729 :
730 : // -----------------------------------------------
731 : // optionally send over stale trigger records
732 : // -----------------------------------------------
733 :
734 0 : if (m_trigger_timeout.count() > 0) {
735 :
736 0 : std::vector<TriggerId> stale_triggers;
737 0 : {
738 0 : std::lock_guard<std::mutex> lk(m_trigger_records_mutex);
739 0 : for (auto it = m_trigger_records.begin(); it != m_trigger_records.end(); ++it) {
740 :
741 0 : daqdataformats::TriggerRecord& tr = *it->second.second;
742 :
743 0 : auto tr_time = clock_type::now() - it->second.first;
744 :
745 0 : if (tr_time > m_trigger_timeout) {
746 :
747 0 : ers::error(TimedOutTriggerDecision(ERS_HERE, it->first, tr.get_header_ref().get_trigger_timestamp()));
748 :
749 : // mark trigger record for seding
750 0 : stale_triggers.push_back(it->first);
751 0 : ++m_timed_out_trigger_records;
752 :
753 0 : book_updates = true;
754 : }
755 :
756 : } // trigger record loop
757 0 : }
758 : // create the trigger record and send it
759 0 : for (const auto& t : stale_triggers) {
760 0 : send_trigger_record(t);
761 : }
762 :
763 0 : } // m_trigger_timeout > 0
764 :
765 0 : return book_updates;
766 : }
767 :
768 : } // namespace dfmodules
769 : } // namespace dunedaq
770 :
// Register TRBModule with the application framework through the DUNE DAQ
// module-definition macro (presumably emits the plugin factory entry point;
// confirm in appfwk).
DEFINE_DUNE_DAQ_MODULE(dunedaq::dfmodules::TRBModule)
|