DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
DataHandlingModel.hpp
Go to the documentation of this file.
1
9#ifndef DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_READOUTMODEL_HPP_
10#define DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_READOUTMODEL_HPP_
11
19
21
23#include "iomanager/Sender.hpp"
25
26#include "logging/Logging.hpp"
27
30
33
37
40
44
47
48#include <functional>
49#include <memory>
50#include <string>
51#include <utility>
52#include <vector>
53
58
59namespace dunedaq {
60namespace datahandlinglibs {
61
62template<class ReadoutType, class RequestHandlerType, class LatencyBufferType, class RawDataProcessorType, class InputDataType = ReadoutType>
64{
65public:
66 // Using shorter typenames
67 using RDT = ReadoutType;
68 using RHT = RequestHandlerType;
69 using LBT = LatencyBufferType;
70 using RPT = RawDataProcessorType;
71 using IDT = InputDataType;
72
73 // Timestamp type and unit multipliers
// timestamp_t is an unsigned 64-bit integer. The constants below are plain
// multipliers built on `ns` as the base unit (value 1):
//   us = 1'000, ms = 1'000'000, s = 1'000'000'000.
// They are static members of the class, so they are shared by all instances.
74 using timestamp_t = std::uint64_t; // NOLINT(build/unsigned)
75 static inline constexpr timestamp_t ns = 1;
76 static inline constexpr timestamp_t us = 1000 * ns;
77 static inline constexpr timestamp_t ms = 1000 * us;
78 static inline constexpr timestamp_t s = 1000 * ms;
79
80 // Explicit constructor with run marker pass-through
// Takes the run marker by reference. The visible initializers clear the
// callback-mode and fake-trigger flags and null the raw data receiver, the
// latency buffer and the raw processor pointers.
// NOTE(review): several initializer-list entries (including the one that binds
// the run marker) are not visible in this listing - check the original header.
81 explicit DataHandlingModel(std::atomic<bool>& run_marker)
83 , m_callback_mode(false)
84 , m_fake_trigger(false)
89 , m_raw_data_receiver(nullptr)
91 , m_latency_buffer_impl(nullptr)
92 , m_raw_processor_impl(nullptr)
93 {
// Record the process id of the process that constructs this object.
94 m_pid_of_current_process = getpid();
95 }
96
97 virtual ~DataHandlingModel() = default;
98
99 // Initializes the readoutmodel and its internals
100 void init(const appmodel::DataHandlerModule* modconf);
101
102 // Configures the readoutmodel and its internals
103 void conf(const nlohmann::json& args);
104
105 // Unconfigures readoutmodel's internals
// Forwards `args` unchanged to scrap() of the request handler, the latency
// buffer and the raw processor, in that order.
// The three implementation pointers are dereferenced without a null check.
// NOTE(review): presumably only valid once init() has created the
// implementations - confirm against the command sequence of the caller.
106 void scrap(const nlohmann::json& args)
107 {
108 m_request_handler_impl->scrap(args);
109 m_latency_buffer_impl->scrap(args);
110 m_raw_processor_impl->scrap(args);
111 }
112
113 // Starts readoutmodel's internals
114 void start(const nlohmann::json& args);
115
116 // Stops readoutmodel's internals
117 void stop(const nlohmann::json& args);
118
119 // Record function: invokes request handler's record implementation
// Forwards `args` unchanged to the request handler's record(). The request
// handler pointer is dereferenced without a null check.
// Marked `override`, so it implements a virtual declared by a base class
// (the base class is not visible in this listing).
120 void record(const nlohmann::json& args) override
121 {
122 m_request_handler_impl->record(args);
123 }
124
125 // Opmon get_info call implementation
126 //void get_info(opmonlib::InfoCollector& ci, int level);
127
128 // Raw data consume callback
129 void consume_payload(RDT&& payload);
130
131 // Consume callback
132 std::function<void(RDT&&)> m_consume_callback;
133
134protected:
135
136 // Perform processing operations on payload
137 void process_item(RDT& payload);
138
139 // Raw data consumer's work function
140 void run_consume();
141
142 // Timesync thread's work function
143 void run_timesync();
144
145 // Dispatch data request
146 void dispatch_requests(dfmessages::DataRequest& data_request);
147
148 // Transform input data type to readout
// Default transformation: views `original` as an RDT and returns a vector
// holding a single copy of it. Virtual, so derived classes can replace it.
// NOTE(review): the reinterpret_cast is only meaningful when IDT and RDT are
// the same or layout-compatible types (IDT defaults to ReadoutType in the
// template parameter list) - confirm for instantiations where they differ.
149 virtual std::vector<RDT> transform_payload(IDT& original) const
150 {
151 return { reinterpret_cast<RDT&>(original) };
152 }
153
154 // Operational monitoring
155 virtual void generate_opmon_data() override;
156
157 // Constructor params
158 std::atomic<bool>& m_run_marker;
159
160 // CONFIGURATION
161 //appfwk::app::ModInit m_queue_config;
169
170 // STATS
// Each alias below is the (const-stripped) type obtained by invoking the named
// metric_t member on a metric_t object, so every counter has the same value
// type as the metric field it corresponds to.
// NOTE(review): metric_t is declared on a line that is not visible in this
// listing - confirm which opmon message type it refers to.
172 using num_payload_t = std::remove_const<std::invoke_result<decltype(&metric_t::num_payloads),metric_t>::type>::type;
173 using sum_payload_t = std::remove_const<std::invoke_result<decltype(&metric_t::sum_payloads),metric_t>::type>::type;
174 using num_request_t = std::remove_const<std::invoke_result<decltype(&metric_t::num_requests),metric_t>::type>::type;
175 using sum_request_t = std::remove_const<std::invoke_result<decltype(&metric_t::sum_requests),metric_t>::type>::type;
176 using rawq_timeout_count_t = std::remove_const<std::invoke_result<decltype(&metric_t::num_data_input_timeouts),metric_t>::type>::type;
177 using num_lb_insert_failures_t = std::remove_const<std::invoke_result<decltype(&metric_t::num_lb_insert_failures),metric_t>::type>::type;
178
// Atomic counters, all starting at zero.
179 std::atomic<num_payload_t> m_num_payloads{ 0 };
180 std::atomic<sum_payload_t> m_sum_payloads{ 0 };
181 std::atomic<num_request_t> m_num_requests{ 0 };
182 std::atomic<sum_request_t> m_sum_requests{ 0 };
183 std::atomic<rawq_timeout_count_t> m_rawq_timeout_count{ 0 };
184 std::atomic<num_lb_insert_failures_t> m_num_lb_insert_failures{ 0 };
// Unlike the counters above, this one is a plain int rather than a metric_t-derived type.
185 std::atomic<int> m_stats_packet_count{ 0 };
186
187 // CONSUMER
189
190 // RAW RECEIVER
191 std::chrono::milliseconds m_raw_receiver_timeout_ms;
192 std::chrono::microseconds m_raw_receiver_sleep_us;
194 std::shared_ptr<raw_receiver_ct> m_raw_data_receiver;
196
197 // REQUEST RECEIVERS
199 std::shared_ptr<request_receiver_ct> m_data_request_receiver;
200
201 // FRAGMENT SENDER
202 //std::chrono::milliseconds m_fragment_sender_timeout_ms;
203 //using fragment_sender_ct = iomanager::SenderConcept<std::pair<std::unique_ptr<daqdataformats::Fragment>, std::string>>;
204 //std::shared_ptr<fragment_sender_ct> m_fragment_sender;
205
206 // TIME-SYNC
208 std::shared_ptr<timesync_sender_ct> m_timesync_sender;
212
213 // LATENCY BUFFER
214 std::shared_ptr<LatencyBufferType> m_latency_buffer_impl;
215
216 // RAW PROCESSING
217 std::shared_ptr<RawDataProcessorType> m_raw_processor_impl;
218
219 // REQUEST HANDLER
220 std::shared_ptr<RequestHandlerType> m_request_handler_impl;
222
223 // ERROR REGISTRY
224 std::unique_ptr<FrameErrorRegistry> m_error_registry;
225
226 // RUN START T0
227 std::chrono::time_point<std::chrono::high_resolution_clock> m_t0;
228};
229
230} // namespace datahandlinglibs
231} // namespace dunedaq
232
233// Declarations
235
236#endif // DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_READOUTMODEL_HPP_
std::atomic< num_lb_insert_failures_t > m_num_lb_insert_failures
std::unique_ptr< FrameErrorRegistry > m_error_registry
DataHandlingModel(std::atomic< bool > &run_marker)
std::shared_ptr< RequestHandlerType > m_request_handler_impl
void init(const appmodel::DataHandlerModule *modconf)
Forward calls from the appfwk.
std::shared_ptr< timesync_sender_ct > m_timesync_sender
virtual std::vector< RDT > transform_payload(IDT &original) const
std::remove_const< std::invoke_result< decltype(&metric_t::sum_payloads), metric_t >::type >::type sum_payload_t
void record(const nlohmann::json &args) override
std::remove_const< std::invoke_result< decltype(&metric_t::num_requests), metric_t >::type >::type num_request_t
std::remove_const< std::invoke_result< decltype(&metric_t::sum_requests), metric_t >::type >::type sum_request_t
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
std::shared_ptr< raw_receiver_ct > m_raw_data_receiver
void dispatch_requests(dfmessages::DataRequest &data_request)
std::remove_const< std::invoke_result< decltype(&metric_t::num_data_input_timeouts), metric_t >::type >::type rawq_timeout_count_t
void run_consume()
Function that will be run in its own thread to read the raw packets from the connection and add them ...
void run_timesync()
Function that will be run in its own thread and sends periodic timesync messages by pushing them to t...
std::remove_const< std::invoke_result< decltype(&metric_t::num_payloads), metric_t >::type >::type num_payload_t
std::remove_const< std::invoke_result< decltype(&metric_t::num_lb_insert_failures), metric_t >::type >::type num_lb_insert_failures_t
std::atomic< rawq_timeout_count_t > m_rawq_timeout_count
std::shared_ptr< LatencyBufferType > m_latency_buffer_impl
std::shared_ptr< request_receiver_ct > m_data_request_receiver
std::shared_ptr< RawDataProcessorType > m_raw_processor_impl
std::atomic< bool > run_marker
Global atomic for process lifetime.
uint32_t run_number_t
Type used to represent run number.
Definition Types.hpp:20
Including Qt Headers.
SourceID is a generalized representation of the source of a piece of data in the DAQ....
Definition SourceID.hpp:32
This message represents a request for data sent to a single component of the DAQ.