Line data Source code
1 : /**
2 : * @file TRBModule.cpp TRBModule class implementation
3 : *
4 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "TRBModule.hpp"
10 : #include "dfmodules/CommonIssues.hpp"
11 :
12 : #include "appmodel/NetworkConnectionDescriptor.hpp"
13 : #include "appmodel/NetworkConnectionRule.hpp"
14 : #include "appmodel/ReadoutApplication.hpp"
15 : #include "appmodel/SourceIDConf.hpp"
16 : #include "appmodel/SourceIDToNetworkConnection.hpp"
17 : #include "appmodel/TRBModule.hpp"
18 : #include "appmodel/TriggerApplication.hpp"
19 : #include "confmodel/Application.hpp"
20 : #include "confmodel/Connection.hpp"
21 : #include "confmodel/DetectorStream.hpp"
22 : #include "confmodel/DetectorToDaqConnection.hpp"
23 : #include "confmodel/NetworkConnection.hpp"
24 : #include "confmodel/Session.hpp"
25 : #include "dfmessages/TriggerRecord_serialization.hpp"
26 : #include "logging/Logging.hpp"
27 :
28 : #include "iomanager/IOManager.hpp"
29 :
30 : #include <algorithm>
31 : #include <chrono>
32 : #include <cmath>
33 : #include <cstdlib>
34 : #include <limits>
35 : #include <memory>
36 : #include <numeric>
37 : #include <string>
38 : #include <thread>
39 : #include <utility>
40 : #include <vector>
41 :
/**
 * @brief TRACE debug levels used in this source file
 *
 * The values are passed to TLOG_DEBUG() to select the verbosity level of
 * each debug message emitted by this module.
 */
