Line data Source code
1 : /**
2 : * @file DFOModule.cpp DFOModule class implementation
3 : *
4 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "DFOModule.hpp"
10 : #include "dfmodules/CommonIssues.hpp"
11 :
12 : #include "dfmodules/opmon/DFOModule.pb.h"
13 :
14 : #include "appmodel/DFOModule.hpp"
15 : #include "confmodel/Connection.hpp"
16 : #include "iomanager/IOManager.hpp"
17 : #include "logging/Logging.hpp"
18 :
19 : #include <chrono>
20 : #include <cstdlib>
21 : #include <future>
22 : #include <limits>
23 : #include <list>
24 : #include <map>
25 : #include <memory>
26 : #include <string>
27 : #include <thread>
28 : #include <utility>
29 : #include <vector>
30 :
31 : /**
32 : * @brief Name used by TRACE TLOG calls from this source file
33 : */
34 : #define TRACE_NAME "DFOModule" // NOLINT
// Debug levels passed to TLOG_DEBUG in this source file.
enum
{
  TLVL_ENTER_EXIT_METHODS = 5, // entering/exiting messages of the module methods
  TLVL_CONFIG = 7,             // creation of a new dataflow availability entry
  TLVL_WORK_STEPS = 10,        // outcome of find_slot()
  TLVL_TRIGDEC_RECEIVED = 21,  // handling of an incoming TriggerDecision
  TLVL_NOTIFY_TRIGGER = 22,    // BUSY status sent to the trigger
  TLVL_DISPATCH_TO_TRB = 23,   // TriggerDecision sent to a TRB
  TLVL_TDTOKEN_RECEIVED = 24   // handling of an incoming TriggerDecisionToken
};
45 :
46 : namespace dunedaq::dfmodules {
47 :
48 5 : DFOModule::DFOModule(const std::string& name)
49 : : dunedaq::appfwk::DAQModule(name)
50 5 : , m_queue_timeout(100)
51 10 : , m_run_number(0)
52 : {
53 5 : register_command("conf", &DFOModule::do_conf);
54 5 : register_command("start", &DFOModule::do_start);
55 5 : register_command("drain_dataflow", &DFOModule::do_stop);
56 5 : register_command("scrap", &DFOModule::do_scrap);
57 5 : }
58 :
59 : void
60 4 : DFOModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
61 : {
62 4 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
63 :
64 4 : auto mdal = mcfg->get_dal<appmodel::DFOModule>(get_name());
65 4 : if (!mdal) {
66 0 : throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve configuration object");
67 : }
68 4 : auto iom = iomanager::IOManager::get();
69 :
70 12 : for (auto con : mdal->get_inputs()) {
71 8 : if (con->get_data_type() == datatype_to_string<dfmessages::TriggerDecisionToken>()) {
72 4 : m_token_connection = con->UID();
73 : }
74 8 : if (con->get_data_type() == datatype_to_string<dfmessages::TriggerDecision>()) {
75 4 : m_td_connection = con->UID();
76 : }
77 : }
78 12 : for (auto con : mdal->get_outputs()) {
79 8 : if (con->get_data_type() == datatype_to_string<dfmessages::TriggerInhibit>()) {
80 4 : m_busy_sender = iom->get_sender<dfmessages::TriggerInhibit>(con->UID());
81 : }
82 8 : if (con->get_data_type() == datatype_to_string<dfmessages::TriggerDecision>()) {
83 4 : m_trb_conn_ids.push_back(con->UID());
84 : }
85 : }
86 :
87 4 : if (m_token_connection == "") {
88 0 : throw appfwk::MissingConnection(
89 0 : ERS_HERE, get_name(), datatype_to_string<dfmessages::TriggerDecisionToken>(), "input");
90 : }
91 4 : if (m_td_connection == "") {
92 0 : throw appfwk::MissingConnection(ERS_HERE, get_name(), datatype_to_string<dfmessages::TriggerDecision>(), "input");
93 : }
94 4 : if (m_busy_sender == nullptr) {
95 0 : throw appfwk::MissingConnection(ERS_HERE, get_name(), datatype_to_string<dfmessages::TriggerInhibit>(), "output");
96 :
97 : }
98 :
99 4 : m_dfo_conf = mdal->get_configuration();
100 : // these are just tests to check if the connections are ok
101 4 : iom->get_receiver<dfmessages::TriggerDecisionToken>(m_token_connection);
102 4 : iom->get_receiver<dfmessages::TriggerDecision>(m_td_connection);
103 :
104 4 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
105 4 : }
106 :
107 : void
108 3 : DFOModule::do_conf(const CommandData_t&)
109 : {
110 3 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_conf() method";
111 :
112 3 : m_queue_timeout = std::chrono::milliseconds(m_dfo_conf->get_general_queue_timeout_ms());
113 3 : m_stop_timeout = std::chrono::milliseconds(m_dfo_conf->get_stop_timeout_ms());
114 3 : m_busy_threshold = m_dfo_conf->get_busy_threshold();
115 3 : m_free_threshold = m_dfo_conf->get_free_threshold();
116 :
117 3 : m_td_send_retries = m_dfo_conf->get_td_send_retries();
118 :
119 3 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_conf() method, there are "
120 3 : << m_dataflow_availability.size() << " TRB apps defined";
121 3 : }
122 :
123 : void
124 3 : DFOModule::do_start(const CommandData_t& payload)
125 : {
126 3 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
127 :
128 3 : m_received_tokens = 0;
129 3 : m_run_number = payload.value<dunedaq::daqdataformats::run_number_t>("run", 0);
130 :
131 3 : m_running_status.store(true);
132 3 : m_last_notified_busy.store(false);
133 3 : m_last_assignement_it = m_dataflow_availability.end();
134 :
135 3 : m_last_token_received = m_last_td_received = std::chrono::steady_clock::now();
136 :
137 : // 19-Dec-2024, KAB: check that TriggerDecision senders are ready to send. This is done
138 : // so that the IOManager infrastructure fetches the necessary connection details from
139 : // the ConnectivityService at 'start' time, instead of the first time that the sender
140 : // is used to send a message. This avoids delays in the sending of the first TD in
141 : // the first data-taking run in a DAQ session. Such delays can lead to undesirable
142 : // system behavior like trigger inhibits.
143 3 : auto iom = iomanager::IOManager::get();
144 3 : if (m_busy_sender != nullptr) {
145 3 : bool is_ready = m_busy_sender->is_ready_for_sending(std::chrono::milliseconds(100));
146 3 : TLOG_DEBUG(0) << "The sender for TriggerInhibit messages " << (is_ready ? "is" : "is not") << " ready.";
147 : }
148 6 : for (auto trb_conn : m_trb_conn_ids) {
149 3 : auto sender = iom->get_sender<dfmessages::TriggerDecision>(trb_conn);
150 3 : if (sender != nullptr) {
151 3 : bool is_ready = sender->is_ready_for_sending(std::chrono::milliseconds(100));
152 3 : TLOG_DEBUG(0) << "The TriggerDecision sender for " << trb_conn << " " << (is_ready ? "is" : "is not") << " ready.";
153 : }
154 3 : }
155 3 : iom->add_callback<dfmessages::TriggerDecisionToken>(
156 3 : m_token_connection, std::bind(&DFOModule::receive_trigger_complete_token, this, std::placeholders::_1));
157 :
158 3 : iom->add_callback<dfmessages::TriggerDecision>(
159 3 : m_td_connection, std::bind(&DFOModule::receive_trigger_decision, this, std::placeholders::_1));
160 :
161 3 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
162 3 : }
163 :
164 : void
165 3 : DFOModule::do_stop(const CommandData_t& /*args*/)
166 : {
167 3 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
168 :
169 3 : m_running_status.store(false);
170 :
171 3 : auto iom = iomanager::IOManager::get();
172 3 : iom->remove_callback<dfmessages::TriggerDecision>(m_td_connection);
173 :
174 3 : const int wait_steps = 20;
175 3 : auto step_timeout = m_stop_timeout / wait_steps;
176 3 : int step_counter = 0;
177 3 : while (!is_empty() && step_counter < wait_steps) {
178 0 : TLOG() << get_name() << ": stop delayed while waiting for " << used_slots() << " TDs to completed";
179 0 : std::this_thread::sleep_for(step_timeout);
180 0 : ++step_counter;
181 : }
182 :
183 3 : iom->remove_callback<dfmessages::TriggerDecisionToken>(m_token_connection);
184 :
185 3 : std::list<std::shared_ptr<AssignedTriggerDecision>> remnants;
186 5 : for (auto& app : m_dataflow_availability) {
187 2 : auto temp = app.second->flush();
188 2 : for (auto& td : temp) {
189 0 : remnants.push_back(td);
190 : }
191 2 : }
192 :
193 3 : for (auto& r : remnants) {
194 0 : ers::error(IncompleteTriggerDecision(ERS_HERE, r->decision.trigger_number, m_run_number));
195 : }
196 :
197 3 : std::lock_guard<std::mutex> guard(m_trigger_counters_mutex);
198 3 : m_trigger_counters.clear();
199 :
200 6 : TLOG() << get_name() << " successfully stopped";
201 3 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
202 3 : }
203 :
204 : void
205 3 : DFOModule::do_scrap(const CommandData_t& /*args*/)
206 : {
207 3 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_scrap() method";
208 :
209 3 : m_dataflow_availability.clear();
210 :
211 6 : TLOG() << get_name() << " successfully scrapped";
212 3 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_scrap() method";
213 3 : }
214 :
215 : void
216 5 : DFOModule::receive_trigger_decision(const dfmessages::TriggerDecision& decision)
217 : {
218 5 : TLOG_DEBUG(TLVL_TRIGDEC_RECEIVED) << get_name() << " Received TriggerDecision for trigger_number "
219 0 : << decision.trigger_number << " and run " << decision.run_number
220 5 : << " (current run is " << m_run_number << ")";
221 5 : if (decision.run_number != m_run_number) {
222 1 : ers::error(DFOModuleRunNumberMismatch(
223 2 : ERS_HERE, decision.run_number, m_run_number, "MLT", decision.trigger_number));
224 1 : return;
225 : }
226 :
227 4 : auto decision_received = std::chrono::steady_clock::now();
228 4 : ++m_received_decisions;
229 4 : auto trigger_types = unpack_types(decision.trigger_type);
230 8 : for ( const auto t : trigger_types ) {
231 4 : ++get_trigger_counter(t).received;
232 : }
233 :
234 4 : std::chrono::steady_clock::time_point decision_assigned;
235 4 : do {
236 :
237 4 : auto assignment = find_slot(decision);
238 :
239 4 : if (assignment == nullptr) { // this can happen if all application are in error state
240 0 : ers::error(UnableToAssign(ERS_HERE, decision.trigger_number));
241 0 : usleep(500);
242 0 : notify_trigger_if_needed();
243 0 : continue;
244 : }
245 :
246 4 : TLOG_DEBUG(TLVL_TRIGDEC_RECEIVED) << get_name() << " Slot found for trigger_number " << decision.trigger_number
247 0 : << " on connection " << assignment->connection_name
248 4 : << ", number of used slots is " << used_slots();
249 4 : decision_assigned = std::chrono::steady_clock::now();
250 4 : auto dispatch_successful = dispatch(assignment);
251 :
252 4 : if (dispatch_successful) {
253 3 : assign_trigger_decision(assignment);
254 3 : TLOG_DEBUG(TLVL_TRIGDEC_RECEIVED) << get_name() << " Assigned trigger_number " << decision.trigger_number
255 3 : << " to connection " << assignment->connection_name;
256 3 : break;
257 : } else {
258 2 : ers::error(
259 2 : TRBModuleAppUpdate(ERS_HERE, assignment->connection_name, "Could not send Trigger Decision"));
260 1 : m_dataflow_availability[assignment->connection_name]->set_in_error(true);
261 : }
262 :
263 5 : } while (m_running_status.load());
264 :
265 4 : notify_trigger_if_needed();
266 :
267 4 : m_waiting_for_decision +=
268 4 : std::chrono::duration_cast<std::chrono::microseconds>(decision_received - m_last_td_received).count();
269 4 : m_last_td_received = std::chrono::steady_clock::now();
270 4 : m_deciding_destination +=
271 4 : std::chrono::duration_cast<std::chrono::microseconds>(decision_assigned - decision_received).count();
272 4 : m_forwarding_decision +=
273 4 : std::chrono::duration_cast<std::chrono::microseconds>(m_last_td_received - decision_assigned).count();
274 4 : }
275 :
276 : std::shared_ptr<AssignedTriggerDecision>
277 4 : DFOModule::find_slot(const dfmessages::TriggerDecision& decision)
278 : {
279 :
280 : // this find_slot assings the decision with a round-robin logic
281 : // across all the available applications.
282 : // Applications in error are skipped.
283 : // we only probe the applications once.
284 : // if they are all unavailable the assignment is set to
285 : // the application with the lowest used slots
286 : // returning a nullptr will be considered as an error
287 : // from the upper level code
288 :
289 4 : std::shared_ptr<AssignedTriggerDecision> output = nullptr;
290 4 : auto minimum_occupied = m_dataflow_availability.end();
291 4 : size_t minimum = std::numeric_limits<size_t>::max();
292 4 : unsigned int counter = 0;
293 :
294 4 : auto candidate_it = m_last_assignement_it;
295 4 : if (candidate_it == m_dataflow_availability.end())
296 2 : candidate_it = m_dataflow_availability.begin();
297 :
298 8 : while (output == nullptr && counter < m_dataflow_availability.size()) {
299 :
300 4 : ++counter;
301 4 : ++candidate_it;
302 4 : if (candidate_it == m_dataflow_availability.end())
303 4 : candidate_it = m_dataflow_availability.begin();
304 :
305 : // get rid of the applications in error state
306 4 : if (candidate_it->second->is_in_error()) {
307 0 : continue;
308 : }
309 :
310 : // monitor
311 4 : auto slots = candidate_it->second->used_slots();
312 4 : if (slots < minimum) {
313 4 : minimum = slots;
314 4 : minimum_occupied = candidate_it;
315 : }
316 :
317 4 : if (candidate_it->second->is_busy())
318 1 : continue;
319 :
320 3 : output = candidate_it->second->make_assignment(decision);
321 3 : m_last_assignement_it = candidate_it;
322 : }
323 :
324 4 : if (!output) {
325 : // in this case all applications were busy
326 : // so we assign the decision to that with the lowest
327 : // number of assignments
328 1 : if (minimum_occupied != m_dataflow_availability.end()) {
329 1 : output = minimum_occupied->second->make_assignment(decision);
330 1 : m_last_assignement_it = minimum_occupied;
331 1 : ers::warning(AssignedToBusyApp(ERS_HERE, decision.trigger_number, minimum_occupied->first, minimum));
332 : }
333 : }
334 :
335 4 : if (output != nullptr) {
336 4 : TLOG_DEBUG(TLVL_WORK_STEPS) << "Assigned TriggerDecision with trigger number " << decision.trigger_number
337 4 : << " to TRB at connection " << output->connection_name;
338 : }
339 4 : return output;
340 0 : }
341 :
342 : void
343 6 : DFOModule::generate_opmon_data()
344 : {
345 :
346 6 : opmon::DFOInfo info;
347 6 : info.set_tokens_received( m_received_tokens.exchange(0) );
348 6 : info.set_decisions_sent(m_sent_decisions.exchange(0));
349 6 : info.set_decisions_received(m_received_decisions.exchange(0));
350 6 : info.set_waiting_for_decision(m_waiting_for_decision.exchange(0));
351 6 : info.set_deciding_destination(m_deciding_destination.exchange(0));
352 6 : info.set_forwarding_decision(m_forwarding_decision.exchange(0));
353 6 : info.set_waiting_for_token(m_waiting_for_token.exchange(0));
354 6 : info.set_processing_token(m_processing_token.exchange(0));
355 6 : publish( std::move(info) );
356 :
357 6 : std::lock_guard<std::mutex> guard(m_trigger_counters_mutex);
358 9 : for ( auto & [type, counts] : m_trigger_counters ) {
359 3 : opmon::TriggerInfo ti;
360 3 : ti.set_received(counts.received.exchange(0));
361 3 : ti.set_completed(counts.completed.exchange(0));
362 3 : auto name = dunedaq::trgdataformats::get_trigger_candidate_type_names()[type];
363 6 : publish( std::move(ti), {{"type", name}} );
364 3 : }
365 9 : }
366 :
367 : void
368 9 : DFOModule::receive_trigger_complete_token(const dfmessages::TriggerDecisionToken& token)
369 : {
370 9 : if (token.run_number == 0 && token.trigger_number == 0) {
371 2 : if (m_dataflow_availability.count(token.decision_destination) == 0) {
372 2 : TLOG_DEBUG(TLVL_CONFIG) << "Creating dataflow availability struct for uid " << token.decision_destination;
373 2 : auto entry = m_dataflow_availability[token.decision_destination] =
374 4 : std::make_shared<TriggerRecordBuilderData>(token.decision_destination, m_busy_threshold, m_free_threshold);
375 2 : register_node(token.decision_destination, entry);
376 2 : } else {
377 0 : TLOG() << TRBModuleAppUpdate(ERS_HERE, token.decision_destination, "Has reconnected");
378 0 : auto app_it = m_dataflow_availability.find(token.decision_destination);
379 0 : app_it->second->set_in_error(false);
380 : }
381 5 : return;
382 : }
383 :
384 7 : TLOG_DEBUG(TLVL_TDTOKEN_RECEIVED) << get_name() << " Received TriggerDecisionToken for trigger_number "
385 0 : << token.trigger_number << " and run " << token.run_number
386 7 : << " (current run is " << m_run_number << ")";
387 : // add a check to see if the application data found
388 7 : if (token.run_number != m_run_number) {
389 2 : std::ostringstream oss_source;
390 2 : oss_source << "TRB at connection " << token.decision_destination;
391 2 : ers::error(DFOModuleRunNumberMismatch(
392 4 : ERS_HERE, token.run_number, m_run_number, oss_source.str(), token.trigger_number));
393 2 : return;
394 2 : }
395 :
396 5 : auto app_it = m_dataflow_availability.find(token.decision_destination);
397 : // check if application data exists;
398 5 : if (app_it == m_dataflow_availability.end()) {
399 1 : ers::error(UnknownTokenSource(ERS_HERE, token.decision_destination));
400 1 : return;
401 : }
402 :
403 4 : ++m_received_tokens;
404 4 : auto callback_start = std::chrono::steady_clock::now();
405 :
406 4 : try {
407 4 : auto dec_ptr = app_it->second->complete_assignment(token.trigger_number, m_metadata_function);
408 3 : auto trigger_types = unpack_types(dec_ptr->decision.trigger_type);
409 6 : for ( const auto t : trigger_types ) ++ get_trigger_counter(t).completed;
410 4 : } catch (AssignedTriggerDecisionNotFound const& err) {
411 1 : ers::error(err);
412 1 : }
413 :
414 4 : if (app_it->second->is_in_error()) {
415 0 : TLOG() << TRBModuleAppUpdate(ERS_HERE, token.decision_destination, "Has reconnected");
416 0 : app_it->second->set_in_error(false);
417 : }
418 :
419 4 : notify_trigger_if_needed();
420 :
421 4 : m_waiting_for_token +=
422 4 : std::chrono::duration_cast<std::chrono::microseconds>(callback_start - m_last_token_received).count();
423 4 : m_last_token_received = std::chrono::steady_clock::now();
424 4 : m_processing_token +=
425 4 : std::chrono::duration_cast<std::chrono::microseconds>(m_last_token_received - callback_start).count();
426 : }
427 :
428 : bool
429 8 : DFOModule::is_busy() const
430 : {
431 12 : for (auto& dfapp : m_dataflow_availability) {
432 8 : if (!dfapp.second->is_busy())
433 4 : return false;
434 : }
435 4 : return true;
436 : }
437 :
438 : bool
439 3 : DFOModule::is_empty() const
440 : {
441 5 : for (auto& dfapp : m_dataflow_availability) {
442 2 : if (dfapp.second->used_slots() != 0)
443 0 : return false;
444 : }
445 3 : return true;
446 : }
447 :
448 : size_t
449 0 : DFOModule::used_slots() const
450 : {
451 0 : size_t total = 0;
452 0 : for (auto& dfapp : m_dataflow_availability) {
453 0 : total += dfapp.second->used_slots();
454 : }
455 0 : return total;
456 : }
457 :
458 : void
459 8 : DFOModule::notify_trigger_if_needed() const
460 : {
461 : // 19-Dec-2024, KAB, ELF, MaR: combined the is_busy() and notify_trigger() calls in
462 : // a single method (notify_trigger_if_needed), and protected the contents of the new
463 : // method with a mutex, to avoid a race condition in which a given is_busy() result
464 : // is determined, but by the time that the value is sent to the MLT, the busy state
465 : // has changed.
466 8 : std::lock_guard<std::mutex> guard(m_notify_trigger_mutex);
467 :
468 8 : bool busy = is_busy();
469 8 : if (busy == m_last_notified_busy.load())
470 5 : return;
471 :
472 : bool wasSentSuccessfully = false;
473 :
474 3 : do {
475 3 : try {
476 3 : dfmessages::TriggerInhibit message{ busy, m_run_number };
477 3 : m_busy_sender->send(std::move(message), m_queue_timeout);
478 2 : wasSentSuccessfully = true;
479 2 : TLOG_DEBUG(TLVL_NOTIFY_TRIGGER) << get_name() << " Sent BUSY status " << busy << " to trigger in run "
480 2 : << m_run_number;
481 1 : } catch (const ers::Issue& excpt) {
482 1 : std::ostringstream oss_warn;
483 1 : oss_warn << "Send with sender \"" << m_busy_sender->get_name() << "\" failed";
484 1 : ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt));
485 1 : }
486 :
487 5 : } while (!wasSentSuccessfully && m_running_status.load());
488 :
489 3 : m_last_notified_busy.store(busy);
490 8 : }
491 :
492 : bool
493 4 : DFOModule::dispatch(const std::shared_ptr<AssignedTriggerDecision>& assignment)
494 : {
495 :
496 4 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering dispatch() method. assignment->connection_name: "
497 4 : << assignment->connection_name;
498 :
499 4 : bool wasSentSuccessfully = false;
500 4 : int retries = m_td_send_retries;
501 4 : auto iom = iomanager::IOManager::get();
502 4 : do {
503 :
504 4 : try {
505 4 : auto decision_copy = dfmessages::TriggerDecision(assignment->decision);
506 8 : iom->get_sender<dfmessages::TriggerDecision>(assignment->connection_name)
507 4 : ->send(std::move(decision_copy), m_queue_timeout);
508 3 : wasSentSuccessfully = true;
509 3 : ++m_sent_decisions;
510 3 : TLOG_DEBUG(TLVL_DISPATCH_TO_TRB) << get_name() << " Sent TriggerDecision for trigger_number "
511 0 : << decision_copy.trigger_number << " to TRB at connection "
512 3 : << assignment->connection_name << " for run number " << decision_copy.run_number;
513 5 : } catch (const ers::Issue& excpt) {
514 1 : std::ostringstream oss_warn;
515 1 : oss_warn << "Send to connection \"" << assignment->connection_name << "\" failed";
516 1 : ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt));
517 1 : }
518 :
519 4 : retries--;
520 :
521 4 : } while (!wasSentSuccessfully && m_running_status.load() && retries > 0);
522 :
523 4 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting dispatch() method";
524 4 : return wasSentSuccessfully;
525 4 : }
526 :
527 : void
528 3 : DFOModule::assign_trigger_decision(const std::shared_ptr<AssignedTriggerDecision>& assignment)
529 : {
530 3 : m_dataflow_availability[assignment->connection_name]->add_assignment(assignment);
531 3 : }
532 :
533 : } // namespace dunedaq::dfmodules
534 :
535 5 : DEFINE_DUNE_DAQ_MODULE(dunedaq::dfmodules::DFOModule)
|