DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
SNBDataHandlingModel.hpp
Go to the documentation of this file.
1
9#ifndef SNBMODULES_INCLUDE_SNBMODULES_READOUT_SNBDATAHANDLINGMODEL_HPP_
10#define SNBMODULES_INCLUDE_SNBMODULES_READOUT_SNBDATAHANDLINGMODEL_HPP_
11
19
21
24#include "iomanager/Sender.hpp"
25
26#include "logging/Logging.hpp"
27
30
33
37
40
44
47
48#include <folly/coro/Baton.h>
49#include <folly/coro/Task.h>
50#include <folly/futures/ThreadWheelTimekeeper.h>
51
52#include <functional>
53#include <memory>
54#include <string>
55#include <utility>
56#include <vector>
57
62
63namespace dunedaq {
64namespace snbmodules {
65
66template<class ReadoutType,
67 class RequestHandlerType,
68 class LatencyBufferType,
69 class RawDataProcessorType,
70 class InputDataType = ReadoutType>
72{
73public:
74 // Using shorter typenames
75 using RDT = ReadoutType;
76 using RHT = RequestHandlerType;
77 using LBT = LatencyBufferType;
78 using RPT = RawDataProcessorType;
79 using IDT = InputDataType;
80
81 // Timestamp type (unsigned 64-bit) and unit multipliers defined relative to ns = 1:
   // us = 1000 ns, ms = 1000 us, s = 1000 ms.
82 using timestamp_t = std::uint64_t; // NOLINT(build/unsigned)
83 static inline constexpr timestamp_t ns = 1;
84 static inline constexpr timestamp_t us = 1000 * ns;
85 static inline constexpr timestamp_t ms = 1000 * us;
86 static inline constexpr timestamp_t s = 1000 * ms;
87
88 // Explicit constructor taking a reference to the run-marker atomic.
   // The initializers visible here start callback mode and fake trigger as false and the
   // raw data receiver, latency buffer and raw processor pointers as nullptr; the body is empty.
   // NOTE(review): listing lines 90, 93-96 and 98 are absent from this view, so further
   // initializers (presumably including m_run_marker) are not shown - confirm against the header.
89 explicit SNBDataHandlingModel(std::atomic<bool>& run_marker)
91 , m_callback_mode(false)
92 , m_fake_trigger(false)
97 , m_raw_data_receiver(nullptr)
99 , m_latency_buffer_impl(nullptr)
100 , m_raw_processor_impl(nullptr)
101 {
102 }
103
104 virtual ~SNBDataHandlingModel() = default;
105
106 // Initializes the readoutmodel and its internals
107 void init(const appmodel::DataHandlerModule* modconf);
108
109 // Configures the readoutmodel and its internals
110 void conf(const appfwk::DAQModule::CommandData_t& args);
111
112 // Unconfigures the readout model's internals: forwards the scrap command and its
    // arguments to the request handler, the latency buffer and the raw processor, in that order.
    // NOTE(review): the three pointers are dereferenced without null checks; presumably
    // init() has populated them before scrap() is called - confirm.
113 void scrap(const appfwk::DAQModule::CommandData_t& args)
114 {
115 m_request_handler_impl->scrap(args);
116 m_latency_buffer_impl->scrap(args);
117 m_raw_processor_impl->scrap(args);
118 }
119
120 // Starts readoutmodel's internals
121 void start(const appfwk::DAQModule::CommandData_t& args);
122
123 // Stops readoutmodel's internals
124 void stop(const appfwk::DAQModule::CommandData_t& args);
125
126 // Record function: forwards the command arguments unchanged to the request handler's
    // record(). Declared override, so it replaces a virtual from a base class that is not
    // visible in this listing. The request handler pointer is dereferenced without a null check.
127 void record(const appfwk::DAQModule::CommandData_t& args) override { m_request_handler_impl->record(args); }
128
129 // Opmon get_info call implementation
130 // void get_info(opmonlib::InfoCollector& ci, int level);
131
132 // Consume callback
133 std::function<void(IDT&&)> m_consume_callback;
134
135protected:
136 // Perform processing operations on payload
137 void process_item(RDT&& payload);
138
139 // Transform payload if needed, then perform processing
140 void transform_and_process(IDT&& payload);
141
142 // Raw data consume callback
143 void consume_callback(IDT&& payload);
144
145 // Raw data consumer's work function
146 void run_consume();
147
148 // Timesync thread's work function
149 void run_timesync();
150
151 // Postprocess scheduler thread's work function
153
154 // Postprocess schedule coroutine
155 folly::coro::Task<void> postprocess_schedule();
156
157 // Dispatch data request
158 void dispatch_requests(dfmessages::DataRequest& data_request);
159
160 // Transform input data type to readout type. This default implementation returns a vector
    // brace-initialized from `original` viewed as RDT through reinterpret_cast; it is virtual,
    // so derived classes can supply a real conversion.
    // NOTE(review): the cast looks valid only when IDT and RDT are the same type (the template
    // default) or layout-compatible - confirm for any instantiation where they differ.
161 virtual std::vector<RDT> transform_payload(IDT& original) const { return { reinterpret_cast<RDT&>(original) }; }
162
163 // Operational monitoring
164 virtual void generate_opmon_data() override;
165
166 // Constructor params
167 std::atomic<bool>& m_run_marker;
168
169 // CONFIGURATION
170 // appfwk::app::ModInit m_queue_config;
180
181 // STATS
183 using num_payload_t = std::remove_const<std::invoke_result<decltype(&metric_t::num_payloads), metric_t>::type>::type;
184 using sum_payload_t = std::remove_const<std::invoke_result<decltype(&metric_t::sum_payloads), metric_t>::type>::type;
185 using num_request_t = std::remove_const<std::invoke_result<decltype(&metric_t::num_requests), metric_t>::type>::type;
186 using sum_request_t = std::remove_const<std::invoke_result<decltype(&metric_t::sum_requests), metric_t>::type>::type;
188 std::remove_const<std::invoke_result<decltype(&metric_t::num_data_input_timeouts), metric_t>::type>::type;
190 std::remove_const<std::invoke_result<decltype(&metric_t::num_lb_insert_failures), metric_t>::type>::type;
191 using num_post_processing_delay_max_waits_t = std::remove_const<
192 std::invoke_result<decltype(&metric_t::num_post_processing_delay_max_waits), metric_t>::type>::type;
193
194 std::atomic<num_payload_t> m_num_payloads{ 0 };
195 std::atomic<sum_payload_t> m_sum_payloads{ 0 };
196 std::atomic<num_request_t> m_num_requests{ 0 };
197 std::atomic<sum_request_t> m_sum_requests{ 0 };
198 std::atomic<rawq_timeout_count_t> m_rawq_timeout_count{ 0 };
199 std::atomic<num_lb_insert_failures_t> m_num_lb_insert_failures{ 0 };
200 std::atomic<num_post_processing_delay_max_waits_t> m_num_post_processing_delay_max_waits{ 0 };
201 std::atomic<int> m_stats_packet_count{ 0 };
202
203 // CONSUMER
205
206 // RAW RECEIVER
207 std::chrono::milliseconds m_raw_receiver_timeout_ms;
208 std::chrono::microseconds m_raw_receiver_sleep_us;
210 std::shared_ptr<raw_receiver_ct> m_raw_data_receiver;
212
213 // REQUEST RECEIVERS
215 std::shared_ptr<request_receiver_ct> m_data_request_receiver;
216
217 // FRAGMENT SENDER
218 // std::chrono::milliseconds m_fragment_sender_timeout_ms;
219 // using fragment_sender_ct = iomanager::SenderConcept<std::pair<std::unique_ptr<daqdataformats::Fragment>,
220 // std::string>>; std::shared_ptr<fragment_sender_ct> m_fragment_sender;
221
222 // TIME-SYNC
224 std::shared_ptr<timesync_sender_ct> m_timesync_sender;
227
228 // POSTPROCESS SCHEDULER
230 folly::coro::Baton m_baton;
231 std::unique_ptr<folly::ThreadWheelTimekeeper> m_timekeeper;
232
233 // LATENCY BUFFER
234 std::shared_ptr<LatencyBufferType> m_latency_buffer_impl;
235
236 // RAW PROCESSING
237 std::shared_ptr<RawDataProcessorType> m_raw_processor_impl;
238
239 // REQUEST HANDLER
240 std::shared_ptr<RequestHandlerType> m_request_handler_impl;
242
243 // ERROR REGISTRY
244 std::unique_ptr<datahandlinglibs::FrameErrorRegistry> m_error_registry;
245
246 // RUN START T0
247 std::chrono::time_point<std::chrono::high_resolution_clock> m_t0;
248};
249
250} // namespace snbmodules
251} // namespace dunedaq
252
253// Declarations
255
256#endif // SNBMODULES_INCLUDE_SNBMODULES_READOUT_SNBDATAHANDLINGMODEL_HPP_
std::atomic< num_lb_insert_failures_t > m_num_lb_insert_failures
std::remove_const< std::invoke_result< decltype(&metric_t::num_lb_insert_failures), metric_t >::type >::type num_lb_insert_failures_t
void start(const appfwk::DAQModule::CommandData_t &args)
SNBDataHandlingModel(std::atomic< bool > &run_marker)
std::shared_ptr< raw_receiver_ct > m_raw_data_receiver
std::atomic< rawq_timeout_count_t > m_rawq_timeout_count
std::remove_const< std::invoke_result< decltype(&metric_t::num_post_processing_delay_max_waits), metric_t >::type >::type num_post_processing_delay_max_waits_t
std::shared_ptr< RawDataProcessorType > m_raw_processor_impl
virtual std::vector< RDT > transform_payload(IDT &original) const
void init(const appmodel::DataHandlerModule *modconf)
Forward calls from the appfwk.
std::shared_ptr< timesync_sender_ct > m_timesync_sender
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
std::shared_ptr< RequestHandlerType > m_request_handler_impl
std::remove_const< std::invoke_result< decltype(&metric_t::num_payloads), metric_t >::type >::type num_payload_t
void conf(const appfwk::DAQModule::CommandData_t &args)
void record(const appfwk::DAQModule::CommandData_t &args) override
std::unique_ptr< folly::ThreadWheelTimekeeper > m_timekeeper
void stop(const appfwk::DAQModule::CommandData_t &args)
std::remove_const< std::invoke_result< decltype(&metric_t::sum_requests), metric_t >::type >::type sum_request_t
std::shared_ptr< LatencyBufferType > m_latency_buffer_impl
void dispatch_requests(dfmessages::DataRequest &data_request)
std::remove_const< std::invoke_result< decltype(&metric_t::sum_payloads), metric_t >::type >::type sum_payload_t
std::atomic< num_post_processing_delay_max_waits_t > m_num_post_processing_delay_max_waits
std::remove_const< std::invoke_result< decltype(&metric_t::num_requests), metric_t >::type >::type num_request_t
std::shared_ptr< request_receiver_ct > m_data_request_receiver
std::remove_const< std::invoke_result< decltype(&metric_t::num_data_input_timeouts), metric_t >::type >::type rawq_timeout_count_t
std::unique_ptr< datahandlinglibs::FrameErrorRegistry > m_error_registry
void run_timesync()
Function that will be run in its own thread and sends periodic timesync messages by pushing them to the connection.
void run_consume()
Function that will be run in its own thread to read the raw packets from the connection and add them to the latency buffer.
void scrap(const appfwk::DAQModule::CommandData_t &args)
std::atomic< bool > run_marker
Global atomic for process lifetime.
uint32_t run_number_t
Type used to represent run number.
Definition Types.hpp:20
ReadoutType
Which type of readout to use for TriggerDecision and DataRequest.
Definition Types.hpp:57
Including Qt Headers.
SourceID is a generalized representation of the source of a piece of data in the DAQ....
Definition SourceID.hpp:32
This message represents a request for data sent to a single component of the DAQ.