3#include <folly/CancellationToken.h>
4#include <folly/coro/BlockingWait.h>
5#include <folly/coro/CurrentExecutor.h>
6#include <folly/coro/Timeout.h>
7#include <folly/futures/ThreadWheelTimekeeper.h>
14template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
29 if (input->get_data_type() ==
"DataRequest") {
41 if (output->get_data_type() ==
"TimeSync") {
49 throw ResourceQueueError(
ERS_HERE,
"raw_input or frag_output",
"DataHandlingModel", excpt);
84 "Delayed postprocessing (post_processing_delay_ticks > 0) requires a sorted buffer (SkipList). "
85 "Queue buffers (FixedRateQueue, BinarySearchQueue) expect in-order data and must use "
86 "post_processing_delay_ticks = 0."));
96 }
catch (
const std::bad_alloc& be) {
102template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
122 if (m_processing_delay_ticks) {
123 m_postprocess_scheduler_thread.set_name(
"pprocsched", m_sourceid.id);
124 m_timekeeper = std::make_unique<folly::ThreadWheelTimekeeper>();
128template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
142 m_t0 = std::chrono::high_resolution_clock::now();
164template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
176 std::this_thread::sleep_for(std::chrono::milliseconds(10));
181 std::this_thread::sleep_for(std::chrono::milliseconds(10));
187 std::this_thread::sleep_for(std::chrono::milliseconds(10));
196template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
206 auto now = std::chrono::high_resolution_clock::now();
208 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(
now -
m_t0).count() / 1000000.;
213 if (local_num_lb_insert_failures != 0) {
230template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
234 if constexpr (std::is_same_v<IDT, RDT>) {
238 for (
auto& i : transformed) {
244template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
251template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
263 payload.get_timestamp(),
266 (
static_cast<double>(diff1) / 62500.0)));
286template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
293template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
311 IDT& payload = opt_payload.value();
323template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
324folly::coro::Task<void>
337 const auto wait_data = [
this]() -> folly::coro::Task<void> {
340 auto token =
co_await folly::coro::co_current_cancellation_token;
341 folly::CancellationCallback cb(token, [
this] {
m_baton.post(); });
346 bool timeout =
false;
350 co_await folly::coro::timeout(
353 }
catch (
const folly::FutureTimeout&) {
363 if (
auto processed = sched_algo.
run(timeout); processed > 0) {
371template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
378 uint64_t msg_seqno = 0;
380 auto once_per_run =
true;
381 size_t zero_timestamp_count = 0;
382 size_t duplicate_timestamp_count = 0;
383 size_t total_timestamp_count = 0;
387 ++total_timestamp_count;
391 if (timesyncmsg.daq_time != 0 && timesyncmsg.daq_time != prev_timestamp) {
392 prev_timestamp = timesyncmsg.daq_time;
394 timesyncmsg.sequence_number = ++msg_seqno;
397 <<
" wall=" << timesyncmsg.system_time <<
" run=" << timesyncmsg.run_number
398 <<
" seqno=" << timesyncmsg.sequence_number
399 <<
" source_id=" << timesyncmsg.source_id;
402 m_timesync_sender->send(std::move(timesyncmsg_copy), std::chrono::milliseconds(500));
428 if (timesyncmsg.daq_time == 0) {
429 ++zero_timestamp_count;
431 if (timesyncmsg.daq_time == prev_timestamp) {
432 ++duplicate_timestamp_count;
435 TLOG() <<
"Timesync with DAQ time 0 won't be sent out as it's an invalid sync.";
436 once_per_run =
false;
439 }
catch (
const iomanager::TimeoutExpired& excpt) {
443 for (
size_t i = 0; i < 10; ++i) {
444 std::this_thread::sleep_for(std::chrono::milliseconds(10));
452 << zero_timestamp_count <<
"/" << duplicate_timestamp_count <<
"/"
453 << total_timestamp_count <<
")";
456template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
const dunedaq::appmodel::LatencyBuffer * get_latency_buffer() const
Get "latency_buffer" relationship value.
uint64_t get_post_processing_delay_ticks() const
Get "post_processing_delay_ticks" attribute value. Number of clock ticks by which post processing of i...
const dunedaq::appmodel::DataProcessor * get_data_processor() const
Get "data_processor" relationship value.
uint64_t get_post_processing_delay_max_wait() const
Get "post_processing_delay_max_wait" attribute value. Maximum wait time (ms) before post processing c...
uint64_t get_post_processing_delay_min_wait() const
Get "post_processing_delay_min_wait" attribute value. Minimum time (ms) between consecutive post proc...
const dunedaq::appmodel::RequestHandler * get_request_handler() const
Get "request_handler" relationship value.
const dunedaq::appmodel::DataHandlerConf * get_module_configuration() const
Get "module_configuration" relationship value.
const dunedaq::appmodel::DataMoveCallbackConf * get_raw_data_callback() const
Get "raw_data_callback" relationship value. Configuration for raw data callback.
uint32_t get_source_id() const
Get "source_id" attribute value.
bool get_post_processing_enabled() const
Get "post_processing_enabled" attribute value.
const std::string & UID() const noexcept
const std::vector< const dunedaq::confmodel::Connection * > & get_inputs() const
Get "inputs" relationship value. List of connections to/from this module.
const std::vector< const dunedaq::confmodel::Connection * > & get_outputs() const
Get "outputs" relationship value. Output connections from this module.
std::atomic< sum_request_t > m_sum_requests
daqdataformats::run_number_t m_run_number
std::atomic< num_lb_insert_failures_t > m_num_lb_insert_failures
std::chrono::microseconds m_raw_receiver_sleep_us
utilities::ReusableThread m_consumer_thread
std::unique_ptr< FrameErrorRegistry > m_error_registry
void consume_callback(IDT &&payload)
std::string m_timesync_connection_name
std::atomic< sum_payload_t > m_sum_payloads
folly::coro::Baton m_baton
std::uint64_t timestamp_t
std::shared_ptr< RequestHandlerType > m_request_handler_impl
void init(const appmodel::DataHandlerModule *modconf)
Forward calls from the appfwk.
uint64_t m_processing_delay_ticks
int m_current_fake_trigger_id
daqdataformats::SourceID m_sourceid
uint64_t m_post_processing_delay_min_wait
std::atomic< num_payload_t > m_num_payloads
std::shared_ptr< timesync_sender_ct > m_timesync_sender
void stop(const appfwk::DAQModule::CommandData_t &args)
virtual void generate_opmon_data() override
virtual std::vector< RDT > transform_payload(IDT &original) const
static constexpr timestamp_t us
void start(const appfwk::DAQModule::CommandData_t &args)
std::atomic< int > m_stats_packet_count
void run_postprocess_scheduler()
std::chrono::milliseconds m_raw_receiver_timeout_ms
utilities::ReusableThread m_timesync_thread
uint64_t m_post_processing_delay_max_wait
std::atomic< bool > & m_run_marker
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
const appmodel::DataMoveCallbackConf * m_raw_data_callback_conf
std::atomic< num_request_t > m_num_requests
std::shared_ptr< raw_receiver_ct > m_raw_data_receiver
void conf(const appfwk::DAQModule::CommandData_t &args)
void dispatch_requests(dfmessages::DataRequest &data_request)
void process_item(RDT &&payload)
void run_consume()
Function that will be run in its own thread to read the raw packets from the connection and add them ...
void run_timesync()
Function that will be run in its own thread and sends periodic timesync messages by pushing them to t...
std::function< void(IDT &&)> m_consume_callback
void transform_and_process(IDT &&payload)
std::unique_ptr< folly::Timekeeper > m_timekeeper
folly::coro::Task< void > postprocess_schedule()
std::string m_raw_data_receiver_connection_name
std::atomic< num_post_processing_delay_max_waits_t > m_num_post_processing_delay_max_waits
bool m_request_handler_supports_cutoff_timestamp
std::atomic< rawq_timeout_count_t > m_rawq_timeout_count
std::shared_ptr< LatencyBufferType > m_latency_buffer_impl
std::shared_ptr< request_receiver_ct > m_data_request_receiver
std::shared_ptr< RawDataProcessorType > m_raw_processor_impl
utilities::ReusableThread m_postprocess_scheduler_thread
static std::shared_ptr< DataMoveCallbackRegistry > get()
void set_sum_payloads(::uint64_t value)
void set_num_lb_insert_failures(::uint64_t value)
void set_rate_payloads_consumed(double value)
void set_sum_requests(::uint64_t value)
void set_num_post_processing_delay_max_waits(::uint64_t value)
void set_last_daq_timestamp(::uint64_t value)
void set_oldest_timestamp(::uint64_t value)
void set_num_data_input_timeouts(::uint64_t value)
::uint64_t num_payloads() const
void set_num_payloads(::uint64_t value)
void set_num_requests(::uint64_t value)
void set_newest_timestamp(::uint64_t value)
void register_node(ElementId name, NewNodePtr)
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
Base class for any user-defined issue.
#define TLOG_DEBUG(lvl,...)
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
SourceID[" << sourceid << "] Command daqdataformats::SourceID Readout Initialization std::string initerror Configuration std::string conferror Configuration std::string conferror TimeSyncTransmissionFailed
static std::shared_ptr< iomanager::ReceiverConcept< Datatype > > get_iom_receiver(iomanager::ConnectionId const &id)
void warning(const Issue &issue)
void error(const Issue &issue)
This message represents a request for data sent to a single component of the DAQ.
sequence_number_t sequence_number
Sequence Number of the request.
std::string data_destination
ComponentRequest request_information
trigger_number_t trigger_number
Trigger number the request corresponds to.
timestamp_t trigger_timestamp
Timestamp of trigger.
run_number_t run_number
The current run number.
A synthetic message used to ensure that all elements of a DAQ system are synchronized.