9#ifndef DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_READOUTMODEL_HPP_
10#define DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_READOUTMODEL_HPP_
48#include <folly/coro/Baton.h>
49#include <folly/coro/Task.h>
50#include <folly/futures/Future.h>
65namespace datahandlinglibs {
67template<
class ReadoutType,
class RequestHandlerType,
class LatencyBufferType,
class RawDataProcessorType,
class InputDataType = ReadoutType>
72 using RDT = ReadoutType;
73 using RHT = RequestHandlerType;
74 using LBT = LatencyBufferType;
75 using RPT = RawDataProcessorType;
76 using IDT = InputDataType;
107 void conf(
const appfwk::DAQModule::CommandData_t& args);
110 void scrap(
const appfwk::DAQModule::CommandData_t& args)
118 void start(
const appfwk::DAQModule::CommandData_t& args);
121 void stop(
const appfwk::DAQModule::CommandData_t& args);
124 void record(
const appfwk::DAQModule::CommandData_t& args)
override
140 RawDataProcessorType& raw_processor_impl,
141 uint64_t processing_delay_ticks,
142 uint64_t post_processing_delay_min_wait,
143 uint64_t post_processing_delay_max_wait)
160 int processed = this->
do_run(timeout);
176 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Nothing to postprocess (empty buffer)";
184 TLOG() <<
"***** First pass post processing *****";
189 auto newest_ts = tail->get_timestamp();
192 std::chrono::time_point<std::chrono::system_clock>
now{ std::chrono::system_clock::now() };
198 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Nothing to postprocess (at or past cap)";
206 end_win_ts = std::min(end_win_ts, newest_ts + 1);
211 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Nothing to postprocess (data arrived too late, will be ignored)";
221 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Not ready to postprocess (m_processing_delay_ticks is greater)";
225 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Not ready to postprocess (too fast)";
241 if (!start_iter.good()) {
242 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Nothing to postprocess (!start_iter.good())";
246 if (start_iter == end_iter) {
247 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Nothing to postprocess (start_iter == end_iter)";
252 for (
auto it = start_iter; it != end_iter; ++it) {
256 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Invalid iterator in postprocessing loop";
308 return {
reinterpret_cast<RDT&
>(original) };
398 std::chrono::time_point<std::chrono::high_resolution_clock>
m_t0;
LatencyBufferType & m_latency_buffer_impl
const uint64_t m_post_processing_delay_max_wait
const uint64_t m_processing_delay_ticks
PostprocessScheduleAlgorithm(LatencyBufferType &latency_buffer_impl, RawDataProcessorType &raw_processor_impl, uint64_t processing_delay_ticks, uint64_t post_processing_delay_min_wait, uint64_t post_processing_delay_max_wait)
const timestamp_t m_max_wait_in_ticks
std::chrono::time_point< std::chrono::system_clock > m_last_post_proc_time
RawDataProcessorType & m_raw_processor_impl
const uint64_t m_post_processing_delay_min_wait
int m_consecutive_timeouts
std::atomic< sum_request_t > m_sum_requests
daqdataformats::run_number_t m_run_number
std::atomic< num_lb_insert_failures_t > m_num_lb_insert_failures
std::chrono::microseconds m_raw_receiver_sleep_us
utilities::ReusableThread m_consumer_thread
std::unique_ptr< FrameErrorRegistry > m_error_registry
void consume_callback(IDT &&payload)
DataHandlingModel(std::atomic< bool > &run_marker)
void record(const appfwk::DAQModule::CommandData_t &args) override
std::string m_timesync_connection_name
std::atomic< sum_payload_t > m_sum_payloads
folly::coro::Baton m_baton
std::uint64_t timestamp_t
std::shared_ptr< RequestHandlerType > m_request_handler_impl
void init(const appmodel::DataHandlerModule *modconf)
Forward calls from the appfwk.
uint64_t m_processing_delay_ticks
int m_current_fake_trigger_id
std::remove_const< std::invoke_result< decltype(&metric_t::num_post_processing_delay_max_waits), metric_t >::type >::type num_post_processing_delay_max_waits_t
daqdataformats::SourceID m_sourceid
static constexpr timestamp_t ms
uint64_t m_post_processing_delay_min_wait
std::atomic< num_payload_t > m_num_payloads
std::shared_ptr< timesync_sender_ct > m_timesync_sender
void stop(const appfwk::DAQModule::CommandData_t &args)
virtual void generate_opmon_data() override
virtual std::vector< RDT > transform_payload(IDT &original) const
static constexpr timestamp_t us
void start(const appfwk::DAQModule::CommandData_t &args)
virtual void invoke_postprocess_schedule_timeout_policy() const
std::remove_const< std::invoke_result< decltype(&metric_t::sum_payloads), metric_t >::type >::type sum_payload_t
void scrap(const appfwk::DAQModule::CommandData_t &args)
std::atomic< int > m_stats_packet_count
void run_postprocess_scheduler()
std::chrono::milliseconds m_raw_receiver_timeout_ms
std::remove_const< std::invoke_result< decltype(&metric_t::num_requests), metric_t >::type >::type num_request_t
std::remove_const< std::invoke_result< decltype(&metric_t::sum_requests), metric_t >::type >::type sum_request_t
utilities::ReusableThread m_timesync_thread
uint64_t m_post_processing_delay_max_wait
std::atomic< bool > & m_run_marker
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
std::atomic< num_request_t > m_num_requests
std::shared_ptr< raw_receiver_ct > m_raw_data_receiver
void conf(const appfwk::DAQModule::CommandData_t &args)
void dispatch_requests(dfmessages::DataRequest &data_request)
void process_item(RDT &&payload)
std::remove_const< std::invoke_result< decltype(&metric_t::num_data_input_timeouts), metric_t >::type >::type rawq_timeout_count_t
void run_consume()
Function that will be run in its own thread to read the raw packets from the connection and add them ...
void run_timesync()
Function that will be run in its own thread and sends periodic timesync messages by pushing them to t...
std::remove_const< std::invoke_result< decltype(&metric_t::num_payloads), metric_t >::type >::type num_payload_t
std::function< void(IDT &&)> m_consume_callback
void transform_and_process(IDT &&payload)
std::unique_ptr< folly::Timekeeper > m_timekeeper
folly::coro::Task< void > postprocess_schedule()
std::string m_raw_data_receiver_connection_name
std::atomic< num_post_processing_delay_max_waits_t > m_num_post_processing_delay_max_waits
bool m_request_handler_supports_cutoff_timestamp
std::remove_const< std::invoke_result< decltype(&metric_t::num_lb_insert_failures), metric_t >::type >::type num_lb_insert_failures_t
static constexpr timestamp_t s
std::atomic< rawq_timeout_count_t > m_rawq_timeout_count
std::shared_ptr< LatencyBufferType > m_latency_buffer_impl
std::shared_ptr< request_receiver_ct > m_data_request_receiver
std::shared_ptr< RawDataProcessorType > m_raw_processor_impl
static constexpr timestamp_t ns
utilities::ReusableThread m_postprocess_scheduler_thread
virtual ~DataHandlingModel()=default
::uint64_t num_lb_insert_failures() const
::uint64_t num_requests() const
::uint64_t num_post_processing_delay_max_waits() const
::uint64_t num_data_input_timeouts() const
::uint64_t sum_requests() const
::uint64_t num_payloads() const
::uint64_t sum_payloads() const
std::atomic< bool > run_marker
Global atomic for process lifetime.
#define TLOG_DEBUG(lvl,...)
This message represents a request for data sent to a single component of the DAQ.