// NOTE(review): this is a fragment of a configuration routine. The enclosing
// signature and several lines (closing braces, else branches, error reporting)
// are not visible in this excerpt, so the notes below describe only what the
// visible statements do.
//
// Recording is only set up when a data-recorder configuration object exists ...
13 if (data_rec_conf !=
nullptr) {
// ... and that object names a non-empty output file.
14 if (!data_rec_conf->get_output_file().empty()) {
// Tag the source id with the subsystem declared by the readout type.
16 inherited::m_sourceid.subsystem = ReadoutType::subsystem;
// Latency-buffer sanity check: the branch is taken when the buffer reports an
// alignment size of 0, or when its total byte size
// (sizeof(ReadoutType) * element count) is not a multiple of 4096.
// NOTE(review): the branch body is not visible here - presumably it reports a
// configuration error; confirm against the full file.
19 if (inherited::m_latency_buffer->get_alignment_size() == 0 ||
20 sizeof(ReadoutType) * inherited::m_latency_buffer->
size() % 4096) {
// The streaming buffer size (used later as the write chunk size) comes from
// the config and is checked for being a multiple of 4096.
25 inherited::m_stream_buffer_size = data_rec_conf->get_streaming_buffer_size();
26 if (inherited::m_stream_buffer_size % 4096 != 0) {
// Output path = configured output file + source id string + ".bin".
31 std::string file_full_path = data_rec_conf->get_output_file() + inherited::m_sourceid.to_string() + std::string(
".bin");
32 inherited::m_output_file = file_full_path;
// Try to delete a file already present at that path; std::remove returns 0 on
// success, in which case the removal is logged.
36 if (std::remove(file_full_path.c_str()) == 0) {
37 TLOG(TLVL_WORK_STEPS) <<
"Removed existing output file from previous run: " << file_full_path;
// Base open flags: create if missing, write-only. The use_o_direct setting is
// consulted next; the statement it guards is not visible in this excerpt.
40 m_oflag = O_CREAT | O_WRONLY;
41 if (data_rec_conf->get_use_o_direct()) {
// Open the output file with the assembled flags and permission bits 0644.
44 m_fd = ::open(file_full_path.c_str(), m_oflag, 0644);
// NOTE(review): the condition guarding this message is not visible here -
// presumably a check of m_fd after ::open; confirm.
46 TLOG() <<
"Failed to open file!";
// Mark the recording feature as configured.
49 inherited::m_recording_configured =
true;
// Message for the case where the config carries no output path
// (presumably the else branch of the output-file check above).
52 TLOG(TLVL_WORK_STEPS) <<
"No output path is specified in data recorder config. Recording feature is inactive.";
// Message for the case where no recorder config object was supplied
// (presumably the else branch of the nullptr check above).
55 TLOG(TLVL_WORK_STEPS) <<
"No recording config object specified. Recording feature is inactive.";
// Hand the configuration to the inherited implementation.
58 inherited::conf(conf);
// NOTE(review): this is a fragment of the recording command handler. Its
// signature and a number of lines (closing braces, else branches, the calls
// that wrap the CommandError objects, some loop bodies) are not visible in this
// excerpt. Comments describe only the visible statements; anything that
// depends on elided lines is marked as an assumption.
//
// Reject the command while a previous recording is still flagged as running.
66 if (inherited::m_recording.load()) {
68 CommandError(
ERS_HERE, inherited::m_sourceid,
"A recording is still running, no new recording was started!"));
// Recording length in seconds, read from the "duration" field of the command
// arguments when that field is present.
73 int recording_time_sec = 0;
74 if (cmdargs.contains(
"duration")) {
75 recording_time_sec = cmdargs[
"duration"];
// Error object for a command without a "duration" field.
78 CommandError(
ERS_HERE, inherited::m_sourceid,
"A recording command with missing duration field received!"));
// A duration of 0 is reported as an error as well.
80 if (recording_time_sec == 0) {
82 CommandError(
ERS_HERE, inherited::m_sourceid,
"Recording for 0 seconds requested. Recording command is ignored!"));
// Hand the work to the recording thread. The callable's parameter list is on
// a line not shown; the body uses `duration`, and recording_time_sec is passed
// as the trailing argument of set_work at the end of this fragment.
86 inherited::m_recording_thread.set_work(
// Bytes per write call and the alignment the latency buffer reports.
88 size_t chunk_size = inherited::m_stream_buffer_size;
89 size_t alignment_size = inherited::m_latency_buffer->get_alignment_size();
// NOTE(review): std::endl forces a flush; '\n' (or nothing) is usually enough
// in a log stream.
90 TLOG() <<
"Start recording for " << duration <<
" second(s)" << std::endl;
// Flag the recording as active.
91 inherited::m_recording.exchange(
true);
92 auto start_of_recording = std::chrono::high_resolution_clock::now();
93 auto current_time = start_of_recording;
// 0 is used below as the "no start position chosen yet" marker.
94 inherited::m_next_timestamp_to_record = 0;
// Raw byte pointers into the latency buffer's storage:
//   current_write_pointer   - next byte to write to the file (nullptr until initialised)
//   start_of_buffer_pointer - value of start_of_buffer()
//   current_end_pointer     - set from back() on every outer iteration
//   end_of_buffer_pointer   - value of end_of_buffer()
96 const char* current_write_pointer =
nullptr;
97 const char* start_of_buffer_pointer =
98 reinterpret_cast<const char*
>(inherited::m_latency_buffer->start_of_buffer());
99 const char* current_end_pointer;
100 const char* end_of_buffer_pointer =
reinterpret_cast<const char*
>(inherited::m_latency_buffer->end_of_buffer());
// Counters reported in the final log message.
102 size_t bytes_written = 0;
103 size_t failed_writes = 0;
// Main loop: runs until the elapsed whole seconds reach `duration`.
105 while (std::chrono::duration_cast<std::chrono::seconds>(current_time - start_of_recording).count() < duration) {
// Proceed when no cleanup is requested, or when no start position has been
// chosen yet.
106 if (!inherited::m_cleanup_requested || (inherited::m_next_timestamp_to_record == 0)) {
// Number of chunks handled in this pass; bounded to 100 by the inner loop.
107 size_t considered_chunks_in_loop = 0;
// Block on the condition variable until m_cleanup_requested is false, then
// wake every waiter on the same condition variable.
111 std::unique_lock<std::mutex> lock(inherited::m_cv_mutex);
112 inherited::m_cv.wait(lock, [&] {
return !inherited::m_cleanup_requested; });
114 inherited::m_cv.notify_all();
// First-time initialisation: choose the element at which recording starts.
119 if (inherited::m_next_timestamp_to_record == 0) {
120 auto begin = inherited::m_latency_buffer->begin();
// Empty buffer: only a refresh of the clock is visible here
// (presumably followed by a `continue` on an elided line).
121 if (begin == inherited::m_latency_buffer->end()) {
123 current_time = std::chrono::high_resolution_clock::now();
126 inherited::m_next_timestamp_to_record = begin->get_timestamp();
127 size_t skipped_frames = 0;
// Loop while the address of the element at `begin` is not a multiple of
// alignment_size. Most of the loop body is not visible; the visible statements
// refresh the clock and reset the start marker to 0.
// NOTE(review): presumably the elided lines advance `begin`, count
// skipped_frames, and bail out when the buffer end is reached - confirm.
128 while (
reinterpret_cast<std::uintptr_t
>(&(*begin)) % alignment_size) {
134 current_time = std::chrono::high_resolution_clock::now();
135 inherited::m_next_timestamp_to_record = 0;
139 TLOG() <<
"Skipped " << skipped_frames <<
" frames";
// Writing starts at the address of the element `begin` now refers to.
140 current_write_pointer =
reinterpret_cast<const char*
>(&(*begin));
// Upper bound for this pass: the address returned by back().
143 current_end_pointer =
reinterpret_cast<const char*
>(inherited::m_latency_buffer->back());
// Handle at most 100 chunks before the outer loop re-evaluates its conditions.
146 while (considered_chunks_in_loop < 100) {
// Check that the write position is still a multiple of alignment_size.
147 auto iptr =
reinterpret_cast<std::uintptr_t
>(current_write_pointer);
148 if (iptr % alignment_size) {
150 TLOG() <<
"Error: Write pointer is not aligned";
152 bool failed_write =
false;
// Case 1: a full chunk lies strictly before the current end pointer.
// NOTE(review): `!::write(...)` is true only when write returns 0. An error
// return of -1 and a short write are both treated as success here.
153 if (current_write_pointer + chunk_size < current_end_pointer) {
155 failed_write |= !::write(m_fd, current_write_pointer, chunk_size);
157 bytes_written += chunk_size;
159 current_write_pointer += chunk_size;
160 }
// Case 2: the end pointer is behind the write pointer, so the data to write
// runs to the end of the buffer storage.
else if (current_end_pointer < current_write_pointer) {
// 2a: a full chunk still fits before the end of the buffer storage.
161 if (current_write_pointer + chunk_size < end_of_buffer_pointer) {
163 failed_write |= !::write(m_fd, current_write_pointer, chunk_size);
165 bytes_written += chunk_size;
167 current_write_pointer += chunk_size;
// 2b (presumably the else branch of 2a): write the remaining tail up to the
// end of the buffer storage, with the file status flags changed around the
// write and restored to m_oflag afterwards, then wrap to the buffer start.
// NOTE(review): looks intended to drop O_DIRECT for a write that is not a
// whole chunk - confirm. F_SETFL ignores O_CREAT and O_WRONLY, so the first
// call effectively clears the changeable status flags.
171 fcntl(m_fd, F_SETFL, O_CREAT | O_WRONLY);
172 failed_write |= !::write(m_fd, current_write_pointer, end_of_buffer_pointer - current_write_pointer);
173 fcntl(m_fd, F_SETFL, m_oflag);
175 bytes_written += end_of_buffer_pointer - current_write_pointer;
177 current_write_pointer = start_of_buffer_pointer;
// A write position exactly at the end of the storage wraps to the start.
181 if (current_write_pointer == end_of_buffer_pointer) {
182 current_write_pointer = start_of_buffer_pointer;
189 considered_chunks_in_loop++;
// Publish the new start marker: the write pointer's byte offset is rounded
// down to a multiple of ReadoutType::fixed_payload_size and the address is
// reinterpreted as a ReadoutType. The rest of the expression is on a line not
// shown (presumably a timestamp accessor - confirm).
191 inherited::m_next_timestamp_to_record =
192 reinterpret_cast<const ReadoutType*
>(
193 start_of_buffer_pointer +
194 (((current_write_pointer - start_of_buffer_pointer) / ReadoutType::fixed_payload_size) *
195 ReadoutType::fixed_payload_size))
199 current_time = std::chrono::high_resolution_clock::now();
// After the timed loop: if anything was set up for writing, check whether the
// write position falls inside a frame.
203 if (current_write_pointer !=
nullptr) {
// Start address of the frame containing the write position (offset rounded
// down to a multiple of fixed_payload_size).
204 const char* last_started_frame =
205 start_of_buffer_pointer +
206 (((current_write_pointer - start_of_buffer_pointer) / ReadoutType::fixed_payload_size) *
207 ReadoutType::fixed_payload_size);
// Write position is inside a frame: the flags are changed as in case 2b and
// the rest of that frame is used as the length of a final call whose opening
// is on a line not shown (presumably ::write - confirm).
208 if (last_started_frame != current_write_pointer) {
209 fcntl(m_fd, F_SETFL, O_CREAT | O_WRONLY);
211 current_write_pointer,
212 (last_started_frame + ReadoutType::fixed_payload_size) - current_write_pointer)) {
215 bytes_written += (last_started_frame + ReadoutType::fixed_payload_size) - current_write_pointer;
// Set the marker to the largest uint64_t value once recording is over.
221 inherited::m_next_timestamp_to_record = std::numeric_limits<uint64_t>::max();
223 TLOG() <<
"Stopped recording, wrote " << bytes_written <<
" bytes. Failed write count: " << failed_writes;
// Clear the "recording active" flag so a new recording command is accepted.
224 inherited::m_recording.exchange(
false);
// End of the callable; recording_time_sec is the trailing set_work argument.
225 }, recording_time_sec);