DUNE-DAQ
DUNE Trigger and Data Acquisition software
DefaultRequestHandlerModel.hpp
#ifndef DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_DEFAULTREQUESTHANDLERMODEL_HPP_
#define DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_DEFAULTREQUESTHANDLERMODEL_HPP_

#include "appmodel/DataHandlerModule.hpp"
#include "daqdataformats/Fragment.hpp"
#include "datahandlinglibs/FrameErrorRegistry.hpp"
#include "datahandlinglibs/concepts/RequestHandlerConcept.hpp"
#include "dfmessages/DataRequest.hpp"
#include "logging/Logging.hpp"

#include <boost/asio.hpp>

#include <folly/concurrency/UnboundedQueue.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstring>
#include <deque>
#include <functional>
#include <future>
#include <iomanip>
#include <limits>
#include <map>
#include <memory>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

using dunedaq::datahandlinglibs::logging::TLVL_WORK_STEPS;

namespace dunedaq {
namespace datahandlinglibs {

// This function takes the type returned by the begin() and end()
// functions in a ReadoutType object and returns the timestamp that
// should be used to determine whether the item pointed to by the
// iterator is within the readout window. The "frame" type
// ReadoutTypes all have a get_timestamp() function on the type
// pointed to by the iterator (e.g., WIBFrame), so we make this
// "default" function return that. For other types, e.g., those coming
// from DS, there isn't a get_timestamp() function, so a different
// template specialization is used in the appropriate package (e.g.,
// trigger).

template<class T>
uint64_t
get_frame_iterator_timestamp(T iter)
{
  return iter->get_timestamp();
}
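
// Illustrative sketch only (not part of this header): a package whose iterator
// does not expose get_timestamp() would provide its own specialization next to
// its ReadoutType. Assuming a hypothetical TPSetIterator whose elements carry a
// time_start field, such a specialization could look like:
//
//   template<>
//   uint64_t
//   get_frame_iterator_timestamp(TPSetIterator iter)
//   {
//     return iter->time_start;
//   }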

template<class ReadoutType, class LatencyBufferType>
class DefaultRequestHandlerModel : public RequestHandlerConcept<ReadoutType, LatencyBufferType>
{
public:
  // Using shorter typenames
  using RDT = ReadoutType;
  using LBT = LatencyBufferType;

  using RequestResult =
    typename dunedaq::datahandlinglibs::RequestHandlerConcept<ReadoutType, LatencyBufferType>::RequestResult;
  using ResultCode =
    typename dunedaq::datahandlinglibs::RequestHandlerConcept<ReadoutType, LatencyBufferType>::ResultCode;
  // Explicit constructor with binding LB and error registry
  explicit DefaultRequestHandlerModel(std::shared_ptr<LatencyBufferType>& latency_buffer,
                                      std::unique_ptr<FrameErrorRegistry>& error_registry)
    : m_latency_buffer(latency_buffer)
    , m_error_registry(error_registry)
    , m_pop_limit_pct(0.0f)
    , m_pop_size_pct(0.0f)
    , m_pop_counter{ 0 }
    , m_pop_reqs(0)
    , m_pops_count(0)
    , m_occupancy(0)
    //, m_response_time_log()
    //, m_response_time_log_lock()
  {
    TLOG_DEBUG(TLVL_WORK_STEPS) << "DefaultRequestHandlerModel created...";
  }

  // A struct that combines a data request with the time point at which its handling started
  struct RequestElement
  {
    RequestElement(const dfmessages::DataRequest& data_request,
                   const std::chrono::time_point<std::chrono::high_resolution_clock>& tp_value)
      : request(data_request)
      , start_time(tp_value)
    {}

    dfmessages::DataRequest request;
    std::chrono::time_point<std::chrono::high_resolution_clock> start_time;
  };

  // Default configuration mechanism
  void conf(const appmodel::DataHandlerModule* conf);

  // Default un-configure mechanism
  void scrap(const nlohmann::json& /*args*/) override;

  // Default start mechanism
  void start(const nlohmann::json& /*args*/);

  // Default stop mechanism
  void stop(const nlohmann::json& /*args*/);

  // Raw data recording implementation
  void record(const nlohmann::json& args) override;

  // A function that determines if a cleanup request should be issued based on LB occupancy
  void cleanup_check() override;

  // Periodic data transmission method invoked at configurable interval
  virtual void periodic_data_transmission() override;

  // Implementation of default request handling. (boost::asio post to a thread pool)
  void issue_request(dfmessages::DataRequest datarequest, bool is_retry=false) override;
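  // Illustrative sketch only (not the actual implementation): issuing a request
  // by posting work onto the handler's boost::asio::thread_pool typically
  // follows this pattern, with datarequest captured by value and
  // m_requests_running tracking in-flight requests:
  //
  //   m_requests_running++;
  //   boost::asio::post(*m_request_handler_thread_pool, [&, datarequest]() {
  //     auto result = data_request(datarequest);
  //     m_requests_running--;
  //     // ... ship the resulting fragment / act on the result code ...
  //   });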

  // Opmon get_info implementation
  // void get_info(opmonlib::InfoCollector& ci, int /*level*/) override;

  virtual daqdataformats::timestamp_t get_cutoff_timestamp() {return 0;}
  virtual bool supports_cutoff_timestamp() {return false;}

protected:
  // An inline helper function that creates a fragment header based on a data request
  inline daqdataformats::FragmentHeader create_fragment_header(const dfmessages::DataRequest& dr);

  // Helper function that creates an empty fragment.
  std::unique_ptr<daqdataformats::Fragment> create_empty_fragment(const dfmessages::DataRequest& dr);

  // An inline helper function that copies a byte array into a destination buffer,
  // wrapping around to the start of the buffer when its end is reached
  inline
  void dump_to_buffer(const void* data, std::size_t size,
                      void* buffer, uint32_t buffer_pos, // NOLINT(build/unsigned)
                      const std::size_t& buffer_size)
  {
    auto bytes_to_copy = size;
    while (bytes_to_copy > 0) {
      auto n = std::min(bytes_to_copy, buffer_size - buffer_pos);
      // Copy the next chunk of the source, continuing from where the previous chunk ended
      std::memcpy(static_cast<char*>(buffer) + buffer_pos,
                  static_cast<const char*>(data) + (size - bytes_to_copy), n);
      buffer_pos += n;
      bytes_to_copy -= n;
      if (buffer_pos == buffer_size) {
        buffer_pos = 0;
      }
    }
  }
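  // Illustrative usage sketch only (snapshot, write_pos and payload_* are
  // hypothetical placeholders): a caller owning a fixed-size byte buffer can
  // append payloads at its running write offset; because buffer_pos is passed
  // by value, the caller advances its own offset after the call:
  //
  //   dump_to_buffer(payload_ptr, payload_bytes,
  //                  snapshot.data(), write_pos, snapshot.size());
  //   write_pos = (write_pos + payload_bytes) % snapshot.size();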

  // Cleanup thread's work function. Runs the cleanup() routine
  void periodic_cleanups();

  // Periodic data transmission thread's work function. Runs the periodic_data_transmission() routine
  void periodic_data_transmissions();

  // LB cleanup implementation
  void cleanup();

  // Function that checks delayed requests that are waiting for data not yet present in the LB
  void check_waiting_requests();

  // Function that gathers fragment pieces from the LB
  std::vector<std::pair<void*, size_t>> get_fragment_pieces(uint64_t start_win_ts,
                                                            uint64_t end_win_ts,
                                                            RequestResult& rres);

  // Override data_request functionality
  RequestResult data_request(dfmessages::DataRequest dr) override;

  // operational monitoring
  virtual void generate_opmon_data() override;

  // Data access (LB)
  std::shared_ptr<LatencyBufferType>& m_latency_buffer;

  // Data recording

  // Bookkeeping of OOB requests
  std::map<dfmessages::DataRequest, int> m_request_counter;

  // Requests
  std::mutex m_cv_mutex;
  std::condition_variable m_cv;
  std::atomic<bool> m_cleanup_requested = false;
  std::atomic<int> m_requests_running = 0;
  std::vector<RequestElement> m_waiting_requests;

  // Data extractor threads pool and corresponding requests
  std::unique_ptr<boost::asio::thread_pool> m_request_handler_thread_pool;

  // Error registry
  std::unique_ptr<FrameErrorRegistry>& m_error_registry;
  std::chrono::time_point<std::chrono::high_resolution_clock> m_t0;

  // The run marker
  std::atomic<bool> m_run_marker = false;
  // Threads and handles
  std::atomic<bool> m_recording = false;
  std::atomic<uint64_t> m_next_timestamp_to_record = std::numeric_limits<uint64_t>::max(); // NOLINT (build/unsigned)

  // Configuration
  float m_pop_limit_pct;     // buffer occupancy percentage to issue a pop request
  float m_pop_size_pct;      // buffer percentage to pop
  unsigned m_pop_limit_size; // pop_limit_pct * buffer_capacity
  uint16_t m_detid;
  std::string m_output_file;
  bool m_warn_on_timeout = true;         // Whether to warn when a request times out
  bool m_warn_about_empty_buffer = true; // Whether to warn about an empty buffer when processing a request
  std::vector<std::string> m_frag_out_conn_ids;

  // Stats
  std::atomic<int> m_pop_counter;
  std::atomic<int> m_num_buffer_cleanups{ 0 };
  std::atomic<int> m_pop_reqs;
  std::atomic<int> m_pops_count;
  std::atomic<int> m_occupancy;
  std::atomic<int> m_num_requests_found{ 0 };
  std::atomic<int> m_num_requests_bad{ 0 };
  std::atomic<int> m_num_requests_old_window{ 0 };
  std::atomic<int> m_num_requests_delayed{ 0 };
  std::atomic<int> m_num_requests_uncategorized{ 0 };
  std::atomic<int> m_num_requests_timed_out{ 0 };
  std::atomic<int> m_handled_requests{ 0 };
  std::atomic<int> m_response_time_acc{ 0 };
  std::atomic<int> m_response_time_min{ std::numeric_limits<int>::max() };
  std::atomic<int> m_response_time_max{ 0 };
  std::atomic<int> m_payloads_written{ 0 };
  std::atomic<int> m_bytes_written{ 0 };
  std::atomic<uint64_t> m_num_periodic_sent{ 0 };        // NOLINT(build/unsigned)
  std::atomic<uint64_t> m_num_periodic_send_failed{ 0 }; // NOLINT(build/unsigned)

  // std::atomic<int> m_avg_req_count{ 0 }; // for opmon, later
  // std::atomic<int> m_avg_resp_time{ 0 };
  // Request response time log (kept for debugging if needed)
  // std::deque<std::pair<int, int>> m_response_time_log;
  // std::mutex m_response_time_log_lock;

private:

};

} // namespace datahandlinglibs
} // namespace dunedaq
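
// Illustrative usage sketch only (MyFrameTypeAdapter and MyLatencyBuffer are
// hypothetical placeholders): a concrete readout specialization typically pairs
// this model with a matching latency buffer type, e.g.
//
//   using MyRequestHandler =
//     dunedaq::datahandlinglibs::DefaultRequestHandlerModel<MyFrameTypeAdapter,
//                                                           MyLatencyBuffer>;
//   auto handler = std::make_shared<MyRequestHandler>(my_latency_buffer, my_error_registry);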

// Declarations
#include "detail/DefaultRequestHandlerModel.hpp"

#endif // DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_DEFAULTREQUESTHANDLERMODEL_HPP_