9#ifndef TRIGGER_SRC_TRIGGER_TRIGGERGENERICMAKER_HPP_
10#define TRIGGER_SRC_TRIGGER_TRIGGERGENERICMAKER_HPP_
14#include "trigger/TimeSliceInputBuffer.hpp"
15#include "trigger/TimeSliceOutputBuffer.hpp"
16#include "trigger/triggergenericmakerinfo/InfoNljs.hpp"
36template<
class IN,
class OUT,
class MAKER>
44template<
class IN,
class OUT,
class MAKER>
85 void get_info(opmonlib::InfoCollector& ci,
int )
override
87 triggergenericmakerinfo::Info i;
91 if (
m_maker) { i.data_vs_system_ms =
m_maker->m_data_vs_system_time; }
92 else i.data_vs_system_ms = 0;
143 virtual std::unique_ptr<MAKER>
make_maker(
const nlohmann::json& obj) = 0;
151 m_thread.start_working_thread(get_name());
181 void do_work(std::atomic<bool>& m_running_flag)
184 while (m_running_flag.load()) {
189 if (m_running_flag.load()) {
204 <<
" inputs (" <<
worker.get_low_level_input_count() <<
" sub-inputs) and successfully sent " <<
m_sent_count
213 }
catch (
const dunedaq::iomanager::TimeoutExpired& excpt) {
226 }
catch (
const dunedaq::iomanager::TimeoutExpired& excpt) {
243template<
class IN,
class OUT,
class MAKER>
262 std::vector<OUT> out_vec;
264 m_parent.m_maker->operator()(in, out_vec);
271 while (out_vec.size()) {
272 if (!
m_parent.send(std::move(out_vec.back()))) {
288template<
class A,
class B,
class MAKER>
294 , m_in_buffer(parent.get_name(), parent.m_algorithm_name)
295 , m_out_buffer(parent.get_name(), parent.m_algorithm_name, parent.m_buffer_time)
308 m_out_buffer.set_window_time(
m_parent.m_window_time);
309 m_out_buffer.set_buffer_time(
m_parent.m_buffer_time);
314 m_prev_start_time = 0;
315 m_out_buffer.reset();
319 void process_slice(
const std::vector<A>& time_slice, std::vector<B>& out_vec)
323 for (
const A& x : time_slice) {
325 m_parent.m_maker->operator()(x, out_vec);
336 std::vector<B> elems;
339 if (m_prev_start_time != 0 && in.
start_time < m_prev_start_time) {
343 std::vector<A> time_slice;
345 if (!m_in_buffer.buffer(in, time_slice,
start_time, end_time)) {
349 process_slice(time_slice, elems);
359 std::vector<A> time_slice;
361 if (m_in_buffer.flush(time_slice,
start_time, end_time)) {
367 process_slice(time_slice, elems);
378 m_out_buffer.buffer_heartbeat(heartbeat);
397 if (elems.size() > 0) {
398 m_out_buffer.buffer(elems);
401 size_t n_output_windows=0;
403 while (m_out_buffer.ready()) {
406 m_out_buffer.flush(
out);
412 TLOG_DEBUG(4) <<
"Sending heartbeat with start time " <<
out.start_time;
420 TLOG_DEBUG(4) <<
"Output set window ready with start time " <<
out.start_time <<
" end time " <<
out.end_time
421 <<
" and " <<
out.objects.size() <<
" members";
428 TLOG_DEBUG(4) <<
"process() done. Advanced output buffer by " << n_output_windows <<
" output windows";
435 std::vector<A> time_slice;
437 if (m_in_buffer.flush(time_slice,
start_time, end_time)) {
438 std::vector<B> elems;
440 process_slice(time_slice, elems);
441 if (elems.size() > 0) {
442 m_out_buffer.buffer(elems);
447 while (!m_out_buffer.empty()) {
449 m_out_buffer.flush(
out);
464 TLOG_DEBUG(1) <<
"Output set window ready with start time " <<
out.start_time <<
" end time " <<
out.end_time
465 <<
" and " <<
out.objects.size() <<
" members";
482template<
class A,
class OUT,
class MAKER>
488 , m_in_buffer(parent.get_name(), parent.m_algorithm_name)
502 void process_slice(
const std::vector<A>& time_slice, std::vector<OUT>& out_vec)
506 for (
const A& x : time_slice) {
508 m_parent.m_maker->operator()(x, out_vec);
519 std::vector<OUT> out_vec;
522 std::vector<A> time_slice;
524 if (!m_in_buffer.buffer(in, time_slice,
start_time, end_time)) {
528 process_slice(time_slice, out_vec);
538 std::vector<A> time_slice;
540 if (m_in_buffer.flush(time_slice,
start_time, end_time)) {
546 process_slice(time_slice, out_vec);
560 while (out_vec.size()) {
561 if (!
m_parent.send(std::move(out_vec.back()))) {
573 std::vector<A> time_slice;
575 if (m_in_buffer.flush(time_slice,
start_time, end_time)) {
576 std::vector<OUT> out_vec;
578 process_slice(time_slice, out_vec);
579 while (out_vec.size()) {
581 if (!
m_parent.send(std::move(out_vec.back()))) {
A set of TPs or TAs in a given time window, defined by its start and end times.
std::string m_algorithm_name
std::shared_ptr< sink_t > m_output_queue
void do_work(std::atomic< bool > &m_running_flag)
TriggerGenericMaker(TriggerGenericMaker &&)=delete
dunedaq::utilities::WorkerThread m_thread
daqdataformats::timestamp_t m_window_time
std::chrono::milliseconds m_queue_timeout
std::atomic< metric_counter_type > m_received_count
TriggerGenericMaker(const std::string &name)
virtual std::unique_ptr< MAKER > make_maker(const nlohmann::json &obj)=0
TriggerGenericMaker & operator=(const TriggerGenericMaker &)=delete
void do_start(const nlohmann::json &startobj)
std::unique_ptr< MAKER > m_maker
daqdataformats::timestamp_t m_buffer_time
dfmessages::run_number_t m_run_number
virtual ~TriggerGenericMaker()
void set_windowing(daqdataformats::timestamp_t window_time, daqdataformats::timestamp_t buffer_time)
std::shared_ptr< source_t > m_input_queue
void get_info(opmonlib::InfoCollector &ci, int) override
decltype(triggergenericmakerinfo::Info::received_count) metric_counter_type
std::atomic< metric_counter_type > m_sent_count
void do_scrap(const nlohmann::json &obj)
void set_sourceid(uint32_t element_id)
TriggerGenericMaker & operator=(TriggerGenericMaker &&)=delete
void set_algorithm_name(const std::string &name)
void do_stop(const nlohmann::json &)
void do_configure(const nlohmann::json &obj)
TriggerGenericMaker(const TriggerGenericMaker &)=delete
TriggerGenericWorker< IN, OUT, MAKER > worker
nlohmann::json m_maker_conf
void process(Set< A > &in)
size_t get_low_level_input_count()
TriggerGenericMaker< Set< A >, OUT, MAKER > & m_parent
void process_slice(const std::vector< A > &time_slice, std::vector< OUT > &out_vec)
size_t m_low_level_input_count
TimeSliceInputBuffer< A > m_in_buffer
TriggerGenericWorker(TriggerGenericMaker< Set< A >, OUT, MAKER > &parent)
TimeSliceInputBuffer< A > m_in_buffer
TriggerGenericWorker(TriggerGenericMaker< Set< A >, Set< B >, MAKER > &parent)
void process_slice(const std::vector< A > &time_slice, std::vector< B > &out_vec)
size_t get_low_level_input_count()
size_t m_low_level_input_count
TimeSliceOutputBuffer< B > m_out_buffer
void process(Set< A > &in)
TriggerGenericMaker< Set< A >, Set< B >, MAKER > & m_parent
size_t m_low_level_input_count
TriggerGenericWorker(TriggerGenericMaker< IN, OUT, MAKER > &parent)
size_t get_low_level_input_count()
TriggerGenericMaker< IN, OUT, MAKER > & m_parent
#define TLOG_DEBUG(lvl,...)
daqdataformats::run_number_t run_number_t
Copy daqdataformats::run_number_t.
FELIX Initialization std::string initerror FELIX queue timed out
Cannot add TPSet with start_time
void warning(const Issue &issue)
void fatal(const Issue &issue)
void error(const Issue &issue)