DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType > Class Template Reference

#include <SNBDataHandlingModel.hpp>

Inheritance diagram for dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >:
[legend]
Collaboration diagram for dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >:
[legend]

Public Types

using RDT = ReadoutType
 
using RHT = RequestHandlerType
 
using LBT = LatencyBufferType
 
using RPT = RawDataProcessorType
 
using IDT = InputDataType
 
using timestamp_t = std::uint64_t
 
- Public Types inherited from dunedaq::opmonlib::MonitorableObject
using NodePtr = std::weak_ptr<MonitorableObject>
 
using NewNodePtr = std::shared_ptr<MonitorableObject>
 
using ElementId = std::string
 

Public Member Functions

 SNBDataHandlingModel (std::atomic< bool > &run_marker)
 
virtual ~SNBDataHandlingModel ()=default
 
void init (const appmodel::DataHandlerModule *modconf)
 Forward calls from the appfwk.
 
void conf (const appfwk::DAQModule::CommandData_t &args)
 
void scrap (const appfwk::DAQModule::CommandData_t &args)
 
void start (const appfwk::DAQModule::CommandData_t &args)
 
void stop (const appfwk::DAQModule::CommandData_t &args)
 
void record (const appfwk::DAQModule::CommandData_t &args) override
 
- Public Member Functions inherited from dunedaq::datahandlinglibs::DataHandlingConcept
 DataHandlingConcept ()
 
virtual ~DataHandlingConcept ()
 
 DataHandlingConcept (const DataHandlingConcept &)=delete
 DataHandlingConcept is not copy-constructible.
 
DataHandlingConceptoperator= (const DataHandlingConcept &)=delete
 DataHandlingConcept is not copy-assginable.
 
 DataHandlingConcept (DataHandlingConcept &&)=delete
 DataHandlingConcept is not move-constructible.
 
DataHandlingConceptoperator= (DataHandlingConcept &&)=delete
 DataHandlingConcept is not move-assignable.
 
- Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject (const MonitorableObject &)=delete
 
MonitorableObjectoperator= (const MonitorableObject &)=delete
 
 MonitorableObject (MonitorableObject &&)=delete
 
MonitorableObjectoperator= (MonitorableObject &&)=delete
 
virtual ~MonitorableObject ()=default
 
auto get_opmon_id () const noexcept
 
auto get_opmon_level () const noexcept
 

Public Attributes

std::function< void(IDT &&)> m_consume_callback
 

Static Public Attributes

static constexpr timestamp_t ns = 1
 
static constexpr timestamp_t us = 1000 * ns
 
static constexpr timestamp_t ms = 1000 * us
 
static constexpr timestamp_t s = 1000 * ms
 

Protected Types

using metric_t = dunedaq::datahandlinglibs::opmon::DataHandlerInfo
 
using num_payload_t = std::remove_const<std::invoke_result<decltype(&metric_t::num_payloads), metric_t>::type>::type
 
using sum_payload_t = std::remove_const<std::invoke_result<decltype(&metric_t::sum_payloads), metric_t>::type>::type
 
using num_request_t = std::remove_const<std::invoke_result<decltype(&metric_t::num_requests), metric_t>::type>::type
 
using sum_request_t = std::remove_const<std::invoke_result<decltype(&metric_t::sum_requests), metric_t>::type>::type
 
using rawq_timeout_count_t
 
using num_lb_insert_failures_t
 
using num_post_processing_delay_max_waits_t
 
using raw_receiver_ct = iomanager::ReceiverConcept<InputDataType>
 
using request_receiver_ct = iomanager::ReceiverConcept<dfmessages::DataRequest>
 
using timesync_sender_ct = iomanager::SenderConcept<dfmessages::TimeSync>
 

Protected Member Functions

void process_item (RDT &&payload)
 
void transform_and_process (IDT &&payload)
 
void consume_callback (IDT &&payload)
 
void run_consume ()
 Function that will be run in its own thread to read the raw packets from the connection and add them to the LB.
 
void run_timesync ()
 Function that will be run in its own thread and sends periodic timesync messages by pushing them to the connection.
 
void run_postprocess_scheduler ()
 
folly::coro::Task< void > postprocess_schedule ()
 
void dispatch_requests (dfmessages::DataRequest &data_request)
 
virtual std::vector< RDTtransform_payload (IDT &original) const
 
virtual void generate_opmon_data () override
 
- Protected Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject ()=default
 
void register_node (ElementId name, NewNodePtr)
 
