DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::datahandlinglibs::ZeroCopyRecordingRequestHandlerModel< ReadoutType, LatencyBufferType > Class Template Reference

#include <ZeroCopyRecordingRequestHandlerModel.hpp>

Inheritance diagram for dunedaq::datahandlinglibs::ZeroCopyRecordingRequestHandlerModel< ReadoutType, LatencyBufferType >:
[legend]
Collaboration diagram for dunedaq::datahandlinglibs::ZeroCopyRecordingRequestHandlerModel< ReadoutType, LatencyBufferType >:
[legend]

Public Types

using inherited = DefaultRequestHandlerModel<ReadoutType, LatencyBufferType>
 
- Public Types inherited from dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >
using RDT = ReadoutType
 
using LBT = LatencyBufferType
 
using RequestResult
 
using ResultCode
 
- Public Types inherited from dunedaq::opmonlib::MonitorableObject
using NodePtr = std::weak_ptr<MonitorableObject>
 
using NewNodePtr = std::shared_ptr<MonitorableObject>
 
using ElementId = std::string
 

Public Member Functions

 ZeroCopyRecordingRequestHandlerModel (std::shared_ptr< LatencyBufferType > &latency_buffer, std::unique_ptr< FrameErrorRegistry > &error_registry)
 
void conf (const appmodel::DataHandlerModule *conf) override
 
void record (const appfwk::DAQModule::CommandData_t &args) override
 
- Public Member Functions inherited from dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >
 DefaultRequestHandlerModel (std::shared_ptr< LatencyBufferType > &latency_buffer, std::unique_ptr< FrameErrorRegistry > &error_registry)
 
void scrap (const appfwk::DAQModule::CommandData_t &) override
 
void start (const appfwk::DAQModule::CommandData_t &)
 
void stop (const appfwk::DAQModule::CommandData_t &)
 
void cleanup_check () override
 Check if cleanup is necessary and execute it if necessary.
 
virtual void periodic_data_transmission () override
 Periodic data transmission - relevant for trigger in particular.
 
void issue_request (dfmessages::DataRequest datarequest, bool is_retry=false) override
 Issue a data request to the request handler.
 
virtual dunedaq::daqdataformats::timestamp_t get_cutoff_timestamp ()
 
virtual bool supports_cutoff_timestamp ()
 
void reset_oldest_time ()
 
std::uint64_t get_oldest_time () override
 Get oldest timestamp in the buffer.
 
- Public Member Functions inherited from dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >
 RequestHandlerConcept ()
 
virtual ~RequestHandlerConcept ()
 
 RequestHandlerConcept (const RequestHandlerConcept &)=delete
 RequestHandlerConcept is not copy-constructible.
 
RequestHandlerConceptoperator= (const RequestHandlerConcept &)=delete
 RequestHandlerConcept is not copy-assginable.
 
 RequestHandlerConcept (RequestHandlerConcept &&)=delete
 RequestHandlerConcept is not move-constructible.
 
RequestHandlerConceptoperator= (RequestHandlerConcept &&)=delete
 RequestHandlerConcept is not move-assignable.
 
- Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject (const MonitorableObject &)=delete
 
MonitorableObjectoperator= (const MonitorableObject &)=delete
 
 MonitorableObject (MonitorableObject &&)=delete
 
MonitorableObjectoperator= (MonitorableObject &&)=delete
 
virtual ~MonitorableObject ()=default
 
auto get_opmon_id () const noexcept
 
auto get_opmon_level () const noexcept
 

Private Attributes

int m_fd
 
int m_oflag
 

Additional Inherited Members

- Static Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
static bool publishable_metric (OpMonLevel entry, OpMonLevel system) noexcept
 
- Protected Types inherited from dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >
enum  ResultCode {
  kFound = 0 , kNotFound , kTooOld , kNotYet ,
  kPartial , kPartiallyOld , kCleanup , kUnknown
}
 
- Protected Member Functions inherited from dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >
daqdataformats::FragmentHeader create_fragment_header (const dfmessages::DataRequest &dr)
 
std::unique_ptr< daqdataformats::Fragmentcreate_empty_fragment (const dfmessages::DataRequest &dr)
 
void dump_to_buffer (const void *data, std::size_t size, void *buffer, uint32_t buffer_pos, const std::size_t &buffer_size)
 