enum
{
  TLVL_ENTER_EXIT_METHODS = 5, ///< messages logged when entering/exiting command methods
  TLVL_INIT = 8,               ///< reserved for initialization messages (not referenced in this file)
  TLVL_WORK_STEPS = 10,        ///< per-TriggerDecision / per-DataRequest processing steps
  TLVL_BOOKKEEPING = 15,       ///< periodic status of the trigger-record book
  TLVL_DISPATCH_DATAREQ = 21,  ///< pushing of individual DataRequests
  TLVL_FRAGMENT_RECEIVE = 22   ///< reception of individual fragments
};
54 :
55 : namespace dunedaq {
56 : namespace dfmodules {
57 :
58 : using daqdataformats::TriggerRecordErrorBits;
59 :
60 0 : TRBModule::TRBModule(const std::string& name)
61 : : dunedaq::appfwk::DAQModule(name)
62 0 : , m_stop_requested(true)
63 0 : , m_tr_queue_timeout(100)
64 0 : , m_dreq_queue_timeout(100)
65 : {
66 :
67 0 : register_command("conf", &TRBModule::do_conf);
68 0 : register_command("scrap", &TRBModule::do_scrap);
69 0 : register_command("start", &TRBModule::do_start);
70 0 : register_command("stop", &TRBModule::do_stop);
71 0 : }
72 :
73 : void
74 0 : TRBModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
75 : {
76 :
77 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
78 :
79 : //--------------------------------
80 : // Get single queues
81 : //---------------------------------
82 :
83 0 : auto mdal = mcfg->get_dal<appmodel::TRBModule>(get_name());
84 0 : if (!mdal) {
85 0 : throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve configuration object");
86 : }
87 :
88 0 : auto iom = iomanager::IOManager::get();
89 0 : for (auto con : mdal->get_inputs()) {
90 0 : if (con->get_data_type() == datatype_to_string<dfmessages::TriggerDecision>()) {
91 0 : m_trigger_decision_input = iom->get_receiver<dfmessages::TriggerDecision>(con->UID());
92 : }
93 0 : if (con->get_data_type() == datatype_to_string<std::unique_ptr<daqdataformats::Fragment>>()) {
94 0 : m_fragment_input = iom->get_receiver<std::unique_ptr<daqdataformats::Fragment>>(con->UID());
95 :
96 : // save the data fragment receiver global connection name for later, when it gets
97 : // copied into the DataRequests so that data producers know where to send their fragments
98 0 : m_reply_connection = con->UID();
99 : }
100 0 : if (con->get_data_type() == datatype_to_string<dfmessages::TRMonRequest>()) {
101 0 : m_mon_receiver = iom->get_receiver<dfmessages::TRMonRequest>(con->UID());
102 : }
103 : }
104 :
105 0 : if (m_trigger_decision_input == nullptr) {
106 0 : throw InvalidQueueFatalError(ERS_HERE, get_name(), "TriggerDecision Input queue");
107 : }
108 0 : if (m_fragment_input == nullptr) {
109 0 : throw InvalidQueueFatalError(ERS_HERE, get_name(), "Fragment Input queue");
110 : }
111 :
112 0 : m_trigger_record_output = iom->get_sender<std::unique_ptr<daqdataformats::TriggerRecord>>(mdal->get_trigger_record_output()->UID());
113 :
114 0 : for (auto con : mdal->get_request_connections()) {
115 0 : for (auto source_id : con->get_source_ids()) {
116 :
117 : // find the queue for sourceid_req in the map
118 0 : std::unique_lock<std::mutex> lk(m_map_sourceid_connections_mutex);
119 0 : daqdataformats::SourceID sid;
120 0 : sid.subsystem = daqdataformats::SourceID::string_to_subsystem(source_id->get_subsystem());
121 0 : sid.id = source_id->get_sid();
122 0 : auto it_req = m_map_sourceid_connections.find(sid);
123 0 : if (it_req == m_map_sourceid_connections.end() || it_req->second == nullptr) {
124 0 : m_map_sourceid_connections[sid] = get_iom_sender<dfmessages::DataRequest>(con->get_netconn()->UID());
125 : }
126 0 : lk.unlock();
127 0 : }
128 : }
129 :
130 0 : m_trb_conf = mdal->get_configuration();
131 :
132 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
133 0 : }
134 :
135 : void
136 0 : TRBModule::generate_opmon_data()
137 : {
138 :
139 0 : opmon::TRBInfo i;
140 :
141 : // status metrics
142 0 : i.set_pending_trigger_decisions(m_trigger_decisions_counter.load());
143 0 : i.set_fragments_in_the_book(m_fragment_counter.load());
144 0 : i.set_pending_fragments(m_pending_fragment_counter.load());
145 :
146 : // operation metrics
147 0 : i.set_received_trigger_decisions(m_received_trigger_decisions.exchange(0));
148 0 : i.set_generated_trigger_records(m_generated_trigger_records.exchange(0));
149 0 : i.set_generated_data_requests(m_generated_data_requests.exchange(0));
150 0 : i.set_received_fragments(m_received_fragments.exchange(0));
151 0 : i.set_data_waiting_time(m_data_waiting_time.exchange(0));
152 0 : i.set_data_request_width(m_data_request_width.exchange(0));
153 0 : i.set_trigger_decision_width(m_trigger_decision_width.exchange(0));
154 0 : i.set_received_trmon_requests(m_trmon_request_counter.exchange(0));
155 0 : i.set_sent_trmon(m_trmon_sent_counter.exchange(0));
156 :
157 0 : i.set_td_processing_us(m_td_processing_us.exchange(0));
158 0 : i.set_fragment_processing_us(m_fragment_processing_us.exchange(0));
159 :
160 0 : publish(std::move(i));
161 :
162 0 : opmon::TRBErrors err;
163 : // error counters
164 0 : err.set_timed_out_trigger_records(m_timed_out_trigger_records.load());
165 0 : err.set_abandoned_trigger_records(m_abandoned_trigger_records.load());
166 0 : err.set_unexpected_fragments(m_unexpected_fragments.load());
167 0 : err.set_unexpected_trigger_decisions(m_unexpected_trigger_decisions.load());
168 0 : err.set_lost_fragments(m_lost_fragments.load());
169 0 : err.set_invalid_requests(m_invalid_requests.load());
170 0 : err.set_duplicated_trigger_ids(m_duplicated_trigger_ids.load());
171 :
172 0 : publish(std::move(err));
173 0 : }
174 :
175 : void
176 0 : TRBModule::do_conf(const CommandData_t&)
177 : {
178 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_conf() method";
179 :
180 0 : m_trigger_timeout = std::chrono::milliseconds(m_trb_conf->get_trigger_record_timeout_ms());
181 :
182 0 : m_tr_queue_timeout = std::chrono::milliseconds(m_trb_conf->get_tr_queue_timeout());
183 0 : m_dreq_queue_timeout = std::chrono::milliseconds(m_trb_conf->get_request_queue_timeout());
184 :
185 0 : TLOG() << get_name() << ": timeouts (ms): TR = " << m_tr_queue_timeout.count()
186 0 : << ", DReq = " << m_dreq_queue_timeout.count();
187 0 : m_max_sequence_length = m_trb_conf->get_max_sequence_length_ticks();
188 0 : TLOG() << get_name() << ": Max time window is " << m_max_sequence_length;
189 :
190 0 : m_this_trb_source_id.subsystem = daqdataformats::SourceID::Subsystem::kTRBuilder;
191 0 : m_this_trb_source_id.id = m_trb_conf->get_source_id();
192 :
193 0 : m_max_open_trigger_records = m_trb_conf->get_maximum_open_trigger_records();
194 :
195 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_conf() method";
196 0 : }
197 :
198 : void
199 0 : TRBModule::do_scrap(const CommandData_t& /*args*/)
200 : {
201 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_scrap() method";
202 :
203 0 : TLOG() << get_name() << " successfully scrapped";
204 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_scrap() method";
205 0 : }
206 :
207 : void
208 0 : TRBModule::do_start(const CommandData_t& args)
209 : {
210 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
211 :
212 : // clean books from possible previous memory
213 0 : m_trigger_records.clear();
214 0 : m_trigger_decisions_counter.store(0);
215 0 : m_unexpected_trigger_decisions.store(0);
216 0 : m_pending_fragment_counter.store(0);
217 0 : m_generated_trigger_records.store(0);
218 0 : m_fragment_counter.store(0);
219 0 : m_timed_out_trigger_records.store(0);
220 0 : m_abandoned_trigger_records.store(0);
221 0 : m_unexpected_fragments.store(0);
222 0 : m_lost_fragments.store(0);
223 0 : m_invalid_requests.store(0);
224 0 : m_duplicated_trigger_ids.store(0);
225 :
226 : // 19-Dec-2024, KAB: check that DataRequest senders are ready to send. This is done so
227 : // that the IOManager infrastructure fetches the necessary connection details from
228 : // the ConnectivityService at 'start' time, instead of the first time that the sender
229 : // is used to send a message. This avoids delays in the sending of the first request in
230 : // the first data-taking run in a DAQ session. Such delays can lead to undesirable
231 : // system behavior like trigger inhibits.
232 0 : {
233 0 : std::unique_lock<std::mutex> lk(m_map_sourceid_connections_mutex);
234 0 : for (const auto& sid_sender : m_map_sourceid_connections) {
235 0 : std::shared_ptr<data_req_sender_t> sender = sid_sender.second;
236 0 : if (sender != nullptr) {
237 0 : bool is_ready = sender->is_ready_for_sending(std::chrono::milliseconds(100));
238 0 : TLOG_DEBUG(0) << "The DataRequest sender for " << sid_sender.first << " " << (is_ready ? "is" : "is not")
239 0 : << " ready.";
240 : }
241 0 : }
242 0 : }
243 :
244 0 : m_run_number.reset(new const daqdataformats::run_number_t(args.at("run").get<daqdataformats::run_number_t>()));
245 0 : m_stop_requested = false;
246 :
247 : // Register the callback to receive monitoring requests
248 0 : if (m_mon_receiver) {
249 0 : m_mon_requests.clear();
250 0 : m_mon_receiver->add_callback(std::bind(&TRBModule::tr_requested, this, std::placeholders::_1));
251 : }
252 :
253 0 : m_fragment_input->add_callback(std::bind(&TRBModule::fragments_callback, this, std::placeholders::_1));
254 0 : m_trigger_decision_input->add_callback(std::bind(&TRBModule::trigger_decision_callback, this, std::placeholders::_1));
255 :
256 0 : TLOG() << get_name() << " successfully started";
257 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
258 0 : }
259 :
260 : void
261 0 : TRBModule::do_stop(const CommandData_t& /*args*/)
262 : {
263 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
264 : // Unregister the monitoring requests callback
265 :
266 0 : if (m_mon_receiver) {
267 0 : m_mon_receiver->remove_callback();
268 : }
269 :
270 0 : m_trigger_decision_input->remove_callback();
271 0 : m_fragment_input->remove_callback();
272 :
273 0 : m_stop_requested = true;
274 :
275 0 : flush_trigger_records();
276 :
277 0 : TLOG() << get_name() << " successfully stopped";
278 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
279 0 : }
280 :
281 : void
282 0 : TRBModule::tr_requested(const dfmessages::TRMonRequest& req)
283 : {
284 0 : ++m_trmon_request_counter;
285 :
286 : // Ignore requests that don't belong to the ongoing run
287 0 : if (req.run_number != *m_run_number)
288 0 : return;
289 :
290 : // Add requests to pending requests
291 : // To be done: choose a concurrent container implementation.
292 0 : const std::lock_guard<std::mutex> lock(m_mon_mutex);
293 0 : m_mon_requests.push_back(req);
294 0 : }
295 :
296 : void
297 0 : TRBModule::flush_trigger_records()
298 : {
299 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Starting draining phase ";
300 0 : std::chrono::steady_clock::time_point t1 = std::chrono::steady_clock::now();
301 :
302 : // //-------------------------------------------------
303 : // // Here we drain what has been left from the running condition
304 : // //--------------------------------------------------
305 :
306 : // create all possible trigger record
307 0 : std::vector<TriggerId> triggers;
308 0 : for (const auto& entry : m_trigger_records) {
309 0 : triggers.push_back(entry.first);
310 : }
311 :
312 : // create the trigger record and send it
313 0 : for (const auto& t : triggers) {
314 0 : send_trigger_record(t);
315 : }
316 :
317 0 : std::chrono::steady_clock::time_point t2 = std::chrono::steady_clock::now();
318 :
319 0 : std::chrono::duration<double> time_span = std::chrono::duration_cast<std::chrono::duration<double>>(t2 - t1);
320 :
321 0 : std::ostringstream oss_summ;
322 0 : oss_summ << ": Exiting the do_work() method, " << m_trigger_records.size() << " remaining Trigger Records"
323 0 : << std::endl
324 0 : << "Draining took : " << time_span.count() << " s";
325 0 : TLOG() << ProgressUpdate(ERS_HERE, get_name(), oss_summ.str());
326 :
327 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_work() method";
328 0 : } // NOLINT(readability/fn_size)
329 :
330 : void
331 0 : TRBModule::fragments_callback(std::unique_ptr<daqdataformats::Fragment>& temp_fragment)
332 : {
333 :
334 0 : auto start_time = std::chrono::steady_clock::now();
335 :
336 0 : TLOG_DEBUG(TLVL_FRAGMENT_RECEIVE) << get_name() << " Received fragment for trigger/sequence_number "
337 0 : << temp_fragment->get_trigger_number() << "."
338 0 : << temp_fragment->get_sequence_number() << " from "
339 0 : << temp_fragment->get_element_id();
340 :
341 0 : TriggerId temp_id(*temp_fragment);
342 0 : std::vector<TriggerId> complete;
343 0 : bool requested = false;
344 :
345 0 : { // Begin mutex block
346 0 : std::lock_guard<std::mutex> lk(m_trigger_records_mutex);
347 0 : auto it = m_trigger_records.find(temp_id);
348 :
349 0 : if (it != m_trigger_records.end()) {
350 :
351 : // check if the fragment has a Source Id that was desired
352 0 : daqdataformats::TriggerRecordHeader& header = it->second.second->get_header_ref();
353 :
354 0 : for (size_t i = 0; i < header.get_num_requested_components(); ++i) {
355 :
356 0 : const daqdataformats::ComponentRequest& request = header[i];
357 0 : if (request.component == temp_fragment->get_element_id()) {
358 : requested = true;
359 : break;
360 : }
361 :
362 : } // request loop
363 :
364 : } // if there is a corresponding trigger ID entry in the boook
365 :
366 0 : if (requested) {
367 0 : it->second.second->add_fragment(std::move(temp_fragment));
368 0 : ++m_fragment_counter;
369 0 : --m_pending_fragment_counter;
370 : } else {
371 0 : ers::error(UnexpectedFragment(
372 0 : ERS_HERE, temp_id, temp_fragment->get_fragment_type_code(), temp_fragment->get_element_id()));
373 0 : ++m_unexpected_fragments;
374 : }
375 :
376 : // Only do bookkeeping periodically
377 0 : if (std::chrono::duration_cast<std::chrono::milliseconds>(clock_type::now() - m_last_bookkeeping).count() > 1000) {
378 : //-------------------------------------------------
379 : // Check if trigger records are complete or timedout
380 : // and create dedicated record
381 : //--------------------------------------------------
382 0 : TLOG_DEBUG(TLVL_BOOKKEEPING) << "Bookeeping status: " << m_trigger_records.size()
383 0 : << " trigger records in progress ";
384 :
385 0 : for (const auto& tr : m_trigger_records) {
386 :
387 0 : auto comp_size = tr.second.second->get_fragments_ref().size();
388 0 : auto requ_size = tr.second.second->get_header_ref().get_num_requested_components();
389 0 : std::ostringstream message;
390 0 : message << tr.first << " with " << comp_size << '/' << requ_size << " components";
391 :
392 0 : if (comp_size == requ_size) {
393 :
394 0 : message << ": complete";
395 0 : complete.push_back(tr.first);
396 : }
397 :
398 0 : TLOG_DEBUG(TLVL_BOOKKEEPING) << message.str();
399 0 : m_last_bookkeeping = clock_type::now();
400 :
401 0 : } // loop over TRs to check if they are complete
402 0 : } else if (requested && it != m_trigger_records.end()) {
403 :
404 0 : auto comp_size = it->second.second->get_fragments_ref().size();
405 0 : auto requ_size = it->second.second->get_header_ref().get_num_requested_components();
406 :
407 0 : if (comp_size == requ_size) {
408 0 : complete.push_back(temp_id);
409 : }
410 : }
411 0 : } // End mutex block
412 : //------------------------------------------------
413 : // Create TriggerRecords and send them
414 : //-----------------------------------------------
415 :
416 0 : for (const auto& id : complete) {
417 :
418 0 : send_trigger_record(id);
419 :
420 : } // loop over compled trigger id
421 :
422 0 : check_stale_requests();
423 :
424 0 : ++m_received_fragments;
425 :
426 0 : auto end_time = std::chrono::steady_clock::now();
427 0 : m_fragment_processing_us += std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count();
428 0 : }
429 :
430 : void
431 0 : TRBModule::trigger_decision_callback(dfmessages::TriggerDecision& td)
432 : {
433 :
434 0 : auto start_time = std::chrono::steady_clock::now();
435 :
436 0 : if (td.run_number != *m_run_number) {
437 0 : ers::error(UnexpectedTriggerDecision(ERS_HERE, td.trigger_number, td.run_number, *m_run_number));
438 0 : ++m_unexpected_trigger_decisions;
439 0 : return;
440 : }
441 :
442 0 : ++m_received_trigger_decisions;
443 :
444 0 : create_trigger_records_and_dispatch(td);
445 : //check_stale_requests();
446 :
447 0 : auto end_time = std::chrono::steady_clock::now();
448 0 : m_td_processing_us += std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count();
449 : }
450 :
451 : TRBModule::trigger_record_ptr_t
452 0 : TRBModule::extract_trigger_record(const TriggerId& id)
453 : {
454 0 : std::unique_lock<std::mutex> lk(m_trigger_records_mutex);
455 0 : auto it = m_trigger_records.extract(id);
456 0 : m_open_trigger_record_cv.notify_one();
457 0 : lk.unlock();
458 :
459 0 : if (it.empty())
460 0 : return nullptr;
461 :
462 0 : trigger_record_ptr_t temp = std::move(it.mapped().second);
463 :
464 0 : auto time = clock_type::now();
465 0 : auto duration = time - it.mapped().first;
466 :
467 0 : m_data_waiting_time += std::chrono::duration_cast<duration_type>(duration).count();
468 :
469 0 : --m_trigger_decisions_counter;
470 0 : m_fragment_counter -= temp->get_fragments_ref().size();
471 :
472 0 : auto missing_fragments = temp->get_header_ref().get_num_requested_components() - temp->get_fragments_ref().size();
473 :
474 0 : if (missing_fragments > 0) {
475 :
476 0 : m_lost_fragments += missing_fragments;
477 0 : m_pending_fragment_counter -= missing_fragments;
478 0 : temp->get_header_ref().set_error_bit(TriggerRecordErrorBits::kIncomplete, true);
479 :
480 0 : ers::error(IncompleteTriggerRecord(ERS_HERE, (m_stop_requested.load() ? "at Stop time " : ""), id,
481 0 : temp->get_fragments_ref().size(),
482 0 : temp->get_header_ref().get_num_requested_components()));
483 : }
484 :
485 0 : return temp;
486 0 : }
487 :
488 : unsigned int
489 0 : TRBModule::create_trigger_records_and_dispatch(const dfmessages::TriggerDecision& td)
490 : {
491 :
492 0 : unsigned int new_tr_counter = 0;
493 :
494 : // check the whole time window
495 0 : daqdataformats::timestamp_t begin = std::numeric_limits<daqdataformats::timestamp_t>::max();
496 0 : daqdataformats::timestamp_t end = 0;
497 :
498 0 : for (const auto& component : td.components) {
499 0 : if (component.window_begin < begin)
500 0 : begin = component.window_begin;
501 0 : if (component.window_end > end)
502 0 : end = component.window_end;
503 : }
504 :
505 0 : daqdataformats::timestamp_diff_t tot_width = end - begin;
506 0 : daqdataformats::sequence_number_t max_sequence_number =
507 0 : (m_max_sequence_length > 0 && tot_width > 0) ? ((tot_width - 1) / m_max_sequence_length) : 0;
508 :
509 0 : TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": trig_number " << td.trigger_number << ": run_number " << td.run_number
510 0 : << ": trig_timestamp " << td.trigger_timestamp << " will have " << max_sequence_number + 1
511 0 : << " sequences";
512 :
513 0 : m_trigger_decision_width += tot_width;
514 :
515 : // create the trigger records
516 0 : for (daqdataformats::sequence_number_t sequence = 0; sequence <= max_sequence_number; ++sequence) {
517 :
518 0 : daqdataformats::timestamp_t slice_begin = begin + sequence * m_max_sequence_length;
519 0 : daqdataformats::timestamp_t slice_end =
520 0 : m_max_sequence_length > 0 ? std::min(slice_begin + m_max_sequence_length, end) : end;
521 :
522 0 : TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": trig_number " << td.trigger_number << ", sequence " << sequence
523 0 : << " ts=" << slice_begin << ":" << slice_end << " (TR " << begin << ":" << end << ")";
524 :
525 : // create the components cropped in time
526 0 : decltype(td.components) slice_components;
527 0 : for (const auto& component : td.components) {
528 :
529 0 : if (component.window_begin > slice_end)
530 0 : continue;
531 0 : if (component.window_end < slice_begin)
532 0 : continue;
533 :
534 0 : daqdataformats::timestamp_t new_begin = std::max(slice_begin, component.window_begin);
535 0 : daqdataformats::timestamp_t new_end = std::min(slice_end, component.window_end);
536 :
537 0 : daqdataformats::ComponentRequest temp(component.component, new_begin, new_end);
538 0 : slice_components.push_back(temp);
539 :
540 0 : m_data_request_width += new_end - new_begin;
541 :
542 : } // loop over component in trigger decision
543 :
544 : // Pleae note that the system could generate empty sequences
545 : // The code keeps them.
546 :
547 : // create the book entry
548 0 : TriggerId slice_id(td, sequence);
549 0 : {
550 0 : std::unique_lock<std::mutex> lk(m_trigger_records_mutex);
551 0 : m_open_trigger_record_cv.wait(lk, [&] { return m_trigger_records.size() < m_max_open_trigger_records; });
552 0 : auto it = m_trigger_records.find(slice_id);
553 0 : if (it != m_trigger_records.end()) {
554 0 : ers::error(DuplicatedTriggerDecision(ERS_HERE, slice_id));
555 0 : ++m_duplicated_trigger_ids;
556 0 : continue;
557 : }
558 :
559 : // create trigger record for the slice
560 0 : auto& entry = m_trigger_records[slice_id] = std::make_pair(clock_type::now(), trigger_record_ptr_t());
561 0 : trigger_record_ptr_t& trp = entry.second;
562 0 : trp.reset(new daqdataformats::TriggerRecord(slice_components));
563 0 : daqdataformats::TriggerRecord& tr = *trp;
564 :
565 0 : tr.get_header_ref().set_trigger_number(td.trigger_number);
566 0 : tr.get_header_ref().set_sequence_number(sequence);
567 0 : tr.get_header_ref().set_max_sequence_number(max_sequence_number);
568 0 : tr.get_header_ref().set_run_number(td.run_number);
569 0 : tr.get_header_ref().set_trigger_timestamp(td.trigger_timestamp);
570 0 : tr.get_header_ref().set_trigger_type(td.trigger_type);
571 0 : tr.get_header_ref().set_element_id(m_this_trb_source_id);
572 :
573 0 : m_trigger_decisions_counter++;
574 0 : m_pending_fragment_counter += slice_components.size();
575 0 : ++new_tr_counter;
576 0 : }
577 :
578 : // create and send the requests
579 0 : TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": Trigger Decision components: " << td.components.size()
580 0 : << ", slice components: " << slice_components.size();
581 0 : for (const auto& component : slice_components) {
582 :
583 0 : dfmessages::DataRequest dataReq;
584 0 : dataReq.trigger_number = td.trigger_number;
585 0 : dataReq.sequence_number = sequence;
586 0 : dataReq.run_number = td.run_number;
587 0 : dataReq.trigger_timestamp = td.trigger_timestamp;
588 0 : dataReq.readout_type = td.readout_type;
589 0 : dataReq.request_information = component;
590 0 : dataReq.data_destination = m_reply_connection;
591 0 : TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": TR " << slice_id << ": trig_timestamp "
592 0 : << dataReq.trigger_timestamp << ": SourceID " << component.component << ": window ["
593 0 : << dataReq.request_information.window_begin << ", "
594 0 : << dataReq.request_information.window_end << ']';
595 :
596 0 : dispatch_data_requests(std::move(dataReq), component.component);
597 :
598 0 : } // loop loop over component in the slice
599 :
600 0 : } // sequence loop
601 :
602 0 : return new_tr_counter;
603 : }
604 :
605 : bool
606 0 : TRBModule::dispatch_data_requests(dfmessages::DataRequest dr, const daqdataformats::SourceID& sid)
607 :
608 : {
609 :
610 : // find the queue for sourceid_req in the map
611 0 : std::unique_lock<std::mutex> lk(m_map_sourceid_connections_mutex);
612 0 : std::shared_ptr<data_req_sender_t> sender = nullptr;
613 0 : auto it_req = m_map_sourceid_connections.find(sid);
614 0 : if (it_req == m_map_sourceid_connections.end() || it_req->second == nullptr) {
615 :
616 : // if sourceid request is not valid. then print error and continue
617 0 : ers::error(
618 0 : dunedaq::dfmodules::DRSenderLookupFailed(ERS_HERE, sid, dr.run_number, dr.trigger_number, dr.sequence_number));
619 0 : ++m_invalid_requests;
620 0 : return false; // lk goes out of scope, is destroyed
621 : } else {
622 : // get the queue from map element
623 0 : sender = it_req->second;
624 : }
625 0 : lk.unlock();
626 :
627 0 : if (sender == nullptr) {
628 : // if sender lookup failed, report error and continue
629 0 : ers::error(
630 0 : dunedaq::dfmodules::DRSenderLookupFailed(ERS_HERE, sid, dr.run_number, dr.trigger_number, dr.sequence_number));
631 0 : ++m_invalid_requests;
632 0 : return false;
633 : }
634 :
635 : bool wasSentSuccessfully = false;
636 0 : do {
637 0 : TLOG_DEBUG(TLVL_DISPATCH_DATAREQ) << get_name() << ": Pushing the DataRequest from trigger/sequence number "
638 0 : << dr.trigger_number << "." << dr.sequence_number
639 0 : << " onto connection :" << sender->get_name();
640 :
641 : // send data request into the corresponding connection
642 0 : try {
643 0 : sender->send(std::move(dr), m_dreq_queue_timeout);
644 0 : wasSentSuccessfully = true;
645 0 : ++m_generated_data_requests;
646 0 : } catch (const ers::Issue& excpt) {
647 0 : std::ostringstream oss_warn;
648 0 : oss_warn << "Send to connection \"" << sender->get_name() << "\" failed";
649 0 : ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt));
650 0 : }
651 0 : } while (!wasSentSuccessfully && !m_stop_requested.load());
652 :
653 : return wasSentSuccessfully;
654 0 : }
655 :
656 : bool
657 0 : TRBModule::send_trigger_record(const TriggerId& id)
658 : {
659 :
660 0 : trigger_record_ptr_t temp_record(extract_trigger_record(id));
661 :
662 : // Send to monitoring, if needed
663 :
664 0 : if (m_mon_receiver) {
665 0 : const std::lock_guard<std::mutex> lock(m_mon_mutex);
666 0 : auto it = m_mon_requests.begin();
667 0 : std::set<std::string> sent_destinations;
668 :
669 0 : while (it != m_mon_requests.end()) {
670 : // Only sent TR to each monitor once
671 0 : if (sent_destinations.count(it->data_destination)) {
672 0 : ++it;
673 0 : continue;
674 : }
675 :
676 : // send TR to mon if correct trigger type
677 0 : if ((it->trigger_type_mask & temp_record->get_header_data().trigger_type) != 0) {
678 0 : auto iom = iomanager::IOManager::get();
679 : bool wasSentSuccessfully = false;
680 0 : do {
681 0 : try {
682 : // HACK to copy the trigger record so we can send it off to monitoring
683 0 : auto trigger_record_bytes =
684 0 : serialization::serialize(temp_record, serialization::SerializationType::kMsgPack);
685 0 : trigger_record_ptr_t record_copy = serialization::deserialize<trigger_record_ptr_t>(trigger_record_bytes);
686 0 : iom->get_sender<trigger_record_ptr_t>(it->data_destination)->send(std::move(record_copy), m_tr_queue_timeout);
687 0 : ++m_trmon_sent_counter;
688 0 : wasSentSuccessfully = true;
689 0 : } catch (const ers::Issue& excpt) {
690 0 : std::ostringstream oss_warn;
691 0 : oss_warn << "Sending TR to connection \"" << it->data_destination << "\" failed";
692 0 : ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt));
693 0 : }
694 0 : } while (!m_stop_requested.load() && !wasSentSuccessfully);
695 0 : sent_destinations.insert(it->data_destination);
696 0 : it = m_mon_requests.erase(it);
697 0 : } else {
698 0 : ++it;
699 : }
700 : }
701 0 : } // if m_mon_receiver
702 :
703 : bool wasSentSuccessfully = false;
704 0 : do {
705 0 : try {
706 0 : m_trigger_record_output->send(std::move(temp_record), m_tr_queue_timeout);
707 0 : wasSentSuccessfully = true;
708 0 : ++m_generated_trigger_records;
709 0 : } catch (const ers::Issue& excpt) {
710 0 : ers::warning(excpt);
711 0 : }
712 0 : } while (!m_stop_requested.load() && !wasSentSuccessfully); // push while loop
713 :
714 0 : if (!wasSentSuccessfully) {
715 0 : ++m_abandoned_trigger_records;
716 0 : m_lost_fragments += temp_record->get_fragments_ref().size();
717 0 : ers::error(dunedaq::dfmodules::AbandonedTriggerDecision(ERS_HERE, id));
718 : }
719 :
720 0 : return wasSentSuccessfully;
721 0 : }
722 :
723 : bool
724 0 : TRBModule::check_stale_requests()
725 : {
726 :
727 0 : bool book_updates = false;
728 :
729 : // -----------------------------------------------
730 : // optionally send over stale trigger records
731 : // -----------------------------------------------
732 :
733 0 : if (m_trigger_timeout.count() > 0) {
734 :
735 0 : std::vector<TriggerId> stale_triggers;
736 0 : {
737 0 : std::lock_guard<std::mutex> lk(m_trigger_records_mutex);
738 0 : for (auto it = m_trigger_records.begin(); it != m_trigger_records.end(); ++it) {
739 :
740 0 : daqdataformats::TriggerRecord& tr = *it->second.second;
741 :
742 0 : auto tr_time = clock_type::now() - it->second.first;
743 :
744 0 : if (tr_time > m_trigger_timeout) {
745 :
746 0 : ers::error(TimedOutTriggerDecision(ERS_HERE, it->first, tr.get_header_ref().get_trigger_timestamp()));
747 :
748 : // mark trigger record for seding
749 0 : stale_triggers.push_back(it->first);
750 0 : ++m_timed_out_trigger_records;
751 :
752 0 : book_updates = true;
753 : }
754 :
755 : } // trigger record loop
756 0 : }
757 : // create the trigger record and send it
758 0 : for (const auto& t : stale_triggers) {
759 0 : send_trigger_record(t);
760 : }
761 :
762 0 : } // m_trigger_timeout > 0
763 :
764 0 : return book_updates;
765 : }
766 :
767 : } // namespace dfmodules
768 : } // namespace dunedaq
769 :
770 0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::dfmodules::TRBModule)
|