void publish (google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
 

Protected Attributes

std::atomic< bool > & m_run_marker
 
bool m_callback_mode
 
bool m_fake_trigger
 
bool m_generate_timesync = false
 
int m_current_fake_trigger_id
 
daqdataformats::SourceID m_sourceid
 
daqdataformats::run_number_t m_run_number
 
uint64_t m_processing_delay_ticks
 
uint64_t m_post_processing_delay_min_wait
 
uint64_t m_post_processing_delay_max_wait
 
std::atomic< num_payload_tm_num_payloads { 0 }
 
std::atomic< sum_payload_tm_sum_payloads { 0 }
 
std::atomic< num_request_tm_num_requests { 0 }
 
std::atomic< sum_request_tm_sum_requests { 0 }
 
std::atomic< rawq_timeout_count_tm_rawq_timeout_count { 0 }
 
std::atomic< num_lb_insert_failures_tm_num_lb_insert_failures { 0 }
 
std::atomic< num_post_processing_delay_max_waits_tm_num_post_processing_delay_max_waits { 0 }
 
std::atomic< int > m_stats_packet_count { 0 }
 
utilities::ReusableThread m_consumer_thread
 
std::chrono::milliseconds m_raw_receiver_timeout_ms
 
std::chrono::microseconds m_raw_receiver_sleep_us
 
std::shared_ptr< raw_receiver_ctm_raw_data_receiver
 
std::string m_raw_data_receiver_connection_name
 
std::shared_ptr< request_receiver_ctm_data_request_receiver
 
std::shared_ptr< timesync_sender_ctm_timesync_sender
 
utilities::ReusableThread m_timesync_thread
 
std::string m_timesync_connection_name
 
utilities::ReusableThread m_postprocess_scheduler_thread
 
folly::coro::Baton m_baton
 
std::unique_ptr< folly::ThreadWheelTimekeeper > m_timekeeper
 
std::shared_ptr< LatencyBufferType > m_latency_buffer_impl
 
std::shared_ptr< RawDataProcessorType > m_raw_processor_impl
 
std::shared_ptr< RequestHandlerType > m_request_handler_impl
 
bool m_request_handler_supports_cutoff_timestamp
 
std::unique_ptr< datahandlinglibs::FrameErrorRegistrym_error_registry
 
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
 

Additional Inherited Members

- Static Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
static bool publishable_metric (OpMonLevel entry, OpMonLevel system) noexcept
 

Detailed Description

template<class ReadoutType, class RequestHandlerType, class LatencyBufferType, class RawDataProcessorType, class InputDataType = ReadoutType>
class dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >

Definition at line 71 of file SNBDataHandlingModel.hpp.

Member Typedef Documentation

◆ IDT

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
using dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::IDT = InputDataType

Definition at line 79 of file SNBDataHandlingModel.hpp.

◆ LBT

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
using dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::LBT = LatencyBufferType

Definition at line 77 of file SNBDataHandlingModel.hpp.

◆ metric_t

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
using dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::metric_t = dunedaq::datahandlinglibs::opmon::DataHandlerInfo
protected

Definition at line 182 of file SNBDataHandlingModel.hpp.

◆ num_lb_insert_failures_t

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
using dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::num_lb_insert_failures_t
protected
Initial value:
std::remove_const<std::invoke_result<decltype(&metric_t::num_lb_insert_failures), metric_t>::type>::type
dunedaq::datahandlinglibs::opmon::DataHandlerInfo metric_t

Definition at line 189 of file SNBDataHandlingModel.hpp.

◆ num_payload_t

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
using dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::num_payload_t = std::remove_const<std::invoke_result<decltype(&metric_t::num_payloads), metric_t>::type>::type
protected

Definition at line 183 of file SNBDataHandlingModel.hpp.

◆ num_post_processing_delay_max_waits_t

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
using dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::num_post_processing_delay_max_waits_t
protected
Initial value:
std::remove_const<
std::invoke_result<decltype(&metric_t::num_post_processing_delay_max_waits), metric_t>::type>::type

Definition at line 191 of file SNBDataHandlingModel.hpp.

◆ num_request_t

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
using dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::num_request_t = std::remove_const<std::invoke_result<decltype(&metric_t::num_requests), metric_t>::type>::type
protected

Definition at line 185 of file SNBDataHandlingModel.hpp.

◆ raw_receiver_ct

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
using dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::raw_receiver_ct = iomanager::ReceiverConcept<InputDataType>
protected

Definition at line 209 of file SNBDataHandlingModel.hpp.

◆ rawq_timeout_count_t

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
using dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::rawq_timeout_count_t
protected
Initial value:
std::remove_const<std::invoke_result<decltype(&metric_t::num_data_input_timeouts), metric_t>::type>::type

Definition at line 187 of file SNBDataHandlingModel.hpp.

◆ RDT

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
using dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::RDT = ReadoutType

Definition at line 75 of file SNBDataHandlingModel.hpp.

◆ request_receiver_ct

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
using dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::request_receiver_ct = iomanager::ReceiverConcept<dfmessages::DataRequest>
protected

Definition at line 214 of file SNBDataHandlingModel.hpp.

◆ RHT

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
using dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::RHT = RequestHandlerType

Definition at line 76 of file SNBDataHandlingModel.hpp.

◆ RPT

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
using dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::RPT = RawDataProcessorType

Definition at line 78 of file SNBDataHandlingModel.hpp.

◆ sum_payload_t

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
using dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::sum_payload_t = std::remove_const<std::invoke_result<decltype(&metric_t::sum_payloads), metric_t>::type>::type
protected

Definition at line 184 of file SNBDataHandlingModel.hpp.

◆ sum_request_t

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
using dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::sum_request_t = std::remove_const<std::invoke_result<decltype(&metric_t::sum_requests), metric_t>::type>::type
protected

Definition at line 186 of file SNBDataHandlingModel.hpp.

◆ timestamp_t

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
using dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::timestamp_t = std::uint64_t

Definition at line 82 of file SNBDataHandlingModel.hpp.

◆ timesync_sender_ct

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
using dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::timesync_sender_ct = iomanager::SenderConcept<dfmessages::TimeSync>
protected

Definition at line 223 of file SNBDataHandlingModel.hpp.

Constructor & Destructor Documentation

◆ SNBDataHandlingModel()

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::SNBDataHandlingModel ( std::atomic< bool > & run_marker)
inlineexplicit

Definition at line 89 of file SNBDataHandlingModel.hpp.

91 , m_callback_mode(false)
92 , m_fake_trigger(false)
97 , m_raw_data_receiver(nullptr)
99 , m_latency_buffer_impl(nullptr)
100 , m_raw_processor_impl(nullptr)
101 {
102 }
std::shared_ptr< raw_receiver_ct > m_raw_data_receiver
std::shared_ptr< RawDataProcessorType > m_raw_processor_impl
std::shared_ptr< LatencyBufferType > m_latency_buffer_impl
std::atomic< bool > run_marker
Global atomic for process lifetime.

◆ ~SNBDataHandlingModel()

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
virtual dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::~SNBDataHandlingModel ( )
virtualdefault

Member Function Documentation

◆ conf()

template<class RDT , class RHT , class LBT , class RPT , class IDT >
void dunedaq::snbmodules::SNBDataHandlingModel< RDT, RHT, LBT, RPT, IDT >::conf ( const appfwk::DAQModule::CommandData_t & args)
virtual

Implements dunedaq::datahandlinglibs::DataHandlingConcept.

Definition at line 102 of file SNBDataHandlingModel.hxx.

103{
104 // Register callbacks if operating in that mode.
105 if (m_callback_mode) {
106 // Configure and register consume callback
108 std::bind(&SNBDataHandlingModel<RDT, RHT, LBT, RPT, IDT>::consume_callback, this, std::placeholders::_1);
109
110 // Register callback
113 }
114
115 // Configure threads:
119 }
122 m_timekeeper = std::make_unique<folly::ThreadWheelTimekeeper>();
123 }
124}
static std::shared_ptr< DataMoveCallbackRegistry > get()
std::unique_ptr< folly::ThreadWheelTimekeeper > m_timekeeper
void set_name(const std::string &name, int tid)
ID_t id
Unique identifier of the source of the data.
Definition SourceID.hpp:74