void periodic_cleanups ()
 
void periodic_data_transmissions ()
 
void cleanup ()
 
void check_waiting_requests ()
 
std::vector< std::pair< void *, size_t > > get_fragment_pieces (uint64_t start_win_ts, uint64_t end_win_ts, RequestResult &rres)
 
RequestResult data_request (dfmessages::DataRequest dr) override
 
virtual void generate_opmon_data () override
 
- Protected Member Functions inherited from dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >
const std::string & resultCodeAsString (ResultCode rc)
 
- Protected Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject ()=default
 
void register_node (ElementId name, NewNodePtr)
 
void publish (google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
 
- Protected Attributes inherited from dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >
std::shared_ptr< LatencyBufferType > & m_latency_buffer
 
BufferedFileWriter m_buffered_writer
 
utilities::ReusableThread m_recording_thread
 
utilities::ReusableThread m_cleanup_thread
 
utilities::ReusableThread m_periodic_transmission_thread
 
std::map< dfmessages::DataRequest, int > m_request_counter
 
std::size_t m_max_requested_elements
 
std::mutex m_cv_mutex
 
std::condition_variable m_cv
 
std::atomic< boolm_cleanup_requested = false
 
std::atomic< int > m_requests_running = 0
 
std::vector< RequestElementm_waiting_requests
 
std::mutex m_waiting_requests_lock
 
std::unique_ptr< boost::asio::thread_pool > m_request_handler_thread_pool
 
size_t m_num_request_handling_threads = 0
 
std::unique_ptr< FrameErrorRegistry > & m_error_registry
 
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
 
std::atomic< boolm_run_marker = false
 
std::thread m_waiting_queue_thread
 
std::atomic< boolm_recording = false
 
std::atomic< uint64_t > m_next_timestamp_to_record = std::numeric_limits<uint64_t>::max()
 
bool m_configured
 
float m_pop_limit_pct
 
float m_pop_size_pct
 
unsigned m_pop_limit_size
 
size_t m_buffer_capacity
 
daqdataformats::SourceID m_sourceid
 
uint16_t m_detid
 
std::string m_output_file
 
size_t m_stream_buffer_size = 0
 
bool m_recording_configured = false
 
bool m_warn_on_timeout = true
 
bool m_warn_about_empty_buffer = true
 
uint32_t m_periodic_data_transmission_ms = 0
 
std::vector< std::string > m_frag_out_conn_ids
 
std::atomic< int > m_pop_counter
 
std::atomic< int > m_num_buffer_cleanups { 0 }
 
std::atomic< int > m_pop_reqs
 
std::atomic< int > m_pops_count
 
std::atomic< int > m_occupancy
 
std::atomic< int > m_num_requests_found { 0 }
 
std::atomic< int > m_num_requests_bad { 0 }
 
std::atomic< int > m_num_requests_old_window { 0 }
 
std::atomic< int > m_num_requests_delayed { 0 }
 
std::atomic< int > m_num_requests_uncategorized { 0 }
 
std::atomic< int > m_num_requests_timed_out { 0 }
 
std::atomic< int > m_handled_requests { 0 }
 
std::atomic< int > m_response_time_acc { 0 }
 
std::atomic< int > m_response_time_min { std::numeric_limits<int>::max() }
 
std::atomic< int > m_response_time_max { 0 }
 
std::atomic< int > m_payloads_written { 0 }
 
std::atomic< int > m_bytes_written { 0 }
 
std::atomic< uint64_t > m_num_periodic_sent { 0 }
 
std::atomic< uint64_t > m_num_periodic_send_failed { 0 }
 
std::atomic< uint64_t > m_oldest_timestamp { 0 }
 
int m_fragment_send_timeout_ms
 
- Protected Attributes inherited from dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >
std::map< ResultCode, std::string > ResultCodeStrings
 

Detailed Description

template<class ReadoutType, class LatencyBufferType>
class dunedaq::datahandlinglibs::ZeroCopyRecordingRequestHandlerModel< ReadoutType, LatencyBufferType >

Definition at line 20 of file ZeroCopyRecordingRequestHandlerModel.hpp.

Member Typedef Documentation

◆ inherited

template<class ReadoutType , class LatencyBufferType >
using dunedaq::datahandlinglibs::ZeroCopyRecordingRequestHandlerModel< ReadoutType, LatencyBufferType >::inherited = DefaultRequestHandlerModel<ReadoutType, LatencyBufferType>

Definition at line 24 of file ZeroCopyRecordingRequestHandlerModel.hpp.

Constructor & Destructor Documentation

◆ ZeroCopyRecordingRequestHandlerModel()

template<class ReadoutType , class LatencyBufferType >
dunedaq::datahandlinglibs::ZeroCopyRecordingRequestHandlerModel< ReadoutType, LatencyBufferType >::ZeroCopyRecordingRequestHandlerModel ( std::shared_ptr< LatencyBufferType > & latency_buffer,
std::unique_ptr< FrameErrorRegistry > & error_registry )
inlineexplicit

Definition at line 27 of file ZeroCopyRecordingRequestHandlerModel.hpp.

30 {
31 TLOG_DEBUG(TLVL_WORK_STEPS) << "ZeroCopyRecordingRequestHandlerModel created...";
32 }
DefaultRequestHandlerModel(std::shared_ptr< LatencyBufferType > &latency_buffer, std::unique_ptr< FrameErrorRegistry > &error_registry)
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112

Member Function Documentation

◆ conf()

template<class ReadoutType , class LatencyBufferType >
void dunedaq::datahandlinglibs::ZeroCopyRecordingRequestHandlerModel< ReadoutType, LatencyBufferType >::conf ( const appmodel::DataHandlerModule * conf)
overridevirtual

Reimplemented from dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >.

Definition at line 9 of file ZeroCopyRecordingRequestHandlerModel.hxx.

10{
11 auto data_rec_conf = conf->get_module_configuration()->get_request_handler()->get_data_recorder();
12
13 if (data_rec_conf != nullptr) {
14 if (!data_rec_conf->get_output_file().empty()) {
15 inherited::m_sourceid.id = conf->get_source_id();
16 inherited::m_sourceid.subsystem = ReadoutType::subsystem;
17
18 // Check for alignment restrictions for filesystem block size. (XFS default: 4096)
19 if (inherited::m_latency_buffer->get_alignment_size() == 0 ||
20 sizeof(ReadoutType) * inherited::m_latency_buffer->size() % 4096) {
21 ers::error(ConfigurationError(ERS_HERE, inherited::m_sourceid, "Latency buffer is not 4kB aligned"));
22 }
23
24 // Check for sensible stream chunk size
25 inherited::m_stream_buffer_size = data_rec_conf->get_streaming_buffer_size();
26 if (inherited::m_stream_buffer_size % 4096 != 0) {
27 ers::error(ConfigurationError(ERS_HERE, inherited::m_sourceid, "Streaming chunk size is not divisible by 4kB!"));
28 }
29
30 // Prepare filename with full path
31 std::string file_full_path = data_rec_conf->get_output_file() + inherited::m_sourceid.to_string() + std::string(".bin");
32 inherited::m_output_file = file_full_path;
33
34
35 // RS: This will need to go away with the SNB store handler!
36 if (std::remove(file_full_path.c_str()) == 0) {
37 TLOG(TLVL_WORK_STEPS) << "Removed existing output file from previous run: " << file_full_path;
38 }
39
40 m_oflag = O_CREAT | O_WRONLY;
41 if (data_rec_conf->get_use_o_direct()) {
42 m_oflag |= O_DIRECT;
43 }
44 m_fd = ::open(file_full_path.c_str(), m_oflag, 0644);
45 if (m_fd == -1) {
46 TLOG() << "Failed to open file!";
47 throw ConfigurationError(ERS_HERE, inherited::m_sourceid, "Failed to open file!");
48 }
50
51 } else { // no output dir specified
52 TLOG(TLVL_WORK_STEPS) << "No output path is specified in data recorder config. Recording feature is inactive.";
53 }
54 } else {
55 TLOG(TLVL_WORK_STEPS) << "No recording config object specified. Recording feature is inactive.";
56 }
57
59}
#define ERS_HERE
void conf(const dunedaq::appmodel::DataHandlerModule *)
#define TLOG(...)
Definition macro.hpp:22
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size
void error(const Issue &issue)
Definition ers.hpp:81
Subsystem subsystem
The general subsystem of the source of the data.
Definition SourceID.hpp:69
std::string to_string() const
Definition SourceID.hpp:83
ID_t id
Unique identifier of the source of the data.
Definition SourceID.hpp:74

◆ record()

template<class ReadoutType , class LatencyBufferType >
void dunedaq::datahandlinglibs::ZeroCopyRecordingRequestHandlerModel< ReadoutType, LatencyBufferType >::record ( const appfwk::DAQModule::CommandData_t & args)
overridevirtual

Reimplemented from dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >.

Definition at line 64 of file ZeroCopyRecordingRequestHandlerModel.hxx.

66{
67 if (inherited::m_recording.load()) {
69 CommandError(ERS_HERE, inherited::m_sourceid, "A recording is still running, no new recording was started!"));
70 return;
71 }
72
73// FIXME: Recording parameters to be clarified!
74 int recording_time_sec = 0;
75 if (cmdargs.contains("duration")) {
76 recording_time_sec = cmdargs["duration"];
77 } else {
79 CommandError(ERS_HERE, inherited::m_sourceid, "A recording command with missing duration field received!"));
80 }
81 if (recording_time_sec == 0) {
83 CommandError(ERS_HERE, inherited::m_sourceid, "Recording for 0 seconds requested. Recording command is ignored!"));
84 return;
85 }
86
88 [&](int duration) {
89 size_t chunk_size = inherited::m_stream_buffer_size;
90 size_t alignment_size = inherited::m_latency_buffer->get_alignment_size();
91 TLOG() << "Start recording for " << duration << " second(s)" << std::endl;
92 inherited::m_recording.exchange(true);
93 auto start_of_recording = std::chrono::high_resolution_clock::now();
94 auto current_time = start_of_recording;
96
97 const char* current_write_pointer = nullptr;
98 const char* start_of_buffer_pointer =
99 reinterpret_cast<const char*>(inherited::m_latency_buffer->start_of_buffer()); // NOLINT
100 const char* current_end_pointer;
101 const char* end_of_buffer_pointer = reinterpret_cast<const char*>(inherited::m_latency_buffer->end_of_buffer()); // NOLINT
102
103 size_t bytes_written = 0;
104 size_t failed_writes = 0;
105
106 while (std::chrono::duration_cast<std::chrono::seconds>(current_time - start_of_recording).count() < duration) {
108 size_t considered_chunks_in_loop = 0;
109
110 // Wait for potential running cleanup to finish first
111 {
112 std::unique_lock<std::mutex> lock(inherited::m_cv_mutex);
113 inherited::m_cv.wait(lock, [&] { return !inherited::m_cleanup_requested; });
114 }
115 inherited::m_cv.notify_all();
116
117 // Some frames have to be skipped to start copying from an aligned piece of memory
118 // These frames cannot be written without O_DIRECT as this would mess up the alignment of the write pointer
119 // into the target file
121 auto begin = inherited::m_latency_buffer->begin();
122 if (begin == inherited::m_latency_buffer->end()) {
123 // There are no elements in the buffer, update time and try again
124 current_time = std::chrono::high_resolution_clock::now();
125 continue;
126 }
127 inherited::m_next_timestamp_to_record = begin->get_timestamp();
128 size_t skipped_frames = 0;
129 while (reinterpret_cast<std::uintptr_t>(&(*begin)) % alignment_size) { // NOLINT
130 ++begin;
131 skipped_frames++;
132 if (!begin.good()) {
133 // We reached the end of the buffer without finding an aligned element
134 // Reset the next timestamp to record and try again
135 current_time = std::chrono::high_resolution_clock::now();
137 continue;
138 }
139 }
140 TLOG() << "Skipped " << skipped_frames << " frames";
141 current_write_pointer = reinterpret_cast<const char*>(&(*begin)); // NOLINT
142 }
143
144 current_end_pointer = reinterpret_cast<const char*>(inherited::m_latency_buffer->back()); // NOLINT
145
146 // Break the loop from time to time to update the timestamp and check if we should stop recording
147 while (considered_chunks_in_loop < 100) {
148 auto iptr = reinterpret_cast<std::uintptr_t>(current_write_pointer); // NOLINT
149 if (iptr % alignment_size) {
150 // This should never happen
151 TLOG() << "Error: Write pointer is not aligned";
152 }
153 bool failed_write = false;
154 if (current_write_pointer + chunk_size < current_end_pointer) {
155 // We can write a whole chunk to file
156 failed_write |= !::write(m_fd, current_write_pointer, chunk_size);
157 if (!failed_write) {
158 bytes_written += chunk_size;
159 }
160 current_write_pointer += chunk_size;
161 } else if (current_end_pointer < current_write_pointer) {
162 if (current_write_pointer + chunk_size < end_of_buffer_pointer) {
163 // Write whole chunk to file
164 failed_write |= !::write(m_fd, current_write_pointer, chunk_size);
165 if (!failed_write) {
166 bytes_written += chunk_size;
167 }
168 current_write_pointer += chunk_size;
169 } else {
170 // Write the last bit of the buffer without using O_DIRECT as it possibly doesn't fulfill the
171 // alignment requirement
172 fcntl(m_fd, F_SETFL, O_CREAT | O_WRONLY);
173 failed_write |= !::write(m_fd, current_write_pointer, end_of_buffer_pointer - current_write_pointer);
174 fcntl(m_fd, F_SETFL, m_oflag);
175 if (!failed_write) {
176 bytes_written += end_of_buffer_pointer - current_write_pointer;
177 }
178 current_write_pointer = start_of_buffer_pointer;
179 }
180 }
181
182 if (current_write_pointer == end_of_buffer_pointer) {
183 current_write_pointer = start_of_buffer_pointer;
184 }
185
186 if (failed_write) {
187 ++failed_writes;
189 }
190 considered_chunks_in_loop++;
191 // This expression is "a bit" complicated as it finds the last frame that was written to file completely
193 reinterpret_cast<const ReadoutType*>( // NOLINT
194 start_of_buffer_pointer +
195 (((current_write_pointer - start_of_buffer_pointer) / ReadoutType::fixed_payload_size) *
196 ReadoutType::fixed_payload_size))
197 ->get_timestamp();
198 }
199 }
200 current_time = std::chrono::high_resolution_clock::now();
201 }
202
203 // Complete writing the last frame to file
204 if (current_write_pointer != nullptr) {
205 const char* last_started_frame =
206 start_of_buffer_pointer +
207 (((current_write_pointer - start_of_buffer_pointer) / ReadoutType::fixed_payload_size) *
208 ReadoutType::fixed_payload_size);
209 if (last_started_frame != current_write_pointer) {
210 fcntl(m_fd, F_SETFL, O_CREAT | O_WRONLY);
211 if (!::write(m_fd,
212 current_write_pointer,
213 (last_started_frame + ReadoutType::fixed_payload_size) - current_write_pointer)) {
215 } else {
216 bytes_written += (last_started_frame + ReadoutType::fixed_payload_size) - current_write_pointer;
217 }
218 }
219 }
220 ::close(m_fd);
221
222 inherited::m_next_timestamp_to_record = std::numeric_limits<uint64_t>::max(); // NOLINT (build/unsigned)
223
224 TLOG() << "Stopped recording, wrote " << bytes_written << " bytes. Failed write count: " << failed_writes;
225 inherited::m_recording.exchange(false);
226 }, recording_time_sec);
227}
bool set_work(Function &&f, Args &&... args)
ReadoutType
Which type of readout to use for TriggerDecision and DataRequest.
Definition Types.hpp:57
void warning(const Issue &issue)
Definition ers.hpp:115

Member Data Documentation

◆ m_fd

template<class ReadoutType , class LatencyBufferType >
int dunedaq::datahandlinglibs::ZeroCopyRecordingRequestHandlerModel< ReadoutType, LatencyBufferType >::m_fd
private

Definition at line 41 of file ZeroCopyRecordingRequestHandlerModel.hpp.

◆ m_oflag

template<class ReadoutType , class LatencyBufferType >
int dunedaq::datahandlinglibs::ZeroCopyRecordingRequestHandlerModel< ReadoutType, LatencyBufferType >::m_oflag
private

Definition at line 42 of file ZeroCopyRecordingRequestHandlerModel.hpp.


The documentation for this class was generated from the following files: