13 if (data_rec_conf !=
nullptr) {
14 if (!data_rec_conf->get_output_file().empty()) {
16 inherited::m_sourceid.subsystem = ReadoutType::subsystem;
19 if (inherited::m_latency_buffer->get_alignment_size() == 0 ||
20 sizeof(ReadoutType) * inherited::m_latency_buffer->
size() % 4096) {
21 ers::error(ConfigurationError(
ERS_HERE, inherited::m_sourceid,
"Latency buffer is not 4kB aligned"));
25 inherited::m_stream_buffer_size = data_rec_conf->get_streaming_buffer_size();
26 if (inherited::m_stream_buffer_size % 4096 != 0) {
27 ers::error(ConfigurationError(
ERS_HERE, inherited::m_sourceid,
"Streaming chunk size is not divisible by 4kB!"));
31 std::string file_full_path = data_rec_conf->get_output_file() + inherited::m_sourceid.to_string() + std::string(
".bin");
32 inherited::m_output_file = file_full_path;
36 if (std::remove(file_full_path.c_str()) == 0) {
37 TLOG(TLVL_WORK_STEPS) <<
"Removed existing output file from previous run: " << file_full_path;
40 m_oflag = O_CREAT | O_WRONLY;
41 if (data_rec_conf->get_use_o_direct()) {
44 m_fd = ::open(file_full_path.c_str(), m_oflag, 0644);
46 TLOG() <<
"Failed to open file!";
47 throw ConfigurationError(
ERS_HERE, inherited::m_sourceid,
"Failed to open file!");
49 inherited::m_recording_configured =
true;
52 TLOG(TLVL_WORK_STEPS) <<
"No output path is specified in data recorder config. Recording feature is inactive.";
55 TLOG(TLVL_WORK_STEPS) <<
"No recording config object specified. Recording feature is inactive.";
58 inherited::conf(conf);
65 const appfwk::DAQModule::CommandData_t& cmdargs)
67 if (inherited::m_recording.load()) {
69 CommandError(
ERS_HERE, inherited::m_sourceid,
"A recording is still running, no new recording was started!"));
74 int recording_time_sec = 0;
75 if (cmdargs.contains(
"duration")) {
76 recording_time_sec = cmdargs[
"duration"];
79 CommandError(
ERS_HERE, inherited::m_sourceid,
"A recording command with missing duration field received!"));
81 if (recording_time_sec == 0) {
83 CommandError(
ERS_HERE, inherited::m_sourceid,
"Recording for 0 seconds requested. Recording command is ignored!"));
87 inherited::m_recording_thread.set_work(
89 size_t chunk_size = inherited::m_stream_buffer_size;
90 size_t alignment_size = inherited::m_latency_buffer->get_alignment_size();
91 TLOG() <<
"Start recording for " << duration <<
" second(s)" << std::endl;
92 inherited::m_recording.exchange(
true);
93 auto start_of_recording = std::chrono::high_resolution_clock::now();
94 auto current_time = start_of_recording;
95 inherited::m_next_timestamp_to_record = 0;
97 const char* current_write_pointer =
nullptr;
98 const char* start_of_buffer_pointer =
99 reinterpret_cast<const char*
>(inherited::m_latency_buffer->start_of_buffer());
100 const char* current_end_pointer;
101 const char* end_of_buffer_pointer =
reinterpret_cast<const char*
>(inherited::m_latency_buffer->end_of_buffer());
103 size_t bytes_written = 0;
104 size_t failed_writes = 0;
106 while (std::chrono::duration_cast<std::chrono::seconds>(current_time - start_of_recording).count() < duration) {
107 if (!inherited::m_cleanup_requested || (inherited::m_next_timestamp_to_record == 0)) {
108 size_t considered_chunks_in_loop = 0;
112 std::unique_lock<std::mutex> lock(inherited::m_cv_mutex);
113 inherited::m_cv.wait(lock, [&] {
return !inherited::m_cleanup_requested; });
115 inherited::m_cv.notify_all();
120 if (inherited::m_next_timestamp_to_record == 0) {
121 auto begin = inherited::m_latency_buffer->begin();
122 if (begin == inherited::m_latency_buffer->end()) {
124 current_time = std::chrono::high_resolution_clock::now();
127 inherited::m_next_timestamp_to_record = begin->get_timestamp();
128 size_t skipped_frames = 0;
129 while (
reinterpret_cast<std::uintptr_t
>(&(*begin)) % alignment_size) {
135 current_time = std::chrono::high_resolution_clock::now();
136 inherited::m_next_timestamp_to_record = 0;
140 TLOG() <<
"Skipped " << skipped_frames <<
" frames";
141 current_write_pointer =
reinterpret_cast<const char*
>(&(*begin));
144 current_end_pointer =
reinterpret_cast<const char*
>(inherited::m_latency_buffer->back());
147 while (considered_chunks_in_loop < 100) {
148 auto iptr =
reinterpret_cast<std::uintptr_t
>(current_write_pointer);
149 if (iptr % alignment_size) {
151 TLOG() <<
"Error: Write pointer is not aligned";
153 bool failed_write =
false;
154 if (current_write_pointer + chunk_size < current_end_pointer) {
156 failed_write |= !::write(m_fd, current_write_pointer, chunk_size);
158 bytes_written += chunk_size;
160 current_write_pointer += chunk_size;
161 }
else if (current_end_pointer < current_write_pointer) {
162 if (current_write_pointer + chunk_size < end_of_buffer_pointer) {
164 failed_write |= !::write(m_fd, current_write_pointer, chunk_size);
166 bytes_written += chunk_size;
168 current_write_pointer += chunk_size;
172 fcntl(m_fd, F_SETFL, O_CREAT | O_WRONLY);
173 failed_write |= !::write(m_fd, current_write_pointer, end_of_buffer_pointer - current_write_pointer);
174 fcntl(m_fd, F_SETFL, m_oflag);
176 bytes_written += end_of_buffer_pointer - current_write_pointer;
178 current_write_pointer = start_of_buffer_pointer;
182 if (current_write_pointer == end_of_buffer_pointer) {
183 current_write_pointer = start_of_buffer_pointer;
190 considered_chunks_in_loop++;
192 inherited::m_next_timestamp_to_record =
193 reinterpret_cast<const ReadoutType*
>(
194 start_of_buffer_pointer +
195 (((current_write_pointer - start_of_buffer_pointer) / ReadoutType::fixed_payload_size) *
196 ReadoutType::fixed_payload_size))
200 current_time = std::chrono::high_resolution_clock::now();
204 if (current_write_pointer !=
nullptr) {
205 const char* last_started_frame =
206 start_of_buffer_pointer +
207 (((current_write_pointer - start_of_buffer_pointer) / ReadoutType::fixed_payload_size) *
208 ReadoutType::fixed_payload_size);
209 if (last_started_frame != current_write_pointer) {
210 fcntl(m_fd, F_SETFL, O_CREAT | O_WRONLY);
212 current_write_pointer,
213 (last_started_frame + ReadoutType::fixed_payload_size) - current_write_pointer)) {
216 bytes_written += (last_started_frame + ReadoutType::fixed_payload_size) - current_write_pointer;
222 inherited::m_next_timestamp_to_record = std::numeric_limits<uint64_t>::max();
224 TLOG() <<
"Stopped recording, wrote " << bytes_written <<
" bytes. Failed write count: " << failed_writes;
225 inherited::m_recording.exchange(
false);
226 }, recording_time_sec);