◆ consume_callback()

template<class RDT , class RHT , class LBT , class RPT , class IDT >
void dunedaq::snbmodules::SNBDataHandlingModel< RDT, RHT, LBT, RPT, IDT >::consume_callback ( IDT && payload)
protected

Definition at line 244 of file SNBDataHandlingModel.hxx.

245{
246 transform_and_process(std::move(payload));
247}

◆ dispatch_requests()

template<class RDT , class RHT , class LBT , class RPT , class IDT >
void dunedaq::snbmodules::SNBDataHandlingModel< RDT, RHT, LBT, RPT, IDT >::dispatch_requests ( dfmessages::DataRequest & data_request)
protected

Definition at line 477 of file SNBDataHandlingModel.hxx.

478{
479 if (data_request.request_information.component != m_sourceid) {
481 datahandlinglibs::RequestSourceIDMismatch(ERS_HERE, m_sourceid, data_request.request_information.component));
482 return;
483 }
484 TLOG_DEBUG(TLVL_QUEUE_POP) << "Received DataRequest" << " for trig/seq_number " << data_request.trigger_number << "."
485 << data_request.sequence_number << ", runno " << data_request.run_number
486 << ", trig timestamp " << data_request.trigger_timestamp
487 << ", SourceID: " << data_request.request_information.component << ", window begin/end "
488 << data_request.request_information.window_begin << "/"
489 << data_request.request_information.window_end
490 << ", dest: " << data_request.data_destination;
491 m_request_handler_impl->issue_request(data_request);
494}
#define ERS_HERE
std::shared_ptr< RequestHandlerType > m_request_handler_impl
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
void error(const Issue &issue)
Definition ers.hpp:81

◆ generate_opmon_data()

template<class RDT , class RHT , class LBT , class RPT , class IDT >
void dunedaq::snbmodules::SNBDataHandlingModel< RDT, RHT, LBT, RPT, IDT >::generate_opmon_data ( )
overrideprotectedvirtual

Hook for customisable pubblication. The function can throw, exception will be caught by the monitoring thread

Reimplemented from dunedaq::opmonlib::MonitorableObject.

Definition at line 196 of file SNBDataHandlingModel.hxx.

197{
198 datahandlinglibs::opmon::DataHandlerInfo ri;
199 ri.set_sum_payloads(m_sum_payloads.load());
200 ri.set_num_payloads(m_num_payloads.exchange(0));
201
202 ri.set_num_data_input_timeouts(m_rawq_timeout_count.exchange(0));
203
204 auto now = std::chrono::high_resolution_clock::now();
205 int new_packets = m_stats_packet_count.exchange(0);
206 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
207 m_t0 = now;
208
209 // 08-May-2025, KAB: added a message to warn users when latency buffer inserts are failing.
210 int local_num_lb_insert_failures = m_num_lb_insert_failures.exchange(0);
211 if (local_num_lb_insert_failures != 0) {
212 ers::warning(datahandlinglibs::NonZeroLatencyBufferInsertFailures(
213 ERS_HERE, m_sourceid, local_num_lb_insert_failures, ri.num_payloads()));
214 }
215
216 ri.set_rate_payloads_consumed(new_packets / seconds / 1000.);
217 ri.set_num_lb_insert_failures(local_num_lb_insert_failures);
218 ri.set_sum_requests(m_sum_requests.load());
219 ri.set_num_requests(m_num_requests.exchange(0));
220 ri.set_num_post_processing_delay_max_waits(m_num_post_processing_delay_max_waits.exchange(0));
221 ri.set_last_daq_timestamp(m_raw_processor_impl->get_last_daq_time());
222 ri.set_newest_timestamp(m_raw_processor_impl->get_last_daq_time());
223 ri.set_oldest_timestamp(m_request_handler_impl->get_oldest_time());
224
225 this->publish(std::move(ri));
226}
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
std::atomic< num_lb_insert_failures_t > m_num_lb_insert_failures
std::atomic< rawq_timeout_count_t > m_rawq_timeout_count
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
std::atomic< num_post_processing_delay_max_waits_t > m_num_post_processing_delay_max_waits
static int64_t now()
void warning(const Issue &issue)
Definition ers.hpp:115

