DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
SNBRequestHandlerModel.hpp
Go to the documentation of this file.
1
8#ifndef DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_SNBRequestHandlerModel_HPP_
9#define DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_SNBRequestHandlerModel_HPP_
10
15
17
25
31#include "logging/Logging.hpp"
32
33#include <boost/asio.hpp>
34
36#include <folly/concurrency/UnboundedQueue.h>
37
38#include <algorithm>
39#include <atomic>
40#include <chrono>
41#include <deque>
42#include <functional>
43#include <future>
44#include <iomanip>
45#include <limits>
46#include <map>
47#include <memory>
48#include <queue>
49#include <string>
50#include <thread>
51#include <utility>
52#include <vector>
53
57
58namespace dunedaq {
59namespace snbmodules {
60
61// This function takes the type returned by the begin() and end()
62// functions in a ReadoutType object and returns the timestamp that
63// should be used to determine whether the item pointed to by the
64// iterator is within the readout window. The "frame" type
65// ReadoutTypes all have a get_timestamp() function on the type
66// pointed to by the iterator (eg, WIBFrame), so we make this
67// "default" function return that. For other types, eg, those coming
68// from DS, there isn't a get_timestamp() function, so a different
69// template specialization is used in the appropriate package (eg,
70// trigger)
71
/**
 * @brief Default timestamp accessor used for frame-type iterators.
 *
 * Forwards to `get_timestamp()` on the element that @p iter refers to.
 * Types that have no `get_timestamp()` member need their own specialization.
 *
 * @tparam T    Iterator or pointer-like type for which `iter->get_timestamp()` is valid.
 * @param  iter Iterator designating the element whose timestamp is wanted.
 * @return The value returned by `iter->get_timestamp()`, converted to uint64_t.
 */
template<class T>
uint64_t // NOLINT(build/unsigned)
get_frame_iterator_timestamp(T iter)
{
  return iter->get_timestamp();
}
78
79template<class ReadoutType, class LatencyBufferType>
80class SNBRequestHandlerModel : public datahandlinglibs::RequestHandlerConcept<ReadoutType, LatencyBufferType>
81{
82public:
83 // Using shorter typenames
84 using RDT = ReadoutType;
85 using LBT = LatencyBufferType;
86
89 using ResultCode =
91
92 // Explicit constructor binding the latency buffer (LB) and the frame error registry.
// Both arguments are taken by non-const reference and stored in reference members
// (m_latency_buffer, m_error_registry), so the caller's smart-pointer objects must
// outlive this handler.
// NOTE(review): the listing numbering jumps 95 -> 100 -> 102 inside the initializer
// list, so some initializers appear to be missing from this extract - confirm
// against the original header.
93 explicit SNBRequestHandlerModel(std::shared_ptr<LatencyBufferType>& latency_buffer,
94 std::unique_ptr<datahandlinglibs::FrameErrorRegistry>& error_registry)
95 : m_latency_buffer(latency_buffer)
100 , m_error_registry(error_registry)
// The statistics counters below all start from zero.
102 , m_pop_counter{ 0 }
103 , m_pop_reqs(0)
104 , m_pops_count(0)
105 , m_occupancy(0)
// The response-time log members are disabled; their initializers are kept commented out.
106 //, m_response_time_log()
107 //, m_response_time_log_lock()
108 {
// Emit a debug-level trace so that handler creation is visible in the logs.
109 TLOG_DEBUG(TLVL_WORK_STEPS) << "SNBRequestHandlerModel created...";
110 }
111
112 // A struct that pairs a data request with a start time (a high_resolution_clock time point)
114 {
116 const std::chrono::time_point<std::chrono::high_resolution_clock>& tp_value)
117 : request(data_request)
118 , start_time(tp_value)
119 {
120 }
121
123 std::chrono::time_point<std::chrono::high_resolution_clock> start_time;
124 };
125
126 // Default configuration mechanism
128
129 // Default un-configure mechanism
130 void scrap(const appfwk::DAQModule::CommandData_t& /*args*/) override;
131
132 // Default start mechanism
133 void start(const appfwk::DAQModule::CommandData_t& /*args*/);
134
135 // Default stop mechanism
136 void stop(const appfwk::DAQModule::CommandData_t& /*args*/);
137
138 // Raw data recording implementation
139 void record(const appfwk::DAQModule::CommandData_t& args) override;
140
141 // A function that determines if a cleanup request should be issued based on LB occupancy
142 void cleanup_check() override;
143
144 // Periodic data transmission method invoked at configurable interval
145 virtual void periodic_data_transmission() override;
146
147 // Implementation of default request handling. (boost::asio post to a thread pool)
148 void issue_request(dfmessages::DataRequest datarequest, bool is_retry = false) override;
149
150 // Opmon get_info implementation
151 // void get_info(opmonlib::InfoCollector& ci, int /*level*/) override;
152
153 virtual dunedaq::daqdataformats::timestamp_t get_cutoff_timestamp() { return 0; }
154 virtual bool supports_cutoff_timestamp() { return false; }
155
156 // Resets last known/processed DAQ timestamp
158
159 // Returns last processed ReadoutTyped element's DAQ timestamp
160 std::uint64_t get_oldest_time() override { return m_oldest_timestamp.load(); } // NOLINT(build/unsigned)
161
162protected:
163 // An inline helper function that creates a fragment header based on a data request
179
180 // Helper function that creates an empty fragment.
181 std::unique_ptr<daqdataformats::Fragment> create_empty_fragment(const dfmessages::DataRequest& dr);
182
// An inline helper function that copies a byte array into a circular destination buffer.
//
// Copies `size` bytes from `data` into `buffer`, starting at byte offset
// `buffer_pos` and wrapping around to offset 0 whenever the end of the buffer
// (`buffer_size` bytes) is reached.
//
//   data        - first byte of the source range
//   size        - number of bytes to copy; if larger than buffer_size the copy
//                 wraps more than once and earlier bytes are overwritten
//   buffer      - start of the destination buffer
//   buffer_pos  - offset of the first byte to write; taken by value, so the
//                 caller's position is not advanced. Offsets >= buffer_size
//                 are reduced modulo buffer_size.
//   buffer_size - capacity of the destination buffer in bytes
//
// Nothing is copied when size or buffer_size is 0, or when either pointer is null.
// NOTE(review): std::memcpy needs <cstring>, which is not among the includes
// visible in this listing - confirm it is pulled in by another header.
inline void dump_to_buffer(const void* data,
                           std::size_t size,
                           void* buffer,
                           uint32_t buffer_pos, // NOLINT(build/unsigned)
                           const std::size_t& buffer_size)
{
  // A zero-capacity buffer can never accept data; without this guard the
  // loop below would spin forever copying 0 bytes per iteration.
  if (size == 0 || buffer_size == 0 || data == nullptr || buffer == nullptr) {
    return;
  }

  const char* src = static_cast<const char*>(data);
  char* const dst = static_cast<char*>(buffer);

  // Normalise the start offset so that (buffer_size - write_pos) cannot underflow.
  std::size_t write_pos = static_cast<std::size_t>(buffer_pos) % buffer_size;
  std::size_t bytes_to_copy = size;

  while (bytes_to_copy > 0) {
    // Copy up to the end of the buffer, then continue from its beginning.
    const std::size_t n = std::min(bytes_to_copy, buffer_size - write_pos);
    std::memcpy(dst + write_pos, src, n);
    src += n; // advance the source so the wrapped part continues where we stopped
    bytes_to_copy -= n;
    write_pos += n;
    if (write_pos == buffer_size) {
      write_pos = 0;
    }
  }
}
201
202 // Cleanup thread's work function. Runs the cleanup() routine
203 void periodic_cleanups();
204
205 // Periodic data transmission thread's work function. Runs the periodic_data_transmission() routine
207
208 // LB cleanup implementation
209 void cleanup();
210
211 // Function that checks delayed requests that are waiting for not yet present data in LB
213
214 // Function that gathers fragment pieces from LB
215 std::vector<std::pair<void*, size_t>> get_fragment_pieces(uint64_t start_win_ts,
216 uint64_t end_win_ts,
217 RequestResult& rres);
218
219 // Override data_request functionality
221
222 // operational monitoring
223 virtual void generate_opmon_data() override;
224
225 // Data access (LB)
226 std::shared_ptr<LatencyBufferType>& m_latency_buffer;
227
228 // Data recording
231
234
235 // Bookkeeping of OOB requests
236 std::map<dfmessages::DataRequest, int> m_request_counter;
237
238 // Requests
239 std::mutex m_cv_mutex;
240 std::condition_variable m_cv;
241 std::atomic<bool> m_cleanup_requested = false;
242 std::atomic<int> m_requests_running = 0;
243 std::vector<RequestElement> m_waiting_requests;
245
246 // Data extractor threads pool and corresponding requests
247 std::unique_ptr<boost::asio::thread_pool> m_request_handler_thread_pool;
249
250 // Error registry
251 std::unique_ptr<datahandlinglibs::FrameErrorRegistry>& m_error_registry;
252 std::chrono::time_point<std::chrono::high_resolution_clock> m_t0;
253
254 // The run marker
255 std::atomic<bool> m_run_marker = false;
256 // Threads and handles
258 std::atomic<bool> m_recording = false;
259 std::atomic<uint64_t> m_next_timestamp_to_record = std::numeric_limits<uint64_t>::max(); // NOLINT (build/unsigned)
260
261 // Configuration
264 std::set<uint64_t> m_pop_list;
267 uint16_t m_detid;
268 std::string m_output_file;
271 bool m_warn_on_timeout = true; // Whether to warn when a request times out
272 bool m_warn_about_empty_buffer = true; // Whether to warn about an empty buffer when processing a request
274 std::vector<std::string> m_frag_out_conn_ids;
275
276 // Stats
277 std::atomic<int> m_pop_counter;
278 std::atomic<int> m_num_buffer_cleanups{ 0 };
279 std::atomic<int> m_pop_reqs;
280 std::atomic<int> m_pops_count;
281 std::atomic<int> m_occupancy;
282 std::atomic<int> m_num_requests_found{ 0 };
283 std::atomic<int> m_num_requests_bad{ 0 };
284 std::atomic<int> m_num_requests_old_window{ 0 };
285 std::atomic<int> m_num_requests_delayed{ 0 };
286 std::atomic<int> m_num_requests_uncategorized{ 0 };
287 std::atomic<int> m_num_requests_timed_out{ 0 };
288 std::atomic<int> m_handled_requests{ 0 };
289 std::atomic<int> m_response_time_acc{ 0 };
290 std::atomic<int> m_response_time_min{ std::numeric_limits<int>::max() };
291 std::atomic<int> m_response_time_max{ 0 };
292 std::atomic<int> m_payloads_written{ 0 };
293 std::atomic<int> m_bytes_written{ 0 };
294 std::atomic<uint64_t> m_num_periodic_sent{ 0 }; // NOLINT(build/unsigned)
295 std::atomic<uint64_t> m_num_periodic_send_failed{ 0 }; // NOLINT(build/unsigned)
296 std::atomic<uint64_t> m_oldest_timestamp{ 0 }; // NOLINT(build/unsigned)
297
298 // std::atomic<int> m_avg_req_count{ 0 }; // for opmon, later
299 // std::atomic<int> m_avg_resp_time{ 0 };
300 // Request response time log (kept for debugging if needed)
301 // std::deque<std::pair<int, int>> m_response_time_log;
302 // std::mutex m_response_time_log_lock;
303
305
306private:
308};
309
310} // namespace snbmodules
311} // namespace dunedaq
312
313// Declarations
315
316#endif // DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_SNBRequestHandlerModel_HPP_
void start(const appfwk::DAQModule::CommandData_t &)
std::unique_ptr< datahandlinglibs::FrameErrorRegistry > & m_error_registry
typename dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >::RequestResult RequestResult
RequestResult data_request(dfmessages::DataRequest dr) override
std::shared_ptr< LatencyBufferType > & m_latency_buffer
virtual dunedaq::daqdataformats::timestamp_t get_cutoff_timestamp()
void conf(const dunedaq::appmodel::DataHandlerModule *)
typename dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >::ResultCode ResultCode
std::unique_ptr< daqdataformats::Fragment > create_empty_fragment(const dfmessages::DataRequest &dr)
void issue_request(dfmessages::DataRequest datarequest, bool is_retry=false) override
Issue a data request to the request handler.
void dump_to_buffer(const void *data, std::size_t size, void *buffer, uint32_t buffer_pos, const std::size_t &buffer_size)
daqdataformats::FragmentHeader create_fragment_header(const dfmessages::DataRequest &dr)
void scrap(const appfwk::DAQModule::CommandData_t &) override
void cleanup_check() override
Check if cleanup is necessary and execute it if necessary.
std::uint64_t get_oldest_time() override
Get oldest timestamp in the buffer.
void stop(const appfwk::DAQModule::CommandData_t &)
std::map< dfmessages::DataRequest, int > m_request_counter
std::vector< std::pair< void *, size_t > > get_fragment_pieces(uint64_t start_win_ts, uint64_t end_win_ts, RequestResult &rres)
SNBRequestHandlerModel(std::shared_ptr< LatencyBufferType > &latency_buffer, std::unique_ptr< datahandlinglibs::FrameErrorRegistry > &error_registry)
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
void record(const appfwk::DAQModule::CommandData_t &args) override
datahandlinglibs::BufferedFileWriter m_buffered_writer
std::unique_ptr< boost::asio::thread_pool > m_request_handler_thread_pool
virtual void periodic_data_transmission() override
Periodic data transmission - relevant for trigger in particular.
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
uint32_t fragment_type_t
Type used to represent Fragment type ID.
Definition Types.hpp:28
uint64_t get_frame_iterator_timestamp(T iter)
The DUNE-DAQ namespace.
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size
timestamp_t window_end
End of the data collection window.
timestamp_t window_begin
Start of the data collection window.
The header for a DUNE Fragment.
timestamp_t trigger_timestamp
Timestamp of the TriggerDecision.
fragment_type_t fragment_type
Type of the Fragment, indicating the format of the contained payload.
SourceID element_id
Component that generated the data in this Fragment.
uint16_t detector_id
Identifier for the subdetector that produced the raw data in the Fragment payload.
run_number_t run_number
Run number this Fragment is associated with.
sequence_number_t sequence_number
Sequence number of this Fragment within a trigger record.
trigger_number_t trigger_number
Trigger Number this Fragment is associated with.
timestamp_t window_end
Window end of data in the Fragment.
timestamp_t window_begin
Window begin of data in the Fragment.
fragment_size_t size
Size of the Fragment (including header and payload)
SourceID is a generalized representation of the source of a piece of data in the DAQ....
Definition SourceID.hpp:32
Subsystem subsystem
The general subsystem of the source of the data.
Definition SourceID.hpp:69
ID_t id
Unique identifier of the source of the data.
Definition SourceID.hpp:74
This message represents a request for data sent to a single component of the DAQ.
sequence_number_t sequence_number
Sequence Number of the request.
trigger_number_t trigger_number
Trigger number the request corresponds to.
timestamp_t trigger_timestamp
Timestamp of trigger.
run_number_t run_number
The current run number.
RequestElement(const dfmessages::DataRequest &data_request, const std::chrono::time_point< std::chrono::high_resolution_clock > &tp_value)
std::chrono::time_point< std::chrono::high_resolution_clock > start_time