9#ifndef DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_READOUTMODEL_HPP_
10#define DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_READOUTMODEL_HPP_
60namespace datahandlinglibs {
62template<
class ReadoutType,
class RequestHandlerType,
class LatencyBufferType,
class RawDataProcessorType,
class InputDataType = ReadoutType>
67 using RDT = ReadoutType;
68 using RHT = RequestHandlerType;
69 using LBT = LatencyBufferType;
70 using RPT = RawDataProcessorType;
71 using IDT = InputDataType;
103 void conf(
const nlohmann::json& args);
106 void scrap(
const nlohmann::json& args)
114 void start(
const nlohmann::json& args);
117 void stop(
const nlohmann::json& args);
120 void record(
const nlohmann::json& args)
override
151 return {
reinterpret_cast<RDT&
>(original) };
227 std::chrono::time_point<std::chrono::high_resolution_clock>
m_t0;
std::atomic< sum_request_t > m_sum_requests
daqdataformats::run_number_t m_run_number
void conf(const nlohmann::json &args)
std::atomic< num_lb_insert_failures_t > m_num_lb_insert_failures
std::chrono::microseconds m_raw_receiver_sleep_us
utilities::ReusableThread m_consumer_thread
std::unique_ptr< FrameErrorRegistry > m_error_registry
DataHandlingModel(std::atomic< bool > &run_marker)
std::string m_timesync_connection_name
std::atomic< sum_payload_t > m_sum_payloads
std::uint64_t timestamp_t
std::shared_ptr< RequestHandlerType > m_request_handler_impl
void stop(const nlohmann::json &args)
void init(const appmodel::DataHandlerModule *modconf)
Forward calls from the appfwk.
uint64_t m_processing_delay_ticks
int m_current_fake_trigger_id
daqdataformats::SourceID m_sourceid
static constexpr timestamp_t ms
void process_item(RDT &payload)
std::atomic< num_payload_t > m_num_payloads
std::function< void(RDT &&)> m_consume_callback
std::shared_ptr< timesync_sender_ct > m_timesync_sender
virtual void generate_opmon_data() override
virtual std::vector< RDT > transform_payload(IDT &original) const
static constexpr timestamp_t us
std::remove_const< std::invoke_result< decltype(&metric_t::sum_payloads), metric_t >::type >::type sum_payload_t
std::atomic< int > m_stats_packet_count
void consume_payload(RDT &&payload)
void record(const nlohmann::json &args) override
std::chrono::milliseconds m_raw_receiver_timeout_ms
std::remove_const< std::invoke_result< decltype(&metric_t::num_requests), metric_t >::type >::type num_request_t
std::remove_const< std::invoke_result< decltype(&metric_t::sum_requests), metric_t >::type >::type sum_request_t
utilities::ReusableThread m_timesync_thread
void scrap(const nlohmann::json &args)
std::atomic< bool > & m_run_marker
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
std::atomic< num_request_t > m_num_requests
std::shared_ptr< raw_receiver_ct > m_raw_data_receiver
void dispatch_requests(dfmessages::DataRequest &data_request)
void start(const nlohmann::json &args)
std::remove_const< std::invoke_result< decltype(&metric_t::num_data_input_timeouts), metric_t >::type >::type rawq_timeout_count_t
void run_consume()
Function that will be run in its own thread to read the raw packets from the connection and add them ...
void run_timesync()
Function that will be run in its own thread and sends periodic timesync messages by pushing them to t...
std::remove_const< std::invoke_result< decltype(&metric_t::num_payloads), metric_t >::type >::type num_payload_t
std::string m_raw_data_receiver_connection_name
bool m_request_handler_supports_cutoff_timestamp
std::remove_const< std::invoke_result< decltype(&metric_t::num_lb_insert_failures), metric_t >::type >::type num_lb_insert_failures_t
static constexpr timestamp_t s
std::atomic< rawq_timeout_count_t > m_rawq_timeout_count
std::shared_ptr< LatencyBufferType > m_latency_buffer_impl
uint32_t m_pid_of_current_process
std::shared_ptr< request_receiver_ct > m_data_request_receiver
std::shared_ptr< RawDataProcessorType > m_raw_processor_impl
static constexpr timestamp_t ns
virtual ~DataHandlingModel()=default
::uint64_t num_lb_insert_failures() const
::uint64_t num_requests() const
::uint64_t num_data_input_timeouts() const
::uint64_t sum_requests() const
::uint64_t num_payloads() const
::uint64_t sum_payloads() const
std::atomic< bool > run_marker
Global atomic for process lifetime.
This message represents a request for data sent to a single component of the DAQ.