◆ init()

template<class RDT , class RHT , class LBT , class RPT , class IDT >
void dunedaq::snbmodules::SNBDataHandlingModel< RDT, RHT, LBT, RPT, IDT >::init ( const appmodel::DataHandlerModule * mcfg)
virtual

Forward calls from the appfwk.

Implements dunedaq::datahandlinglibs::DataHandlingConcept.

Definition at line 13 of file SNBDataHandlingModel.hxx.

14{
15 // Setup request queues
16 // setup_request_queues(mcfg);
17 try {
18 for (auto input : mcfg->get_inputs()) {
19 if (input->get_data_type() == "DataRequest") {
21 } else {
23 // Parse for prefix
24 std::string conn_name = input->UID();
25 const char delim = '_';
26 std::vector<std::string> words;
27 std::size_t start;
28 std::size_t end = 0;
29 while ((start = conn_name.find_first_not_of(delim, end)) != std::string::npos) {
30 end = conn_name.find(delim, start);
31 words.push_back(conn_name.substr(start, end - start));
32 }
33
34 TLOG_DEBUG(TLVL_WORK_STEPS) << "Initialize connection based on uid: " << m_raw_data_receiver_connection_name
35 << " front word: " << words.front();
36
37 std::string cb_prefix("cb");
38 if (words.front() == cb_prefix) {
39 m_callback_mode = true;
40 }
41
42 if (!m_callback_mode) {
44 m_raw_receiver_timeout_ms = std::chrono::milliseconds(input->get_recv_timeout_ms());
45 }
46 }
47 }
48 for (auto output : mcfg->get_outputs()) {
49 if (output->get_data_type() == "TimeSync") {
52 m_timesync_connection_name = output->UID();
53 break;
54 }
55 }
56 } catch (const ers::Issue& excpt) {
57 throw datahandlinglibs::ResourceQueueError(ERS_HERE, "raw_input or frag_output", "SNBDataHandlingModel", excpt);
58 }
59
60 // Raw input connection sensibility check
61 if (!m_callback_mode && m_raw_data_receiver == nullptr) {
62 ers::error(datahandlinglibs::ConfigurationError(ERS_HERE, m_sourceid, "Non callback mode, and receiver is unset!"));
63 }
64
65 // Instantiate functionalities
66 m_error_registry.reset(new datahandlinglibs::FrameErrorRegistry());
67 m_error_registry->set_ers_metadata("DLH of SourceID[" + std::to_string(mcfg->get_source_id()) + "] ");
68 m_latency_buffer_impl.reset(new LBT());
69 m_raw_processor_impl.reset(new RPT(m_error_registry, mcfg->get_post_processing_enabled()));
71
72 register_node(mcfg->get_module_configuration()->get_latency_buffer()->UID(), m_latency_buffer_impl);
73 register_node(mcfg->get_module_configuration()->get_data_processor()->UID(), m_raw_processor_impl);
74 register_node(mcfg->get_module_configuration()->get_request_handler()->UID(), m_request_handler_impl);
75
76 // m_request_handler_impl->init(args);
77 // m_raw_processor_impl->init(args);
79 m_fake_trigger = false;
80 m_raw_receiver_sleep_us = std::chrono::microseconds::zero();
81 m_sourceid.id = mcfg->get_source_id();
82 m_sourceid.subsystem = RDT::subsystem;
83 m_processing_delay_ticks = mcfg->get_module_configuration()->get_post_processing_delay_ticks();
84 m_post_processing_delay_min_wait = mcfg->get_module_configuration()->get_post_processing_delay_min_wait();
85 m_post_processing_delay_max_wait = mcfg->get_module_configuration()->get_post_processing_delay_max_wait();
86
87 // Configure implementations:
88 m_raw_processor_impl->conf(mcfg);
89 // Configure the latency buffer before the request handler so the request handler can check for alignment
90 // restrictions
91 try {
92 m_latency_buffer_impl->conf(mcfg->get_module_configuration()->get_latency_buffer());
93 } catch (const std::bad_alloc& be) {
95 datahandlinglibs::ConfigurationError(ERS_HERE, m_sourceid, "Latency Buffer can't be allocated with size!"));
96 }
97 m_request_handler_impl->conf(mcfg);
98}
void register_node(ElementId name, NewNodePtr)
void start(const appfwk::DAQModule::CommandData_t &args)
std::shared_ptr< timesync_sender_ct > m_timesync_sender
std::shared_ptr< request_receiver_ct > m_data_request_receiver
std::unique_ptr< datahandlinglibs::FrameErrorRegistry > m_error_registry
Base class for any user define issue.
Definition Issue.hpp:69
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
static std::shared_ptr< iomanager::ReceiverConcept< Datatype > > get_iom_receiver(iomanager::ConnectionId const &id)
Subsystem subsystem
The general subsystem of the source of the data.
Definition SourceID.hpp:69

◆ postprocess_schedule()

template<class RDT , class RHT , class LBT , class RPT , class IDT >
folly::coro::Task< void > dunedaq::snbmodules::SNBDataHandlingModel< RDT, RHT, LBT, RPT, IDT >::postprocess_schedule ( )
protected

Definition at line 326 of file SNBDataHandlingModel.hxx.

327{
328
329 // TLOG_DEBUG(TLVL_WORK_STEPS) << "Postprocess schedule coroutine started...";
330 timestamp_t newest_ts = 0;
331 timestamp_t end_win_ts = 0;
332 bool first_cycle = true;
333 auto last_post_proc_time = std::chrono::system_clock::now();
334 auto now = last_post_proc_time;
335 std::chrono::milliseconds milliseconds;
336 RDT processed_element;
337
338 // Deferral of the post processing, to allow elements being reordered in the LB
339 // Basically, find data older than a certain timestamp and process all data since the last post-processed element up
340 // to that value
341 while (m_run_marker.load()) {
342 try {
343 co_await folly::coro::timeout(
344 m_baton.operator co_await(), std::chrono::milliseconds{ m_post_processing_delay_max_wait }, m_timekeeper.get());
345 m_baton.reset();
346 } catch (const folly::FutureTimeout&) {
348 }
349
350 if (m_latency_buffer_impl->occupancy() == 0) {
351 continue;
352 }
353
354 now = std::chrono::system_clock::now();
355 milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(now - last_post_proc_time);
356
357 if (static_cast<uint64_t>(milliseconds.count()) <= m_post_processing_delay_min_wait) {
358 continue;
359 }
360
361 last_post_proc_time = now;
362
363 // Get the LB boundaries
364 auto tail = m_latency_buffer_impl->back();
365 newest_ts = tail->get_timestamp();
366
367 if (first_cycle) {
368 auto head = m_latency_buffer_impl->front();
369 processed_element.set_timestamp(head->get_timestamp());
370 first_cycle = false;
371 // TLOG() << "***** First pass post processing *****";
372 }
373
374 if (newest_ts - processed_element.get_timestamp() > m_processing_delay_ticks) {
375 end_win_ts = newest_ts - m_processing_delay_ticks;
376 auto start_iter = m_latency_buffer_impl->lower_bound(processed_element, false);
377 processed_element.set_timestamp(end_win_ts);
378 auto end_iter = m_latency_buffer_impl->lower_bound(processed_element, false);
379
380 for (auto it = start_iter; it != end_iter; ++it) {
381 m_raw_processor_impl->postprocess_item(&(*it));
385 }
386 }
387 }
388}

◆ process_item()

template<class RDT , class RHT , class LBT , class RPT , class IDT >
void dunedaq::snbmodules::SNBDataHandlingModel< RDT, RHT, LBT, RPT, IDT >::process_item ( RDT && payload)
protected

Definition at line 251 of file SNBDataHandlingModel.hxx.

252{
253 m_raw_processor_impl->preprocess_item(&payload);
255 int64_t diff1 = payload.get_timestamp() - m_request_handler_impl->get_cutoff_timestamp();
256 if (diff1 <= 0) {
257 // m_request_handler_impl->increment_tardy_tp_count();
258 ers::warning(datahandlinglibs::DataPacketArrivedTooLate(ERS_HERE,
261 payload.get_timestamp(),
262 m_request_handler_impl->get_cutoff_timestamp(),
263 diff1,
264 (static_cast<double>(diff1) / 62500.0)));
265 }
266 }
267 while (m_latency_buffer_impl->isFull()) {
268 std::this_thread::sleep_for(std::chrono::milliseconds(1));
269 }
270 if (!m_latency_buffer_impl->write(std::move(payload))) {
271 // TLOG_DEBUG(TLVL_TAKE_NOTE) << "***ERROR: Latency buffer insert failed! (Payload timestamp=" <<
272 // payload.get_timestamp() << ")";
274 return;
275 }
276
277 if (m_processing_delay_ticks == 0) {
278 m_raw_processor_impl->postprocess_item(m_latency_buffer_impl->back());
282 } else {
283 m_baton.post();
284 }
285}

◆ record()

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
void dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::record ( const appfwk::DAQModule::CommandData_t & args)
inlineoverridevirtual

Implements dunedaq::datahandlinglibs::DataHandlingConcept.

Definition at line 127 of file SNBDataHandlingModel.hpp.

127{ m_request_handler_impl->record(args); }

◆ run_consume()

template<class RDT , class RHT , class LBT , class RPT , class IDT >
void dunedaq::snbmodules::SNBDataHandlingModel< RDT, RHT, LBT, RPT, IDT >::run_consume ( )
protectedvirtual

Function that will be run in its own thread to read the raw packets from the connection and add them to the LB.

Implements dunedaq::datahandlinglibs::DataHandlingConcept.

Definition at line 296 of file SNBDataHandlingModel.hxx.

297{
298
299 TLOG_DEBUG(TLVL_WORK_STEPS) << "Consumer thread started...";
301 m_num_payloads = 0;
302 m_sum_payloads = 0;
305
306 while (m_run_marker.load()) {
307 // Try to acquire data
308
309 auto opt_payload = m_raw_data_receiver->try_receive(m_raw_receiver_timeout_ms);
310
311 if (opt_payload) {
312 IDT& payload = opt_payload.value();
313 transform_and_process(std::move(payload));
314 } else {
316 // Protection against a zero sleep becoming a yield
317 if (m_raw_receiver_sleep_us != std::chrono::microseconds::zero())
318 std::this_thread::sleep_for(m_raw_receiver_sleep_us);
319 }
320 }
321 TLOG_DEBUG(TLVL_WORK_STEPS) << "Consumer thread joins... ";
322}

◆ run_postprocess_scheduler()

