9#ifndef SNBMODULES_INCLUDE_SNBMODULES_READOUT_SNBDATAHANDLINGMODEL_HPP_
10#define SNBMODULES_INCLUDE_SNBMODULES_READOUT_SNBDATAHANDLINGMODEL_HPP_
48#include <folly/coro/Baton.h>
49#include <folly/coro/Task.h>
50#include <folly/futures/ThreadWheelTimekeeper.h>
66template<
class ReadoutType,
class RequestHandlerType,
class LatencyBufferType,
class RawDataProcessorType,
class InputDataType = ReadoutType>
71 using RDT = ReadoutType;
72 using RHT = RequestHandlerType;
73 using LBT = LatencyBufferType;
74 using RPT = RawDataProcessorType;
75 using IDT = InputDataType;
106 void conf(
const appfwk::DAQModule::CommandData_t& args);
109 void scrap(
const appfwk::DAQModule::CommandData_t& args)
117 void start(
const appfwk::DAQModule::CommandData_t& args);
120 void stop(
const appfwk::DAQModule::CommandData_t& args);
123 void record(
const appfwk::DAQModule::CommandData_t& args)
override
163 return {
reinterpret_cast<RDT&
>(original) };
247 std::chrono::time_point<std::chrono::high_resolution_clock>
m_t0;
::uint64_t num_lb_insert_failures() const
::uint64_t num_requests() const
::uint64_t num_post_processing_delay_max_waits() const
::uint64_t num_data_input_timeouts() const
::uint64_t sum_requests() const
::uint64_t num_payloads() const
::uint64_t sum_payloads() const
std::function< void(IDT &&)> m_consume_callback
std::atomic< num_lb_insert_failures_t > m_num_lb_insert_failures
std::remove_const< std::invoke_result< decltype(&metric_t::num_payloads), metric_t >::type >::type num_payload_t
bool m_request_handler_supports_cutoff_timestamp
utilities::ReusableThread m_postprocess_scheduler_thread
void start(const appfwk::DAQModule::CommandData_t &args)
SNBDataHandlingModel(std::atomic< bool > &run_marker)
virtual ~SNBDataHandlingModel()=default
std::shared_ptr< raw_receiver_ct > m_raw_data_receiver
void process_item(RDT &&payload)
std::string m_raw_data_receiver_connection_name
std::atomic< rawq_timeout_count_t > m_rawq_timeout_count
std::atomic< int > m_stats_packet_count
std::shared_ptr< RawDataProcessorType > m_raw_processor_impl
static constexpr timestamp_t ms
std::remove_const< std::invoke_result< decltype(&metric_t::num_post_processing_delay_max_waits), metric_t >::type >::type num_post_processing_delay_max_waits_t
virtual std::vector< RDT > transform_payload(IDT &original) const
std::remove_const< std::invoke_result< decltype(&metric_t::sum_requests), metric_t >::type >::type sum_request_t
uint64_t m_processing_delay_ticks
std::atomic< sum_request_t > m_sum_requests
void init(const appmodel::DataHandlerModule *modconf)
Forward calls from the appfwk.
virtual void generate_opmon_data() override
std::uint64_t timestamp_t
uint64_t m_post_processing_delay_min_wait
utilities::ReusableThread m_consumer_thread
std::shared_ptr< timesync_sender_ct > m_timesync_sender
std::chrono::milliseconds m_raw_receiver_timeout_ms
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
std::chrono::microseconds m_raw_receiver_sleep_us
std::shared_ptr< RequestHandlerType > m_request_handler_impl
folly::coro::Task< void > postprocess_schedule()
void transform_and_process(IDT &&payload)
daqdataformats::SourceID m_sourceid
void conf(const appfwk::DAQModule::CommandData_t &args)
void record(const appfwk::DAQModule::CommandData_t &args) override
static constexpr timestamp_t ns
std::unique_ptr< folly::ThreadWheelTimekeeper > m_timekeeper
void stop(const appfwk::DAQModule::CommandData_t &args)
std::atomic< num_payload_t > m_num_payloads
std::atomic< sum_payload_t > m_sum_payloads
static constexpr timestamp_t s
utilities::ReusableThread m_timesync_thread
std::remove_const< std::invoke_result< decltype(&metric_t::num_lb_insert_failures), metric_t >::type >::type num_lb_insert_failures_t
std::shared_ptr< LatencyBufferType > m_latency_buffer_impl
folly::coro::Baton m_baton
void dispatch_requests(dfmessages::DataRequest &data_request)
std::atomic< bool > & m_run_marker
int m_current_fake_trigger_id
std::atomic< num_post_processing_delay_max_waits_t > m_num_post_processing_delay_max_waits
std::shared_ptr< request_receiver_ct > m_data_request_receiver
uint64_t m_post_processing_delay_max_wait
std::remove_const< std::invoke_result< decltype(&metric_t::sum_payloads), metric_t >::type >::type sum_payload_t
std::atomic< num_request_t > m_num_requests
std::remove_const< std::invoke_result< decltype(&metric_t::num_data_input_timeouts), metric_t >::type >::type rawq_timeout_count_t
static constexpr timestamp_t us
void run_postprocess_scheduler()
std::unique_ptr< datahandlinglibs::FrameErrorRegistry > m_error_registry
void run_timesync()
Function that will be run in its own thread and sends periodic timesync messages by pushing them to t...
void consume_callback(IDT &&payload)
void run_consume()
Function that will be run in its own thread to read the raw packets from the connection and add them ...
std::string m_timesync_connection_name
void scrap(const appfwk::DAQModule::CommandData_t &args)
std::remove_const< std::invoke_result< decltype(&metric_t::num_requests), metric_t >::type >::type num_request_t
daqdataformats::run_number_t m_run_number
std::atomic< bool > run_marker
Global atomic for process lifetime.
This message represents a request for data sent to a single component of the DAQ.