DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
ZeroCopyRecordingRequestHandlerModel.hxx
Go to the documentation of this file.
1// Declarations for ZeroCopyRecordingRequestHandlerModel
2
3namespace dunedaq {
4namespace datahandlinglibs {
5
6// Special configuration that checks LB alignment and O_DIRECT flag on output file
7template<class ReadoutType, class LatencyBufferType>
8void
10{
11 auto data_rec_conf = conf->get_module_configuration()->get_request_handler()->get_data_recorder();
12
13 if (data_rec_conf != nullptr) {
14 if (!data_rec_conf->get_output_file().empty()) {
15 inherited::m_sourceid.id = conf->get_source_id();
16 inherited::m_sourceid.subsystem = ReadoutType::subsystem;
17
18 // Check for alignment restrictions for filesystem block size. (XFS default: 4096)
19 if (inherited::m_latency_buffer->get_alignment_size() == 0 ||
20 sizeof(ReadoutType) * inherited::m_latency_buffer->size() % 4096) {
21 ers::error(ConfigurationError(ERS_HERE, inherited::m_sourceid, "Latency buffer is not 4kB aligned"));
22 }
23
24 // Check for sensible stream chunk size
25 inherited::m_stream_buffer_size = data_rec_conf->get_streaming_buffer_size();
26 if (inherited::m_stream_buffer_size % 4096 != 0) {
27 ers::error(ConfigurationError(ERS_HERE, inherited::m_sourceid, "Streaming chunk size is not divisible by 4kB!"));
28 }
29
30 // Prepare filename with full path
31 std::string file_full_path = data_rec_conf->get_output_file() + inherited::m_sourceid.to_string() + std::string(".bin");
32 inherited::m_output_file = file_full_path;
33
34
35 // RS: This will need to go away with the SNB store handler!
36 if (std::remove(file_full_path.c_str()) == 0) {
37 TLOG(TLVL_WORK_STEPS) << "Removed existing output file from previous run: " << file_full_path;
38 }
39
40 m_oflag = O_CREAT | O_WRONLY;
41 if (data_rec_conf->get_use_o_direct()) {
42 m_oflag |= O_DIRECT;
43 }
44 m_fd = ::open(file_full_path.c_str(), m_oflag, 0644);
45 if (m_fd == -1) {
46 TLOG() << "Failed to open file!";
47 throw ConfigurationError(ERS_HERE, inherited::m_sourceid, "Failed to open file!");
48 }
49 inherited::m_recording_configured = true;
50
51 } else { // no output dir specified
52 TLOG(TLVL_WORK_STEPS) << "No output path is specified in data recorder config. Recording feature is inactive.";
53 }
54 } else {
55 TLOG(TLVL_WORK_STEPS) << "No recording config object specified. Recording feature is inactive.";
56 }
57
58 inherited::conf(conf);
59}
60
61// Special record command that writes to files from memory aligned LBs
62template<class ReadoutType, class LatencyBufferType>
63void
65{
66 if (inherited::m_recording.load()) {
68 CommandError(ERS_HERE, inherited::m_sourceid, "A recording is still running, no new recording was started!"));
69 return;
70 }
71
72// FIXME: Recording parameters to be clarified!
73 int recording_time_sec = 0;
74 if (cmdargs.contains("duration")) {
75 recording_time_sec = cmdargs["duration"];
76 } else {
78 CommandError(ERS_HERE, inherited::m_sourceid, "A recording command with missing duration field received!"));
79 }
80 if (recording_time_sec == 0) {
82 CommandError(ERS_HERE, inherited::m_sourceid, "Recording for 0 seconds requested. Recording command is ignored!"));
83 return;
84 }
85
86 inherited::m_recording_thread.set_work(
87 [&](int duration) {
88 size_t chunk_size = inherited::m_stream_buffer_size;
89 size_t alignment_size = inherited::m_latency_buffer->get_alignment_size();
90 TLOG() << "Start recording for " << duration << " second(s)" << std::endl;
91 inherited::m_recording.exchange(true);
92 auto start_of_recording = std::chrono::high_resolution_clock::now();
93 auto current_time = start_of_recording;
94 inherited::m_next_timestamp_to_record = 0;
95
96 const char* current_write_pointer = nullptr;
97 const char* start_of_buffer_pointer =
98 reinterpret_cast<const char*>(inherited::m_latency_buffer->start_of_buffer()); // NOLINT
99 const char* current_end_pointer;
100 const char* end_of_buffer_pointer = reinterpret_cast<const char*>(inherited::m_latency_buffer->end_of_buffer()); // NOLINT
101
102 size_t bytes_written = 0;
103 size_t failed_writes = 0;
104
105 while (std::chrono::duration_cast<std::chrono::seconds>(current_time - start_of_recording).count() < duration) {
106 if (!inherited::m_cleanup_requested || (inherited::m_next_timestamp_to_record == 0)) {
107 size_t considered_chunks_in_loop = 0;
108
109 // Wait for potential running cleanup to finish first
110 {
111 std::unique_lock<std::mutex> lock(inherited::m_cv_mutex);
112 inherited::m_cv.wait(lock, [&] { return !inherited::m_cleanup_requested; });
113 }
114 inherited::m_cv.notify_all();
115
116 // Some frames have to be skipped to start copying from an aligned piece of memory
117 // These frames cannot be written without O_DIRECT as this would mess up the alignment of the write pointer
118 // into the target file
119 if (inherited::m_next_timestamp_to_record == 0) {
120 auto begin = inherited::m_latency_buffer->begin();
121 if (begin == inherited::m_latency_buffer->end()) {
122 // There are no elements in the buffer, update time and try again
123 current_time = std::chrono::high_resolution_clock::now();
124 continue;
125 }
126 inherited::m_next_timestamp_to_record = begin->get_timestamp();
127 size_t skipped_frames = 0;
128 while (reinterpret_cast<std::uintptr_t>(&(*begin)) % alignment_size) { // NOLINT
129 ++begin;
130 skipped_frames++;
131 if (!begin.good()) {
132 // We reached the end of the buffer without finding an aligned element
133 // Reset the next timestamp to record and try again
134 current_time = std::chrono::high_resolution_clock::now();
135 inherited::m_next_timestamp_to_record = 0;
136 continue;
137 }
138 }
139 TLOG() << "Skipped " << skipped_frames << " frames";
140 current_write_pointer = reinterpret_cast<const char*>(&(*begin)); // NOLINT
141 }
142
143 current_end_pointer = reinterpret_cast<const char*>(inherited::m_latency_buffer->back()); // NOLINT
144
145 // Break the loop from time to time to update the timestamp and check if we should stop recording
146 while (considered_chunks_in_loop < 100) {
147 auto iptr = reinterpret_cast<std::uintptr_t>(current_write_pointer); // NOLINT
148 if (iptr % alignment_size) {
149 // This should never happen
150 TLOG() << "Error: Write pointer is not aligned";
151 }
152 bool failed_write = false;
153 if (current_write_pointer + chunk_size < current_end_pointer) {
154 // We can write a whole chunk to file
155 failed_write |= !::write(m_fd, current_write_pointer, chunk_size);
156 if (!failed_write) {
157 bytes_written += chunk_size;
158 }
159 current_write_pointer += chunk_size;
160 } else if (current_end_pointer < current_write_pointer) {
161 if (current_write_pointer + chunk_size < end_of_buffer_pointer) {
162 // Write whole chunk to file
163 failed_write |= !::write(m_fd, current_write_pointer, chunk_size);
164 if (!failed_write) {
165 bytes_written += chunk_size;
166 }
167 current_write_pointer += chunk_size;
168 } else {
169 // Write the last bit of the buffer without using O_DIRECT as it possibly doesn't fulfill the
170 // alignment requirement
171 fcntl(m_fd, F_SETFL, O_CREAT | O_WRONLY);
172 failed_write |= !::write(m_fd, current_write_pointer, end_of_buffer_pointer - current_write_pointer);
173 fcntl(m_fd, F_SETFL, m_oflag);
174 if (!failed_write) {
175 bytes_written += end_of_buffer_pointer - current_write_pointer;
176 }
177 current_write_pointer = start_of_buffer_pointer;
178 }
179 }
180
181 if (current_write_pointer == end_of_buffer_pointer) {
182 current_write_pointer = start_of_buffer_pointer;
183 }
184
185 if (failed_write) {
186 ++failed_writes;
187 ers::warning(CannotWriteToFile(ERS_HERE, inherited::m_output_file));
188 }
189 considered_chunks_in_loop++;
190 // This expression is "a bit" complicated as it finds the last frame that was written to file completely
191 inherited::m_next_timestamp_to_record =
192 reinterpret_cast<const ReadoutType*>( // NOLINT
193 start_of_buffer_pointer +
194 (((current_write_pointer - start_of_buffer_pointer) / ReadoutType::fixed_payload_size) *
195 ReadoutType::fixed_payload_size))
196 ->get_timestamp();
197 }
198 }
199 current_time = std::chrono::high_resolution_clock::now();
200 }
201
202 // Complete writing the last frame to file
203 if (current_write_pointer != nullptr) {
204 const char* last_started_frame =
205 start_of_buffer_pointer +
206 (((current_write_pointer - start_of_buffer_pointer) / ReadoutType::fixed_payload_size) *
207 ReadoutType::fixed_payload_size);
208 if (last_started_frame != current_write_pointer) {
209 fcntl(m_fd, F_SETFL, O_CREAT | O_WRONLY);
210 if (!::write(m_fd,
211 current_write_pointer,
212 (last_started_frame + ReadoutType::fixed_payload_size) - current_write_pointer)) {
213 ers::warning(CannotWriteToFile(ERS_HERE, inherited::m_output_file));
214 } else {
215 bytes_written += (last_started_frame + ReadoutType::fixed_payload_size) - current_write_pointer;
216 }
217 }
218 }
219 ::close(m_fd);
220
221 inherited::m_next_timestamp_to_record = std::numeric_limits<uint64_t>::max(); // NOLINT (build/unsigned)
222
223 TLOG() << "Stopped recording, wrote " << bytes_written << " bytes. Failed write count: " << failed_writes;
224 inherited::m_recording.exchange(false);
225 }, recording_time_sec);
226}
227
228} // namespace datahandlinglibs
229} // namespace dunedaq
#define ERS_HERE
const dunedaq::appmodel::RequestHandler * get_request_handler() const
Get "request_handler" relationship value.
const dunedaq::appmodel::DataHandlerConf * get_module_configuration() const
Get "module_configuration" relationship value.
uint32_t get_source_id() const
Get "source_id" attribute value.
const dunedaq::appmodel::DataRecorderConf * get_data_recorder() const
Get "data_recorder" relationship value.
#define TLOG(...)
Definition macro.hpp:22
Including Qt Headers.
ConfigurationError
Definition util.hpp:27
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size
void warning(const Issue &issue)
Definition ers.hpp:115
void error(const Issue &issue)
Definition ers.hpp:81