template<class RDT , class RHT , class LBT , class RPT , class IDT >
void dunedaq::snbmodules::SNBDataHandlingModel< RDT, RHT, LBT, RPT, IDT >::run_postprocess_scheduler ( )
protected

Definition at line 289 of file SNBDataHandlingModel.hxx.

290{
291 folly::coro::blockingWait(postprocess_schedule());
292}

◆ run_timesync()

template<class RDT , class RHT , class LBT , class RPT , class IDT >
void dunedaq::snbmodules::SNBDataHandlingModel< RDT, RHT, LBT, RPT, IDT >::run_timesync ( )
protectedvirtual

Function that will be run in its own thread and sends periodic timesync messages by pushing them to the connection.

Implements dunedaq::datahandlinglibs::DataHandlingConcept.

Definition at line 392 of file SNBDataHandlingModel.hxx.

393{
394 TLOG_DEBUG(TLVL_WORK_STEPS) << "TimeSync thread started...";
395 m_num_requests = 0;
396 m_sum_requests = 0;
397 uint64_t msg_seqno = 0;
398 timestamp_t prev_timestamp = 0;
399 auto once_per_run = true;
400 size_t zero_timestamp_count = 0;
401 size_t duplicate_timestamp_count = 0;
402 size_t total_timestamp_count = 0;
403 while (m_run_marker.load()) {
404 try {
405 auto timesyncmsg = dfmessages::TimeSync(m_raw_processor_impl->get_last_daq_time());
406 ++total_timestamp_count;
407 // daq_time is zero for the first received timesync, and may
408 // be the same as the previous daq_time if the data has
409 // stopped flowing. In both cases we don't send the TimeSync
410 if (timesyncmsg.daq_time != 0 && timesyncmsg.daq_time != prev_timestamp) {
411 prev_timestamp = timesyncmsg.daq_time;
412 timesyncmsg.run_number = m_run_number;
413 timesyncmsg.sequence_number = ++msg_seqno;
414 timesyncmsg.source_id = m_sourceid.id;
415 TLOG_DEBUG(TLVL_TIME_SYNCS) << "New timesync: daq=" << timesyncmsg.daq_time
416 << " wall=" << timesyncmsg.system_time << " run=" << timesyncmsg.run_number
417 << " seqno=" << timesyncmsg.sequence_number
418 << " source_id=" << timesyncmsg.source_id;
419 try {
420 dfmessages::TimeSync timesyncmsg_copy(timesyncmsg);
421 m_timesync_sender->send(std::move(timesyncmsg_copy), std::chrono::milliseconds(500));
422 } catch (ers::Issue& excpt) {
424 datahandlinglibs::TimeSyncTransmissionFailed(ERS_HERE, m_sourceid, m_timesync_connection_name, excpt));
425 }
426
427 if (m_fake_trigger) {
428 dfmessages::DataRequest dr;
430 dr.trigger_number = m_current_fake_trigger_id;
431 dr.trigger_timestamp = timesyncmsg.daq_time > 500 * us ? timesyncmsg.daq_time - 500 * us : 0;
432 auto width = 300000;
433 uint offset = 100;
434 dr.request_information.window_begin = dr.trigger_timestamp > offset ? dr.trigger_timestamp - offset : 0;
435 dr.request_information.window_end = dr.request_information.window_begin + width;
436 dr.request_information.component = m_sourceid;
437 dr.data_destination = "data_fragments_q";
438 TLOG_DEBUG(TLVL_WORK_STEPS) << "Issuing fake trigger based on timesync. " << " ts=" << dr.trigger_timestamp
439 << " window_begin=" << dr.request_information.window_begin
440 << " window_end=" << dr.request_information.window_end;
441 m_request_handler_impl->issue_request(dr);
442
445 }
446 } else {
447 if (timesyncmsg.daq_time == 0) {
448 ++zero_timestamp_count;
449 }
450 if (timesyncmsg.daq_time == prev_timestamp) {
451 ++duplicate_timestamp_count;
452 }
453 if (once_per_run) {
454 TLOG() << "Timesync with DAQ time 0 won't be sent out as it's an invalid sync.";
455 once_per_run = false;
456 }
457 }
458 } catch (const iomanager::TimeoutExpired& excpt) {
459 // ++m_timesyncqueue_timeout;
460 }
461 // Split up the 100ms sleep into 10 sleeps of 10ms, so we respond to "stop" quicker
462 for (size_t i = 0; i < 10; ++i) {
463 std::this_thread::sleep_for(std::chrono::milliseconds(10));
464 if (!m_run_marker.load()) {
465 break;
466 }
467 }
468 }
469 once_per_run = true;
470 TLOG_DEBUG(TLVL_WORK_STEPS) << "TimeSync thread joins... (timestamp count, zero/same/total = "
471 << zero_timestamp_count << "/" << duplicate_timestamp_count << "/"
472 << total_timestamp_count << ")";
473}
double offset
#define TLOG(...)
Definition macro.hpp:22

◆ scrap()

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
void dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::scrap ( const appfwk::DAQModule::CommandData_t & args)
inlinevirtual

Implements dunedaq::datahandlinglibs::DataHandlingConcept.

Definition at line 113 of file SNBDataHandlingModel.hpp.

114 {
115 m_request_handler_impl->scrap(args);
116 m_latency_buffer_impl->scrap(args);
117 m_raw_processor_impl->scrap(args);
118 }

◆ start()

template<class RDT , class RHT , class LBT , class RPT , class IDT >
void dunedaq::snbmodules::SNBDataHandlingModel< RDT, RHT, LBT, RPT, IDT >::start ( const appfwk::DAQModule::CommandData_t & args)
virtual

