DUNE-DAQ
DUNE Trigger and Data Acquisition software
TriggerGenericMaker.hpp
#ifndef TRIGGER_SRC_TRIGGER_TRIGGERGENERICMAKER_HPP_
#define TRIGGER_SRC_TRIGGER_TRIGGERGENERICMAKER_HPP_

#include "trigger/Issues.hpp"
#include "trigger/Set.hpp"
#include "trigger/TimeSliceInputBuffer.hpp"
#include "trigger/TimeSliceOutputBuffer.hpp"
#include "trigger/triggergenericmakerinfo/InfoNljs.hpp"

#include "appfwk/DAQModule.hpp"
#include "daqdataformats/SourceID.hpp"
#include "daqdataformats/Types.hpp"
#include "dfmessages/Types.hpp"
#include "iomanager/Receiver.hpp"
#include "iomanager/Sender.hpp"
#include "logging/Logging.hpp"
#include "utilities/WorkerThread.hpp"

#include <algorithm>
#include <atomic>
#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <vector>
namespace dunedaq::trigger {

// Forward declare the class encapsulating partial specializations of do_work
template<class IN, class OUT, class MAKER>
class TriggerGenericWorker;

// This template class reads IN items from queues, passes them to MAKER objects,
// and writes the resulting OUT objects to another queue. The behavior of
// passing IN objects to the MAKER and creating OUT objects from the MAKER is
// encapsulated by TriggerGenericWorker<IN, OUT, MAKER> templates, defined later
// in this file. (An illustrative sketch of a concrete subclass follows the
// class definition.)
template<class IN, class OUT, class MAKER>
class TriggerGenericMaker : public dunedaq::appfwk::DAQModule
{
  friend class TriggerGenericWorker<IN, OUT, MAKER>;

public:
  explicit TriggerGenericMaker(const std::string& name)
    : DAQModule(name)
    , m_thread(std::bind(&TriggerGenericMaker::do_work, this, std::placeholders::_1))
    , m_received_count(0)
    , m_sent_count(0)
    , m_run_number(0)
    , m_input_queue(nullptr)
    , m_output_queue(nullptr)
    , m_queue_timeout(100)
    , m_algorithm_name("[uninitialized]")
    , m_sourceid(dunedaq::daqdataformats::SourceID::s_invalid_id)
    , m_buffer_time(0)
    , m_window_time(625000)
    , worker(*this) // should be last; may use other members
  {
    register_command("start", &TriggerGenericMaker::do_start);
    register_command("stop", &TriggerGenericMaker::do_stop);
    register_command("conf", &TriggerGenericMaker::do_configure);
    register_command("scrap", &TriggerGenericMaker::do_scrap);
  }

  virtual ~TriggerGenericMaker() {}

  TriggerGenericMaker(const TriggerGenericMaker&) = delete;
  TriggerGenericMaker& operator=(const TriggerGenericMaker&) = delete;
  TriggerGenericMaker(TriggerGenericMaker&&) = delete;
  TriggerGenericMaker& operator=(TriggerGenericMaker&&) = delete;

  //void init(const nlohmann::json& obj) override
  //{
  //  // TODO: Reimplement as OKS
  //  //m_input_queue = get_iom_receiver<IN>(appfwk::connection_uid(obj, "input"));
  //  //m_output_queue = get_iom_sender<OUT>(appfwk::connection_uid(obj, "output"));
  //}

  void get_info(opmonlib::InfoCollector& ci, int /*level*/) override
  {
    triggergenericmakerinfo::Info i;

    i.received_count = m_received_count.load();
    i.sent_count = m_sent_count.load();
    if (m_maker) {
      i.data_vs_system_ms = m_maker->m_data_vs_system_time;
    } else {
      i.data_vs_system_ms = 0;
    }

    ci.add(i);
  }

protected:
  void set_algorithm_name(const std::string& name) { m_algorithm_name = name; }

  // Only applies to makers that output Set<B>
  void set_sourceid(uint32_t element_id) // NOLINT(build/unsigned)
  {
    m_sourceid = element_id;
  }

  // Only applies to makers that output Set<B>
  void set_windowing(daqdataformats::timestamp_t window_time, daqdataformats::timestamp_t buffer_time)
  {
    m_window_time = window_time;
    m_buffer_time = buffer_time;
  }

private:
  dunedaq::utilities::WorkerThread m_thread;

  using metric_counter_type = decltype(triggergenericmakerinfo::Info::received_count);
  std::atomic<metric_counter_type> m_received_count;
  std::atomic<metric_counter_type> m_sent_count;

  daqdataformats::run_number_t m_run_number;

  using source_t = dunedaq::iomanager::ReceiverConcept<IN>;
  std::shared_ptr<source_t> m_input_queue;

  using sink_t = dunedaq::iomanager::SenderConcept<OUT>;
  std::shared_ptr<sink_t> m_output_queue;

  std::chrono::milliseconds m_queue_timeout;

  std::string m_algorithm_name;

  uint32_t m_sourceid; // NOLINT(build/unsigned)

  daqdataformats::timestamp_t m_buffer_time;
  daqdataformats::timestamp_t m_window_time;

  std::unique_ptr<MAKER> m_maker;
  nlohmann::json m_maker_conf;

  TriggerGenericWorker<IN, OUT, MAKER> worker;

  // This should return a unique_ptr to the MAKER created from conf command arguments.
  // Should also call set_algorithm_name and set_sourceid/set_windowing (if desired);
  // see the illustrative sketch after the class definition.
  virtual std::unique_ptr<MAKER> make_maker(const nlohmann::json& obj) = 0;

  void do_start(const nlohmann::json& startobj)
  {
    m_received_count = 0;
    m_sent_count = 0;
    m_maker = make_maker(m_maker_conf);
    worker.reconfigure();
    m_thread.start_working_thread(get_name());
    m_run_number = startobj.value<dunedaq::daqdataformats::run_number_t>("run", 0);
  }

  void do_stop(const nlohmann::json& /*obj*/)
  {
    m_thread.stop_working_thread();
  }

  void do_configure(const nlohmann::json& obj)
  {
    // P. Rodrigues 2022-07-13
    // We stash the config here and don't actually create the maker
    // algorithm until start time, so that the algorithm doesn't
    // persist between runs and hold onto its state from the previous
    // run
    m_maker_conf = obj;

    // worker should be notified that the configuration potentially changed
    worker.reconfigure();
  }

  void do_scrap(const nlohmann::json& obj)
  {
    m_input_queue.reset();
    m_output_queue.reset();
    m_maker.reset();
    m_maker_conf.clear();
  }

  void do_work(std::atomic<bool>& m_running_flag)
  {
    // Loop until a stop is received
    while (m_running_flag.load()) {
      // While there are items in the input queue, continue draining even if
      // the running_flag is false, but stop _immediately_ when input is empty
      IN in;
      while (receive(in)) {
        if (m_running_flag.load()) {
          worker.process(in);
        }
      }
    }
    // P. Rodrigues 2022-06-01. The argument here is whether to drop
    // buffered outputs. We choose 'true' because some significant
    // time can pass between the last input sent by readout and when
    // we receive a stop. (This happens because stop is sent serially
    // to readout units before trigger, and each RU takes ~1s to
    // stop). So by the time we receive a stop command, our buffered
    // outputs are stale and will cause tardy warnings from the zipper
    // downstream
    worker.drain(true);
    TLOG() << get_name() << ": Exiting do_work() method for run " << m_run_number << ", received " << m_received_count
           << " inputs (" << worker.get_low_level_input_count() << " sub-inputs) and successfully sent " << m_sent_count
           << " outputs.";
    worker.reset();
  }

  bool receive(IN& in)
  {
    try {
      in = m_input_queue->receive(m_queue_timeout);
    } catch (const dunedaq::iomanager::TimeoutExpired& excpt) {
      // it is perfectly reasonable that there might be no data in the queue
      // some fraction of the times that we check, so we just continue on and try again
      return false;
    }
    ++m_received_count;
    return true;
  }

  bool send(OUT&& out)
  {
    try {
      m_output_queue->send(std::move(out), m_queue_timeout);
    } catch (const dunedaq::iomanager::TimeoutExpired& excpt) {
      ers::warning(excpt);
      return false;
    }
    ++m_sent_count;
    return true;
  }
};

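// A concrete maker module is expected to derive from this template and
// implement make_maker(). The sketch below is illustrative only (the module
// class, algorithm class and configuration field names are hypothetical), but
// it shows roughly how set_algorithm_name/set_sourceid/set_windowing are
// intended to be used from make_maker():
//
//   class MyActivityMakerModule
//     : public TriggerGenericMaker<Set<TriggerPrimitive>, Set<TriggerActivity>, MyActivityMaker>
//   {
//   public:
//     explicit MyActivityMakerModule(const std::string& name)
//       : TriggerGenericMaker(name)
//     {}
//
//   private:
//     std::unique_ptr<MyActivityMaker> make_maker(const nlohmann::json& obj) override
//     {
//       auto maker = std::make_unique<MyActivityMaker>();
//       set_algorithm_name("MyActivityMaker");
//       set_sourceid(obj.value("element_id", 0u));          // hypothetical config field
//       set_windowing(obj.value("window_time", 625000),     // hypothetical config fields
//                     obj.value("buffer_time", 0));
//       return maker;
//     }
//   };
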
235// To handle the different unpacking schemes implied by different templates,
236// do_work is broken out into its own template class that is a friend of
237// TriggerGenericMaker. C++ still does not support partial specification of a
238// single method in a template class, so this approach is the least redundant
239// way to achieve that functionality
240
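// In miniature, the pattern looks like this (illustrative only):
//
//   template<class IN, class OUT, class MAKER>
//   struct Worker { void do_work(); };                       // generic case
//
//   template<class A, class OUT, class MAKER>
//   struct Worker<Set<A>, OUT, MAKER> { void do_work(); };   // unpacks Set<A> inputs
//
// Partially specializing only TriggerGenericMaker::do_work itself would not be
// valid C++, so the maker instead owns a worker of the matching specialization
// and delegates to it; the friend declaration lets the worker reach the
// maker's private members (queues, counters, the MAKER instance).
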
// The base template assumes the MAKER has an operator() with the signature
// operator()(IN, std::vector<OUT>)
template<class IN, class OUT, class MAKER>
class TriggerGenericWorker
{
public:
  explicit TriggerGenericWorker(TriggerGenericMaker<IN, OUT, MAKER>& parent)
    : m_parent(parent)
  {}

  TriggerGenericMaker<IN, OUT, MAKER>& m_parent;

  void reconfigure() {}

  void reset() {}

  void process(IN& in)
  {
    std::vector<OUT> out_vec; // one input -> many outputs
    try {
      m_parent.m_maker->operator()(in, out_vec);
    } catch (...) { // NOLINT TODO Benjamin Land <BenLand100@github.com> May 28-2021 can we restrict the possible
                    // exceptions triggeralgs might raise?
      ers::fatal(AlgorithmFatalError(ERS_HERE, m_parent.get_name(), m_parent.m_algorithm_name));
      return;
    }

    while (out_vec.size()) {
      if (!m_parent.send(std::move(out_vec.back()))) {
        ers::error(AlgorithmFailedToSend(ERS_HERE, m_parent.get_name(), m_parent.m_algorithm_name));
        // out_vec.back() is dropped
      }
      out_vec.pop_back();
    }
  }

  void drain(bool) {}

  size_t get_low_level_input_count() { return 0; } // the generic worker does not buffer sub-inputs
};

// Partial specialization for IN = Set<A>, OUT = Set<B>; assumes the MAKER has:
// operator()(A, std::vector<B>)
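// (For a typical trigger-activity maker, for instance, A would be a
// TriggerPrimitive and B a TriggerActivity, so TPSets are consumed and TASets
// produced.)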
template<class A, class B, class MAKER>
class TriggerGenericWorker<Set<A>, Set<B>, MAKER>
{
public: // NOLINT
  explicit TriggerGenericWorker(TriggerGenericMaker<Set<A>, Set<B>, MAKER>& parent)
    : m_parent(parent)
    , m_in_buffer(parent.get_name(), parent.m_algorithm_name)
    , m_out_buffer(parent.get_name(), parent.m_algorithm_name, parent.m_buffer_time)
  {}

  TriggerGenericMaker<Set<A>, Set<B>, MAKER>& m_parent;

  TimeSliceInputBuffer<A> m_in_buffer;
  TimeSliceOutputBuffer<B> m_out_buffer;

  daqdataformats::timestamp_t m_prev_start_time = 0;

  void reconfigure()
  {
    m_out_buffer.set_window_time(m_parent.m_window_time);
    m_out_buffer.set_buffer_time(m_parent.m_buffer_time);
  }

  void reset()
  {
    m_prev_start_time = 0;
    m_out_buffer.reset();
    m_low_level_input_count = 0;
  }

  void process_slice(const std::vector<A>& time_slice, std::vector<B>& out_vec)
  {
    // time_slice is a full slice (all Set<A> combined), time ordered, vector of A
    // call operator for each of the objects in the vector
    for (const A& x : time_slice) {
      try {
        m_parent.m_maker->operator()(x, out_vec);
      } catch (...) { // NOLINT TODO Benjamin Land <BenLand100@github.com> May 28-2021 can we restrict the possible
                      // exceptions triggeralgs might raise?
        ers::fatal(AlgorithmFatalError(ERS_HERE, m_parent.get_name(), m_parent.m_algorithm_name));
        return;
      }
    }
  }

  void process(Set<A>& in)
  {
    std::vector<B> elems; // Bs to buffer for the next window
    switch (in.type) {
      case Set<A>::Type::kPayload: {
        if (m_prev_start_time != 0 && in.start_time < m_prev_start_time) {
          ers::warning(OutOfOrderSets(ERS_HERE, m_parent.get_name(), m_prev_start_time, in.start_time));
        }
        m_prev_start_time = in.start_time;
        std::vector<A> time_slice;
        daqdataformats::timestamp_t start_time, end_time;
        if (!m_in_buffer.buffer(in, time_slice, start_time, end_time)) {
          return; // no complete time slice yet (`in` was part of buffered slice)
        }
        m_low_level_input_count += time_slice.size();
        process_slice(time_slice, elems);
      } break;
      case Set<A>::Type::kHeartbeat: {
        // PAR 2022-04-27 We've got a heartbeat for time T, so we know
        // we won't receive any more inputs for times t < T. Therefore
        // we can flush all items in the input buffer, which have
        // times t < T, because the input is time-ordered. We put the
        // heartbeat in the output buffer, which will handle it
        // appropriately

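        // For instance (illustrative numbers): if Set<A> fragments covering
        // [1000, 2000) and [2000, 3000) are waiting in m_in_buffer and a
        // heartbeat with start_time 5000 arrives, every buffered item is
        // earlier than 5000, so the partial slice is flushed and processed
        // now; the heartbeat itself is forwarded through the output buffer.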
        std::vector<A> time_slice;
        daqdataformats::timestamp_t start_time, end_time;
        if (m_in_buffer.flush(time_slice, start_time, end_time)) {
          if (end_time > in.start_time) {
            // This should never happen, but we check here so we at least get some output if it did
            ers::fatal(OutOfOrderSets(ERS_HERE, m_parent.get_name(), end_time, in.start_time));
          }
          m_low_level_input_count += time_slice.size();
          process_slice(time_slice, elems);
        }

        Set<B> heartbeat;
        heartbeat.type = Set<B>::Type::kHeartbeat;
        heartbeat.start_time = in.start_time;
        heartbeat.end_time = in.end_time;

        TLOG_DEBUG(4) << "Buffering heartbeat with start time " << heartbeat.start_time;
        m_out_buffer.buffer_heartbeat(heartbeat);

        // flush the maker
        try {
          // TODO Benjamin Land <BenLand100@github.com> July-14-2021 flushed events go into the buffer... until a window
          // is ready?
          m_parent.m_maker->flush(in.end_time, elems);
        } catch (...) { // NOLINT TODO Benjamin Land <BenLand100@github.com> May-28-2021 can we restrict the possible
                        // exceptions triggeralgs might raise?
          ers::fatal(AlgorithmFatalError(ERS_HERE, m_parent.get_name(), m_parent.m_algorithm_name));
          return;
        }
      } break;
      default:
        ers::error(UnknownSetError(ERS_HERE, m_parent.get_name(), m_parent.m_algorithm_name));
        break;
    }

    // add new elements to the output buffer
    if (elems.size() > 0) {
      m_out_buffer.buffer(elems);
    }

    size_t n_output_windows = 0;
    // emit completed windows
    while (m_out_buffer.ready()) {
      ++n_output_windows;
      Set<B> out;
      m_out_buffer.flush(out);
      out.seqno = m_parent.m_sent_count;

      if (out.type == Set<B>::Type::kHeartbeat) {
        TLOG_DEBUG(4) << "Sending heartbeat with start time " << out.start_time;
        if (!m_parent.send(std::move(out))) {
          ers::error(AlgorithmFailedToSend(ERS_HERE, m_parent.get_name(), m_parent.m_algorithm_name));
          // out is dropped
        }
      }
      // Only form and send Set<B> if it has a nonzero number of objects
      else if (out.type == Set<B>::Type::kPayload && out.objects.size() != 0) {
        TLOG_DEBUG(4) << "Output set window ready with start time " << out.start_time << " end time " << out.end_time
                      << " and " << out.objects.size() << " members";
        if (!m_parent.send(std::move(out))) {
          ers::error(AlgorithmFailedToSend(ERS_HERE, m_parent.get_name(), m_parent.m_algorithm_name));
          // out is dropped
        }
      }
    }
    TLOG_DEBUG(4) << "process() done. Advanced output buffer by " << n_output_windows << " output windows";
  }

  void drain(bool drop)
  {
    // First, send anything in the input buffer to the algorithm, and add any
    // results to the output buffer
    std::vector<A> time_slice;
    daqdataformats::timestamp_t start_time, end_time;
    if (m_in_buffer.flush(time_slice, start_time, end_time)) {
      std::vector<B> elems;
      m_low_level_input_count += time_slice.size();
      process_slice(time_slice, elems);
      if (elems.size() > 0) {
        m_out_buffer.buffer(elems);
      }
    }
    // Second, drain the output buffer onto the queue. These may not be "fully
    // formed" windows, but at this point we're getting no more data anyway.
    while (!m_out_buffer.empty()) {
      Set<B> out;
      m_out_buffer.flush(out);
      out.seqno = m_parent.m_sent_count;

      if (out.type == Set<B>::Type::kHeartbeat) {
        if (!drop) {
          if (!m_parent.send(std::move(out))) {
            ers::error(AlgorithmFailedToSend(ERS_HERE, m_parent.get_name(), m_parent.m_algorithm_name));
            // out is dropped
          }
        }
      }
      // Only form and send Set<B> if it has a nonzero number of objects
      else if (out.type == Set<B>::Type::kPayload && out.objects.size() != 0) {
        TLOG_DEBUG(1) << "Output set window ready with start time " << out.start_time << " end time " << out.end_time
                      << " and " << out.objects.size() << " members";
        if (!drop) {
          if (!m_parent.send(std::move(out))) {
            ers::error(AlgorithmFailedToSend(ERS_HERE, m_parent.get_name(), m_parent.m_algorithm_name));
            // out is dropped
          }
        }
      }
    }
  }

  size_t m_low_level_input_count = 0;
  size_t get_low_level_input_count() { return m_low_level_input_count; }
};

// Partial specialization for IN = Set<A>; assumes the MAKER has:
// operator()(A, std::vector<OUT>)
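// (For a typical trigger-candidate maker, for instance, A would be a
// TriggerActivity and OUT a TriggerCandidate, so TASets are consumed and
// candidates are emitted directly rather than windowed into Sets.)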
template<class A, class OUT, class MAKER>
class TriggerGenericWorker<Set<A>, OUT, MAKER>
{
public: // NOLINT
  explicit TriggerGenericWorker(TriggerGenericMaker<Set<A>, OUT, MAKER>& parent)
    : m_parent(parent)
    , m_in_buffer(parent.get_name(), parent.m_algorithm_name)
  {}

  TriggerGenericMaker<Set<A>, OUT, MAKER>& m_parent;

  TimeSliceInputBuffer<A> m_in_buffer;

  void reconfigure() {}

  void reset() { m_low_level_input_count = 0; }

  void process_slice(const std::vector<A>& time_slice, std::vector<OUT>& out_vec)
  {
    // time_slice is a full slice (all Set<A> combined), time ordered, vector of A
    // call operator for each of the objects in the vector
    for (const A& x : time_slice) {
      try {
        m_parent.m_maker->operator()(x, out_vec);
      } catch (...) { // NOLINT TODO Benjamin Land <BenLand100@github.com> May 28-2021 can we restrict the possible
                      // exceptions triggeralgs might raise?
        ers::fatal(AlgorithmFatalError(ERS_HERE, m_parent.get_name(), m_parent.m_algorithm_name));
        return;
      }
    }
  }

  void process(Set<A>& in)
  {
    std::vector<OUT> out_vec; // either a whole time slice, heartbeat flushed, or empty
    switch (in.type) {
      case Set<A>::Type::kPayload: {
        std::vector<A> time_slice;
        daqdataformats::timestamp_t start_time, end_time;
        if (!m_in_buffer.buffer(in, time_slice, start_time, end_time)) {
          return; // no complete time slice yet (`in` was part of buffered slice)
        }
        m_low_level_input_count += time_slice.size();
        process_slice(time_slice, out_vec);
      } break;
      case Set<A>::Type::kHeartbeat:
        // TODO BJL May-28-2021 should anything happen with the heartbeat when OUT is not a Set<T>?
        //
        // PAR 2022-01-21 We've got a heartbeat for time T, so we know
        // we won't receive any more inputs for times t < T. Therefore
        // we can flush all items in the input buffer, which have
        // times t < T, because the input is time-ordered
        try {
          std::vector<A> time_slice;
          daqdataformats::timestamp_t start_time, end_time;
          if (m_in_buffer.flush(time_slice, start_time, end_time)) {
            if (end_time > in.start_time) {
              // This should never happen, but we check here so we at least get some output if it did
              ers::fatal(OutOfOrderSets(ERS_HERE, m_parent.get_name(), end_time, in.start_time));
            }
            m_low_level_input_count += time_slice.size();
            process_slice(time_slice, out_vec);
          }
          m_parent.m_maker->flush(in.end_time, out_vec);
        } catch (...) { // NOLINT TODO Benjamin Land <BenLand100@github.com> May 28-2021 can we restrict the possible
                        // exceptions triggeralgs might raise?
          ers::fatal(AlgorithmFatalError(ERS_HERE, m_parent.get_name(), m_parent.m_algorithm_name));
          return;
        }
        break;
      default:
        ers::error(UnknownSetError(ERS_HERE, m_parent.get_name(), m_parent.m_algorithm_name));
        break;
    }

    while (out_vec.size()) {
      if (!m_parent.send(std::move(out_vec.back()))) {
        ers::error(AlgorithmFailedToSend(ERS_HERE, m_parent.get_name(), m_parent.m_algorithm_name));
        // out_vec.back() is dropped
      }
      out_vec.pop_back();
    }
  }

  void drain(bool drop)
  {
    // Send anything in the input buffer to the algorithm, and put any results
    // on the output queue
    std::vector<A> time_slice;
    daqdataformats::timestamp_t start_time, end_time;
    if (m_in_buffer.flush(time_slice, start_time, end_time)) {
      std::vector<OUT> out_vec;
      m_low_level_input_count += time_slice.size();
      process_slice(time_slice, out_vec);
      while (out_vec.size()) {
        if (!drop) {
          if (!m_parent.send(std::move(out_vec.back()))) {
            ers::error(AlgorithmFailedToSend(ERS_HERE, m_parent.get_name(), m_parent.m_algorithm_name));
            // out_vec.back() is dropped
          }
        }
        out_vec.pop_back();
      }
    }
  }

  size_t m_low_level_input_count = 0;
  size_t get_low_level_input_count() { return m_low_level_input_count; }
};

} // namespace dunedaq::trigger

#endif // TRIGGER_SRC_TRIGGER_TRIGGERGENERICMAKER_HPP_