9#ifndef TRIGGER_SRC_TRIGGER_TRIGGERGENERICMAKER_HPP_
10#define TRIGGER_SRC_TRIGGER_TRIGGERGENERICMAKER_HPP_
14#include "trigger/TimeSliceInputBuffer.hpp"
15#include "trigger/TimeSliceOutputBuffer.hpp"
16#include "trigger/triggergenericmakerinfo/InfoNljs.hpp"
36template<
class IN,
class OUT,
class MAKER>
44template<
class IN,
class OUT,
class MAKER>
85 void get_info(opmonlib::InfoCollector& ci,
int )
override
87 triggergenericmakerinfo::Info i;
91 if (
m_maker) { i.data_vs_system_ms =
m_maker->m_data_vs_system_time; }
92 else i.data_vs_system_ms = 0;
151 m_thread.start_working_thread(get_name());
181 void do_work(std::atomic<bool>& m_running_flag)
184 while (m_running_flag.load()) {
189 if (m_running_flag.load()) {
204 <<
" inputs (" <<
worker.get_low_level_input_count() <<
" sub-inputs) and successfully sent " <<
m_sent_count
213 }
catch (
const dunedaq::iomanager::TimeoutExpired& excpt) {
226 }
catch (
const dunedaq::iomanager::TimeoutExpired& excpt) {
243template<
class IN,
class OUT,
class MAKER>
262 std::vector<OUT> out_vec;
264 m_parent.m_maker->operator()(in, out_vec);
271 while (out_vec.size()) {
272 if (!
m_parent.send(std::move(out_vec.back()))) {
288template<
class A,
class B,
class MAKER>
294 ,
m_in_buffer(parent.get_name(), parent.m_algorithm_name)
295 ,
m_out_buffer(parent.get_name(), parent.m_algorithm_name, parent.m_buffer_time)
319 void process_slice(
const std::vector<A>& time_slice, std::vector<B>& out_vec)
323 for (
const A& x : time_slice) {
325 m_parent.m_maker->operator()(x, out_vec);
336 std::vector<B> elems;
343 std::vector<A> time_slice;
359 std::vector<A> time_slice;
397 if (elems.size() > 0) {
401 size_t n_output_windows=0;
412 TLOG_DEBUG(4) <<
"Sending heartbeat with start time " <<
out.start_time;
420 TLOG_DEBUG(4) <<
"Output set window ready with start time " <<
out.start_time <<
" end time " <<
out.end_time
421 <<
" and " <<
out.objects.size() <<
" members";
428 TLOG_DEBUG(4) <<
"process() done. Advanced output buffer by " << n_output_windows <<
" output windows";
435 std::vector<A> time_slice;
438 std::vector<B> elems;
441 if (elems.size() > 0) {
464 TLOG_DEBUG(1) <<
"Output set window ready with start time " <<
out.start_time <<
" end time " <<
out.end_time
465 <<
" and " <<
out.objects.size() <<
" members";
482template<
class A,
class OUT,
class MAKER>
488 ,
m_in_buffer(parent.get_name(), parent.m_algorithm_name)
502 void process_slice(
const std::vector<A>& time_slice, std::vector<OUT>& out_vec)
506 for (
const A& x : time_slice) {
508 m_parent.m_maker->operator()(x, out_vec);
519 std::vector<OUT> out_vec;
522 std::vector<A> time_slice;
538 std::vector<A> time_slice;
560 while (out_vec.size()) {
561 if (!
m_parent.send(std::move(out_vec.back()))) {
573 std::vector<A> time_slice;
576 std::vector<OUT> out_vec;
579 while (out_vec.size()) {
581 if (!
m_parent.send(std::move(out_vec.back()))) {
A set of TPs or TAs in a given time window, defined by its start and end times.
void do_configure(const CommandData_t &obj)
std::string m_algorithm_name
std::shared_ptr< sink_t > m_output_queue
dunedaq::iomanager::SenderConcept< OUT > sink_t
void do_work(std::atomic< bool > &m_running_flag)
TriggerGenericMaker(TriggerGenericMaker &&)=delete
void do_scrap(const CommandData_t &obj)
dunedaq::utilities::WorkerThread m_thread
void do_stop(const CommandData_t &)
daqdataformats::timestamp_t m_window_time
std::chrono::milliseconds m_queue_timeout
std::atomic< metric_counter_type > m_received_count
TriggerGenericMaker(const std::string &name)
virtual std::unique_ptr< MAKER > make_maker(const nlohmann::json &obj)=0
TriggerGenericMaker & operator=(const TriggerGenericMaker &)=delete
std::unique_ptr< MAKER > m_maker
daqdataformats::timestamp_t m_buffer_time
void do_start(const CommandData_t &startobj)
dfmessages::run_number_t m_run_number
virtual ~TriggerGenericMaker()
void set_windowing(daqdataformats::timestamp_t window_time, daqdataformats::timestamp_t buffer_time)
std::shared_ptr< source_t > m_input_queue
void get_info(opmonlib::InfoCollector &ci, int) override
decltype(triggergenericmakerinfo::Info::received_count) metric_counter_type
std::atomic< metric_counter_type > m_sent_count
void set_sourceid(uint32_t element_id)
TriggerGenericMaker & operator=(TriggerGenericMaker &&)=delete
void set_algorithm_name(const std::string &name)
dunedaq::iomanager::ReceiverConcept< IN > source_t
TriggerGenericMaker(const TriggerGenericMaker &)=delete
TriggerGenericWorker< IN, OUT, MAKER > worker
nlohmann::json m_maker_conf
void process(Set< A > &in)
size_t get_low_level_input_count()
TriggerGenericMaker< Set< A >, OUT, MAKER > & m_parent
void process_slice(const std::vector< A > &time_slice, std::vector< OUT > &out_vec)
size_t m_low_level_input_count
TimeSliceInputBuffer< A > m_in_buffer
TriggerGenericWorker(TriggerGenericMaker< Set< A >, OUT, MAKER > &parent)
TimeSliceInputBuffer< A > m_in_buffer
daqdataformats::timestamp_t m_prev_start_time
TriggerGenericWorker(TriggerGenericMaker< Set< A >, Set< B >, MAKER > &parent)
void process_slice(const std::vector< A > &time_slice, std::vector< B > &out_vec)
size_t get_low_level_input_count()
size_t m_low_level_input_count
TimeSliceOutputBuffer< B > m_out_buffer
void process(Set< A > &in)
TriggerGenericMaker< Set< A >, Set< B >, MAKER > & m_parent
size_t m_low_level_input_count
TriggerGenericWorker(TriggerGenericMaker< IN, OUT, MAKER > &parent)
size_t get_low_level_input_count()
TriggerGenericMaker< IN, OUT, MAKER > & m_parent
WorkerThread contains a thread which runs the do_work() function.
#define TLOG_DEBUG(lvl,...)
daqdataformats::run_number_t run_number_t
Copy daqdataformats::run_number_t.
FELIX Initialization std::string initerror FELIX queue timed out
Cannot add TPSet with start_time
void warning(const Issue &issue)
void fatal(const Issue &issue)
void error(const Issue &issue)