Implements dunedaq::datahandlinglibs::DataHandlingConcept.

Definition at line 128 of file SNBDataHandlingModel.hxx.

129{
130 // Reset opmon variables
131 m_sum_payloads = 0;
132 m_num_payloads = 0;
133 m_sum_requests = 0;
134 m_num_requests = 0;
139
140 m_t0 = std::chrono::high_resolution_clock::now();
141
143
144 TLOG_DEBUG(TLVL_WORK_STEPS) << "Starting threads...";
145 m_raw_processor_impl->start(args);
146 m_request_handler_impl->start(args);
147 if (!m_callback_mode) {
149 }
152 }
155 this);
156 }
157 // Register callback to receive and dispatch data requests
158 m_data_request_receiver->add_callback(
159 std::bind(&SNBDataHandlingModel<RDT, RHT, LBT, RPT, IDT>::dispatch_requests, this, std::placeholders::_1));
160}
void dispatch_requests(dfmessages::DataRequest &data_request)
void run_timesync()
Function that will be run in its own thread and sends periodic timesync messages by pushing them to t...
void run_consume()
Function that will be run in its own thread to read the raw packets from the connection and add them ...
bool set_work(Function &&f, Args &&... args)
uint32_t run_number_t
Type used to represent run number.
Definition Types.hpp:20

◆ stop()

template<class RDT , class RHT , class LBT , class RPT , class IDT >
void dunedaq::snbmodules::SNBDataHandlingModel< RDT, RHT, LBT, RPT, IDT >::stop ( const appfwk::DAQModule::CommandData_t & args)
virtual

Implements dunedaq::datahandlinglibs::DataHandlingConcept.

Definition at line 164 of file SNBDataHandlingModel.hxx.

165{
166 TLOG_DEBUG(TLVL_WORK_STEPS) << "Stoppping threads...";
167
168 // Stop receiving data requests as first thing
169 m_data_request_receiver->remove_callback();
170 // Stop the other threads
171 m_request_handler_impl->stop(args);
174 std::this_thread::sleep_for(std::chrono::milliseconds(10));
175 }
176 }
177 if (!m_callback_mode) {
179 std::this_thread::sleep_for(std::chrono::milliseconds(10));
180 }
181 }
183 m_baton.post(); // In case the coroutine is still waiting when the consumer has stopped
185 std::this_thread::sleep_for(std::chrono::milliseconds(10));
186 }
187 }
188 TLOG_DEBUG(TLVL_WORK_STEPS) << "Flushing latency buffer with occupancy: " << m_latency_buffer_impl->occupancy();
189 m_latency_buffer_impl->flush();
190 m_raw_processor_impl->stop(args);
191 m_raw_processor_impl->reset_last_daq_time();
192}

◆ transform_and_process()

template<class RDT , class RHT , class LBT , class RPT , class IDT >
void dunedaq::snbmodules::SNBDataHandlingModel< RDT, RHT, LBT, RPT, IDT >::transform_and_process ( IDT && payload)
protected

Definition at line 230 of file SNBDataHandlingModel.hxx.

231{
232 if constexpr (std::is_same_v<IDT, RDT>) {
233 process_item(std::move(payload));
234 } else {
235 auto transformed = transform_payload(payload);
236 for (auto& i : transformed) {
237 process_item(std::move(i));
238 }
239 }
240}
virtual std::vector< RDT > transform_payload(IDT &original) const

◆ transform_payload()

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
virtual std::vector< RDT > dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::transform_payload ( IDT & original) const
inlineprotectedvirtual

Definition at line 161 of file SNBDataHandlingModel.hpp.

161{ return { reinterpret_cast<RDT&>(original) }; }

Member Data Documentation

◆ m_baton

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
folly::coro::Baton dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_baton
protected

Definition at line 230 of file SNBDataHandlingModel.hpp.

◆ m_callback_mode

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
bool dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_callback_mode
protected

Definition at line 171 of file SNBDataHandlingModel.hpp.

◆ m_consume_callback

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::function<void(IDT&&)> dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_consume_callback

Definition at line 133 of file SNBDataHandlingModel.hpp.

◆ m_consumer_thread

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
utilities::ReusableThread dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_consumer_thread
protected

Definition at line 204 of file SNBDataHandlingModel.hpp.

◆ m_current_fake_trigger_id

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
int dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_current_fake_trigger_id
protected

Definition at line 174 of file SNBDataHandlingModel.hpp.

◆ m_data_request_receiver

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::shared_ptr<request_receiver_ct> dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_data_request_receiver
protected

Definition at line 215 of file SNBDataHandlingModel.hpp.

◆ m_error_registry

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::unique_ptr<datahandlinglibs::FrameErrorRegistry> dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_error_registry
protected

Definition at line 244 of file SNBDataHandlingModel.hpp.

◆ m_fake_trigger

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
bool dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_fake_trigger
protected

Definition at line 172 of file SNBDataHandlingModel.hpp.

◆ m_generate_timesync

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
bool dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_generate_timesync = false
protected

Definition at line 173 of file SNBDataHandlingModel.hpp.

◆ m_latency_buffer_impl

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::shared_ptr<LatencyBufferType> dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_latency_buffer_impl
protected

Definition at line 234 of file SNBDataHandlingModel.hpp.

◆ m_num_lb_insert_failures

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::atomic<num_lb_insert_failures_t> dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_num_lb_insert_failures { 0 }
protected

Definition at line 199 of file SNBDataHandlingModel.hpp.

199{ 0 };

◆ m_num_payloads

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::atomic<num_payload_t> dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_num_payloads { 0 }
protected

Definition at line 194 of file SNBDataHandlingModel.hpp.

194{ 0 };

◆ m_num_post_processing_delay_max_waits

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::atomic<num_post_processing_delay_max_waits_t> dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_num_post_processing_delay_max_waits { 0 }
protected

Definition at line 200 of file SNBDataHandlingModel.hpp.

200{ 0 };

◆ m_num_requests

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::atomic<num_request_t> dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_num_requests { 0 }
protected

Definition at line 196 of file SNBDataHandlingModel.hpp.

196{ 0 };

◆ m_post_processing_delay_max_wait

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
uint64_t dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_post_processing_delay_max_wait
protected

Definition at line 179 of file SNBDataHandlingModel.hpp.

◆ m_post_processing_delay_min_wait

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
uint64_t dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_post_processing_delay_min_wait
protected

Definition at line 178 of file SNBDataHandlingModel.hpp.

◆ m_postprocess_scheduler_thread

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
utilities::ReusableThread dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_postprocess_scheduler_thread
protected

Definition at line 229 of file SNBDataHandlingModel.hpp.

◆ m_processing_delay_ticks

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
uint64_t dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_processing_delay_ticks
protected

Definition at line 177 of file SNBDataHandlingModel.hpp.

◆ m_raw_data_receiver

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::shared_ptr<raw_receiver_ct> dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_raw_data_receiver
protected

Definition at line 210 of file SNBDataHandlingModel.hpp.

◆ m_raw_data_receiver_connection_name

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::string dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_raw_data_receiver_connection_name
protected

Definition at line 211 of file SNBDataHandlingModel.hpp.

◆ m_raw_processor_impl

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::shared_ptr<RawDataProcessorType> dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_raw_processor_impl
protected

Definition at line 237 of file SNBDataHandlingModel.hpp.

◆ m_raw_receiver_sleep_us

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::chrono::microseconds dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_raw_receiver_sleep_us
protected

Definition at line 208 of file SNBDataHandlingModel.hpp.

◆ m_raw_receiver_timeout_ms

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::chrono::milliseconds dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_raw_receiver_timeout_ms
protected

Definition at line 207 of file SNBDataHandlingModel.hpp.

◆ m_rawq_timeout_count

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::atomic<rawq_timeout_count_t> dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_rawq_timeout_count { 0 }
protected

Definition at line 198 of file SNBDataHandlingModel.hpp.

198{ 0 };

◆ m_request_handler_impl

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::shared_ptr<RequestHandlerType> dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_request_handler_impl
protected

Definition at line 240 of file SNBDataHandlingModel.hpp.

◆ m_request_handler_supports_cutoff_timestamp

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
bool dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_request_handler_supports_cutoff_timestamp
protected

Definition at line 241 of file SNBDataHandlingModel.hpp.

◆ m_run_marker

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::atomic<bool>& dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_run_marker
protected

Definition at line 167 of file SNBDataHandlingModel.hpp.

◆ m_run_number

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
daqdataformats::run_number_t dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_run_number
protected

Definition at line 176 of file SNBDataHandlingModel.hpp.

◆ m_sourceid

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
daqdataformats::SourceID dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_sourceid
protected

Definition at line 175 of file SNBDataHandlingModel.hpp.

◆ m_stats_packet_count

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::atomic<int> dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_stats_packet_count { 0 }
protected

Definition at line 201 of file SNBDataHandlingModel.hpp.

201{ 0 };

◆ m_sum_payloads

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::atomic<sum_payload_t> dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_sum_payloads { 0 }
protected

Definition at line 195 of file SNBDataHandlingModel.hpp.

195{ 0 };

◆ m_sum_requests

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::atomic<sum_request_t> dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_sum_requests { 0 }
protected

Definition at line 197 of file SNBDataHandlingModel.hpp.

197{ 0 };

◆ m_t0

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::chrono::time_point<std::chrono::high_resolution_clock> dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_t0
protected

Definition at line 247 of file SNBDataHandlingModel.hpp.

◆ m_timekeeper

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::unique_ptr<folly::ThreadWheelTimekeeper> dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_timekeeper
protected

Definition at line 231 of file SNBDataHandlingModel.hpp.

◆ m_timesync_connection_name

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::string dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_timesync_connection_name
protected

Definition at line 226 of file SNBDataHandlingModel.hpp.

◆ m_timesync_sender

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::shared_ptr<timesync_sender_ct> dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_timesync_sender
protected

Definition at line 224 of file SNBDataHandlingModel.hpp.

◆ m_timesync_thread

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
utilities::ReusableThread dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::m_timesync_thread
protected

Definition at line 225 of file SNBDataHandlingModel.hpp.

◆ ms

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
timestamp_t dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::ms = 1000 * us
inlinestaticconstexpr

Definition at line 85 of file SNBDataHandlingModel.hpp.

◆ ns

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
timestamp_t dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::ns = 1
inlinestaticconstexpr

Definition at line 83 of file SNBDataHandlingModel.hpp.

◆ s

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
timestamp_t dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::s = 1000 * ms
inlinestaticconstexpr

Definition at line 86 of file SNBDataHandlingModel.hpp.

◆ us

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
timestamp_t dunedaq::snbmodules::SNBDataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::us = 1000 * ns
inlinestaticconstexpr

Definition at line 84 of file SNBDataHandlingModel.hpp.


The documentation for this class was generated from the following files: