DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
DefaultRequestHandlerModel.hxx
Go to the documentation of this file.
1// Declarations for DefaultRequestHandlerModel
2
3namespace dunedaq {
4namespace datahandlinglibs {
5
6template<class RDT, class LBT>
7void
9{
10
11 auto reqh_conf = conf->get_module_configuration()->get_request_handler();
12 m_sourceid.id = conf->get_source_id();
13 m_sourceid.subsystem = RDT::subsystem;
14 m_detid = conf->get_detector_id();
15 m_pop_limit_pct = reqh_conf->get_pop_limit_pct();
16 m_pop_size_pct = reqh_conf->get_pop_size_pct();
17
18 m_buffer_capacity = conf->get_module_configuration()->get_latency_buffer()->get_size();
19 m_num_request_handling_threads = reqh_conf->get_handler_threads();
20 m_request_timeout_ms = reqh_conf->get_request_timeout();
21
22 for (auto output : conf->get_outputs()) {
23 if (output->get_data_type() == "Fragment") {
24 m_fragment_send_timeout_ms = output->get_send_timeout_ms();
25 // 19-Dec-2024, KAB: store the names/IDs of the Fragment output connections so that
26 // we can confirm that they are ready for sending at 'start' time.
27 m_frag_out_conn_ids.push_back(output->UID());
28 }
29 }
30
31 if (m_recording_configured == false) {
32 auto dr = reqh_conf->get_data_recorder();
33 if(dr != nullptr) {
34 m_output_file = dr->get_output_file();
35 if (remove(m_output_file.c_str()) == 0) {
36 TLOG_DEBUG(TLVL_WORK_STEPS) << "Removed existing output file from previous run: " << m_output_file << std::endl;
37 }
38 m_stream_buffer_size = dr->get_streaming_buffer_size();
39 m_buffered_writer.open(m_output_file, m_stream_buffer_size, dr->get_compression_algorithm(), dr->get_use_o_direct());
40 m_recording_configured = true;
41 }
42 }
43
44 m_warn_on_timeout = reqh_conf->get_warn_on_timeout();
45 m_warn_about_empty_buffer = reqh_conf->get_warn_on_empty_buffer();
46 m_periodic_data_transmission_ms = reqh_conf->get_periodic_data_transmission_ms();
47
48 if (m_pop_limit_pct < 0.0f || m_pop_limit_pct > 1.0f || m_pop_size_pct < 0.0f || m_pop_size_pct > 1.0f) {
49 ers::error(ConfigurationError(ERS_HERE, m_sourceid, "Auto-pop percentage out of range."));
50 } else {
51 m_pop_limit_size = m_pop_limit_pct * m_buffer_capacity;
52 }
53
54 m_recording_thread.set_name("recording", m_sourceid.id);
55 m_cleanup_thread.set_name("cleanup", m_sourceid.id);
56 m_periodic_transmission_thread.set_name("periodic", m_sourceid.id);
57
58 std::ostringstream oss;
59 oss << "RequestHandler configured. " << std::fixed << std::setprecision(2)
60 << "auto-pop limit: " << m_pop_limit_pct * 100.0f << "% "
61 << "auto-pop size: " << m_pop_size_pct * 100.0f << "%";
62 TLOG_DEBUG(TLVL_WORK_STEPS) << oss.str();
63}
64
65template<class RDT, class LBT>
66void
67DefaultRequestHandlerModel<RDT, LBT>::scrap(const appfwk::DAQModule::CommandData_t& /*args*/)
68{
69 if (m_buffered_writer.is_open()) {
70 m_buffered_writer.close();
71 }
72}
73
74template<class RDT, class LBT>
75void
76DefaultRequestHandlerModel<RDT, LBT>::start(const appfwk::DAQModule::CommandData_t& /*args*/)
77{
78 // Reset opmon variables
79 m_num_requests_found = 0;
80 m_num_requests_bad = 0;
81 m_num_requests_old_window = 0;
82 m_num_requests_delayed = 0;
83 m_num_requests_uncategorized = 0;
84 m_num_buffer_cleanups = 0;
85 m_num_requests_timed_out = 0;
86 m_handled_requests = 0;
87 m_response_time_acc = 0;
88 m_pop_reqs = 0;
89 m_pops_count = 0;
90 m_payloads_written = 0;
91 m_bytes_written = 0;
92
93 m_t0 = std::chrono::high_resolution_clock::now();
94
95 // 19-Dec-2024, KAB: check that Fragment senders are ready to send. This is done so
96 // that the IOManager infrastructure fetches the necessary connection details from
97 // the ConnectivityService at 'start' time, instead of the first time that the sender
98 // is used to send data. This avoids delays in the sending of the first fragment in
99 // the first data-taking run in a DAQ session. Such delays can lead to undesirable
100 // system behavior like trigger inhibits.
101 for (auto frag_out_conn : m_frag_out_conn_ids) {
103 if (sender != nullptr) {
104 bool is_ready = sender->is_ready_for_sending(std::chrono::milliseconds(100));
105 TLOG_DEBUG(0) << "The Fragment sender for " << frag_out_conn << " " << (is_ready ? "is" : "is not")
106 << " ready, my source_id is [" << m_sourceid << "]";
107 }
108 }
109
110 m_request_handler_thread_pool = std::make_unique<boost::asio::thread_pool>(m_num_request_handling_threads);
111
112 m_run_marker.store(true);
113 m_cleanup_thread.set_work(&DefaultRequestHandlerModel<RDT, LBT>::periodic_cleanups, this);
114 if(m_periodic_data_transmission_ms > 0) {
115 m_periodic_transmission_thread.set_work(&DefaultRequestHandlerModel<RDT, LBT>::periodic_data_transmissions, this);
116 }
117
118 m_waiting_queue_thread =
120}
121
122template<class RDT, class LBT>
123void
124DefaultRequestHandlerModel<RDT, LBT>::stop(const appfwk::DAQModule::CommandData_t& /*args*/)
125{
126 m_run_marker.store(false);
127 while (!m_recording_thread.get_readiness()) {
128 std::this_thread::sleep_for(std::chrono::milliseconds(10));
129 }
130 while (!m_cleanup_thread.get_readiness()) {
131 std::this_thread::sleep_for(std::chrono::milliseconds(10));
132 }
133 while (!m_periodic_transmission_thread.get_readiness()) {
134 std::this_thread::sleep_for(std::chrono::milliseconds(10));
135 }
136 m_waiting_queue_thread.join();
137 m_request_handler_thread_pool->join();
138}
139
140template<class RDT, class LBT>
141void
142DefaultRequestHandlerModel<RDT, LBT>::record(const appfwk::DAQModule::CommandData_t& /*args*/)
144 //auto conf = args.get<readoutconfig::RecordingParams>();
145 //FIXME: how do we pass the duration or recording?
146 int recording_time_sec = 1;
147 if (m_recording.load()) {
148 ers::error(CommandError(ERS_HERE, m_sourceid, "A recording is still running, no new recording was started!"));
149 return;
150 } else if (!m_buffered_writer.is_open()) {
151 ers::error(CommandError(ERS_HERE, m_sourceid, "DLH is not configured for recording"));
152 return;
153 }
154 m_recording_thread.set_work(
155 [&](int duration) {
156 TLOG() << "Start recording for " << duration << " second(s)" << std::endl;
157 m_recording.exchange(true);
158 auto start_of_recording = std::chrono::high_resolution_clock::now();
159 auto current_time = start_of_recording;
160 m_next_timestamp_to_record = 0;
161 RDT element_to_search;
162 while (std::chrono::duration_cast<std::chrono::seconds>(current_time - start_of_recording).count() < duration) {
163 if (!m_cleanup_requested || (m_next_timestamp_to_record == 0)) {
164 if (m_next_timestamp_to_record == 0) {
165 auto front = m_latency_buffer->front();
166 m_next_timestamp_to_record = front == nullptr ? 0 : front->get_timestamp();
167 }
168 element_to_search.set_timestamp(m_next_timestamp_to_record);
169 size_t processed_chunks_in_loop = 0;
170
171 {
172 std::unique_lock<std::mutex> lock(m_cv_mutex);
173 m_cv.wait(lock, [&] { return !m_cleanup_requested; });
174 m_requests_running++;
175 }
176 m_cv.notify_all();
177 auto chunk_iter = m_latency_buffer->lower_bound(element_to_search, true);
178 auto end = m_latency_buffer->end();
179 {
180 std::lock_guard<std::mutex> lock(m_cv_mutex);
181 m_requests_running--;
182 }
183 m_cv.notify_all();
184
185 for (; chunk_iter != end && chunk_iter.good() && processed_chunks_in_loop < 1000;) {
186 if ((*chunk_iter).get_timestamp() >= m_next_timestamp_to_record) {
187 if (!m_buffered_writer.write(reinterpret_cast<char*>(chunk_iter->begin()), // NOLINT
188 chunk_iter->get_payload_size())) {
189 ers::warning(CannotWriteToFile(ERS_HERE, m_output_file));
190 }
191 m_payloads_written++;
192 m_bytes_written += chunk_iter->get_payload_size();
193 processed_chunks_in_loop++;
194 m_next_timestamp_to_record = (*chunk_iter).get_timestamp() +
195 RDT::expected_tick_difference * (*chunk_iter).get_num_frames();
196 }
197 ++chunk_iter;
198 }
199 }
200 current_time = std::chrono::high_resolution_clock::now();
201 }
202 m_next_timestamp_to_record = std::numeric_limits<uint64_t>::max(); // NOLINT (build/unsigned)
203
204 TLOG() << "Stop recording" << std::endl;
205 m_recording.exchange(false);
206 m_buffered_writer.flush();
207 },
208 recording_time_sec);
209}
211template<class RDT, class LBT>
212void
214{
215 std::unique_lock<std::mutex> lock(m_cv_mutex);
216 if (m_latency_buffer->occupancy() > m_pop_limit_size && !m_cleanup_requested.exchange(true)) {
217 m_cv.wait(lock, [&] { return m_requests_running == 0; });
218 cleanup();
219 m_cleanup_requested = false;
220 m_cv.notify_all();
221 }
222}
223
224template<class RDT, class LBT>
225void
227{
228 boost::asio::post(*m_request_handler_thread_pool, [&, datarequest, is_retry]() { // start a thread from pool
229 auto t_req_begin = std::chrono::high_resolution_clock::now();
230 {
231 std::unique_lock<std::mutex> lock(m_cv_mutex);
232 m_cv.wait(lock, [&] { return !m_cleanup_requested; });
233 m_requests_running++;
234 }
235 m_cv.notify_all();
236 auto result = data_request(datarequest);
237 {
238 std::lock_guard<std::mutex> lock(m_cv_mutex);
239 m_requests_running--;
240 }
241 m_cv.notify_all();
242 if ((result.result_code == ResultCode::kNotYet || result.result_code == ResultCode::kPartial) && m_request_timeout_ms >0 && is_retry == false) {
243 TLOG_DEBUG(TLVL_WORK_STEPS) << "Re-queue request. "
244 << " with timestamp=" << result.data_request.trigger_timestamp;
245 std::lock_guard<std::mutex> wait_lock_guard(m_waiting_requests_lock);
246 m_waiting_requests.push_back(RequestElement(datarequest, std::chrono::high_resolution_clock::now()));
247 }
248 else {
249 try { // Send to fragment connection
250 TLOG_DEBUG(TLVL_WORK_STEPS) << "Sending fragment with trigger/sequence_number "
251 << result.fragment->get_trigger_number() << "."
252 << result.fragment->get_sequence_number() << ", run number "
253 << result.fragment->get_run_number() << ", and DetectorID "
254 << result.fragment->get_detector_id() << ", and SourceID "
255 << result.fragment->get_element_id() << ", and size "
256 << result.fragment->get_size() << ", and result code "
257 << result.result_code;
258 // Send fragment
260 ->send(std::move(result.fragment), std::chrono::milliseconds(m_fragment_send_timeout_ms));
261
262 } catch (const ers::Issue& excpt) {
263 ers::warning(CannotWriteToQueue(ERS_HERE, m_sourceid, datarequest.data_destination, excpt));
264 }
265 }
266
267 auto t_req_end = std::chrono::high_resolution_clock::now();
268 auto us_req_took = std::chrono::duration_cast<std::chrono::microseconds>(t_req_end - t_req_begin);
269 TLOG_DEBUG(TLVL_WORK_STEPS) << "Responding to data request took: " << us_req_took.count() << "[us]";
270 m_response_time_acc.fetch_add(us_req_took.count());
271 if ( us_req_took.count() > m_response_time_max.load() )
272 m_response_time_max.store(us_req_took.count());
273 if ( us_req_took.count() < m_response_time_min.load() )
274 m_response_time_min.store(us_req_took.count());
275 m_handled_requests++;
276 });
277}
278
279template<class RDT, class LBT>
280void
282 {
284
285 info.set_num_requests_handled(m_handled_requests.exchange(0));
286 info.set_num_requests_found(m_num_requests_found.exchange(0));
287 info.set_num_requests_bad(m_num_requests_bad.exchange(0));
288 info.set_num_requests_old_window(m_num_requests_old_window.exchange(0));
289 info.set_num_requests_delayed(m_num_requests_delayed.exchange(0));
290 info.set_num_requests_uncategorized(m_num_requests_uncategorized.exchange(0));
291 info.set_num_requests_timed_out(m_num_requests_timed_out.exchange(0));
292 info.set_num_requests_waiting(m_waiting_requests.size());
293
294 int new_pop_reqs = 0;
295 int new_pop_count = 0;
296 int new_occupancy = 0;
297 info.set_tot_request_response_time(m_response_time_acc.exchange(0));
298 info.set_max_request_response_time(m_response_time_max.exchange(0));
299 info.set_min_request_response_time(m_response_time_min.exchange(std::numeric_limits<int>::max()));
300 auto now = std::chrono::high_resolution_clock::now();
301 new_pop_reqs = m_pop_reqs.exchange(0);
302 new_pop_count = m_pops_count.exchange(0);
303 new_occupancy = m_occupancy;
304 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
305 TLOG_DEBUG(TLVL_HOUSEKEEPING) << "Cleanup request rate: " << new_pop_reqs / seconds / 1. << " [Hz]"
306 << " Dropped: " << new_pop_count << " Occupancy: " << new_occupancy;
307
308 if (info.num_requests_handled() > 0) {
309
310 info.set_avg_request_response_time(info.tot_request_response_time() / info.num_requests_handled());
311 TLOG_DEBUG(TLVL_HOUSEKEEPING) << "Completed requests: " << info.num_requests_handled()
312 << " | Avarage response time: " << info.avg_request_response_time() << "[us]"
313 << " | Periodic sends: " << info.num_periodic_sent();
314 }
315
316 m_t0 = now;
317
318 info.set_num_buffer_cleanups(m_num_buffer_cleanups.exchange(0));
319 info.set_num_periodic_sent(m_num_periodic_sent.exchange(0));
320 info.set_num_periodic_send_failed(m_num_periodic_send_failed.exchange(0));
321
322 this->publish(std::move(info));
323
325 rinfo.set_recording_status(m_recording? "Y" : "N");
326 rinfo.set_packets_recorded(m_payloads_written.exchange(0));
327 rinfo.set_bytes_recorded(m_bytes_written.exchange(0));
328 this->publish(std::move(rinfo));
329 }
330
331
332template<class RDT, class LBT>
333std::unique_ptr<daqdataformats::Fragment>
335{
336 auto frag_header = create_fragment_header(dr);
337 frag_header.error_bits |= (0x1 << static_cast<size_t>(daqdataformats::FragmentErrorBits::kDataNotFound));
338 auto fragment = std::make_unique<daqdataformats::Fragment>(std::vector<std::pair<void*, size_t>>());
339 fragment->set_header_fields(frag_header);
340 return fragment;
341}
342
343template<class RDT, class LBT>
344void
346{
347 while (m_run_marker.load()) {
348 cleanup_check();
349 std::this_thread::sleep_for(std::chrono::milliseconds(50));
350 }
351}
352
353template<class RDT, class LBT>
354void
356{
357 while (m_run_marker.load()) {
358 periodic_data_transmission();
359 std::this_thread::sleep_for(std::chrono::milliseconds(m_periodic_data_transmission_ms));
360 }
361}
362
363template<class RDT, class LBT>
364void
367
368template<class RDT, class LBT>
369void
371{
372 auto size_guess = m_latency_buffer->occupancy();
373 if (size_guess > m_pop_limit_size) {
374 ++m_pop_reqs;
375 unsigned to_pop = m_pop_size_pct * m_latency_buffer->occupancy();
376
377 unsigned popped = 0;
378 for (size_t i = 0; i < to_pop; ++i) {
379 if (m_latency_buffer->front()->get_timestamp() < m_next_timestamp_to_record) {
380 m_latency_buffer->pop(1);
381 popped++;
382 } else {
383 break;
384 }
385 }
386 m_occupancy = m_latency_buffer->occupancy();
387 m_pops_count += popped;
388 m_error_registry->remove_errors_until(m_latency_buffer->front()->get_timestamp());
389 }
390 // Update the oldest timestamp monitorable
391 m_oldest_timestamp = m_latency_buffer->front()->get_timestamp();
392 m_num_buffer_cleanups++;
393}
394
395template<class RDT, class LBT>
396void
398{
399 // At run stop, we wait until all waiting requests have either:
400 //
401 // 1. been serviced because an item past the end of the window arrived in the buffer
402 // 2. timed out by going past m_request_timeout_ms, and returned a partial fragment
403 while (m_run_marker.load()) {
404 if (m_waiting_requests.size() > 0) {
405
406 std::lock_guard<std::mutex> lock_guard(m_waiting_requests_lock);
407
408 auto last_frame = m_latency_buffer->back(); // NOLINT
409 uint64_t newest_ts = last_frame == nullptr ? std::numeric_limits<uint64_t>::min() // NOLINT(build/unsigned)
410 : last_frame->get_timestamp();
411
412 for (auto iter = m_waiting_requests.begin(); iter!= m_waiting_requests.end();) {
413 if((*iter).request.request_information.window_end < newest_ts) {
414 issue_request((*iter).request, true);
415 iter = m_waiting_requests.erase(iter);
416 }
417 else if (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - (*iter).start_time).count() >= m_request_timeout_ms) {
418 issue_request((*iter).request, true);
419 if (m_warn_on_timeout) {
420 ers::warning(dunedaq::datahandlinglibs::VerboseRequestTimedOut(ERS_HERE, m_sourceid,
421 (*iter).request.trigger_number,
422 (*iter).request.sequence_number,
423 (*iter).request.run_number,
424 (*iter).request.request_information.window_begin,
425 (*iter).request.request_information.window_end,
426 (*iter).request.data_destination));
427 }
428 m_num_requests_bad++;
429 m_num_requests_timed_out++;
430 iter = m_waiting_requests.erase(iter);
431 }
432 else {
433 ++iter;
434 }
435 }
436 }
437 std::this_thread::sleep_for(std::chrono::milliseconds(1));
438 }
439}
440
441template<class RDT, class LBT>
442std::vector<std::pair<void*, size_t>>
444 uint64_t end_win_ts,
445 RequestResult& rres)
446{
447
448 TLOG_DEBUG(TLVL_WORK_STEPS) << "Looking for frags between " << start_win_ts << " and " << end_win_ts;
449
450 std::vector<std::pair<void*, size_t>> frag_pieces;
451 // Data availability is calculated here
452 auto front_element = m_latency_buffer->front(); // NOLINT
453 auto last_element = m_latency_buffer->back(); // NOLINT
454 uint64_t last_ts = front_element->get_timestamp(); // NOLINT(build/unsigned)
455 uint64_t newest_ts = last_element->get_timestamp(); // NOLINT(build/unsigned)
456
457 if (start_win_ts > newest_ts) {
458 // No element is as small as the start window-> request is far in the future
459 rres.result_code = ResultCode::kNotYet; // give it another chance
460 }
461 else if (end_win_ts < last_ts ) {
462 rres.result_code = ResultCode::kTooOld;
463 }
464 else {
465 RDT request_element = RDT();
466 auto start_timestamp = start_win_ts - (request_element.get_num_frames() * RDT::expected_tick_difference);
467 if (start_timestamp < last_ts)
468 start_timestamp = last_ts;
469 request_element.set_timestamp(start_timestamp);
470
471 auto start_iter = m_error_registry->has_error("MISSING_FRAMES")
472 ? m_latency_buffer->lower_bound(request_element, true)
473 : m_latency_buffer->lower_bound(request_element, false);
474 if (!start_iter.good()) {
475 // Accessor problem
476 rres.result_code = ResultCode::kNotFound;
477 }
478 else {
479 TLOG_DEBUG(TLVL_WORK_STEPS) << "Lower bound found " << start_iter->get_timestamp() << ", --> distance from window: "
480 << int64_t(start_win_ts) - int64_t(start_iter->get_timestamp()) ;
481 if (end_win_ts >= newest_ts) {
482 rres.result_code = ResultCode::kPartial;
483 }
484 else if (start_win_ts < last_ts) {
485 rres.result_code = ResultCode::kPartiallyOld;
486 }
487 else {
488 rres.result_code = ResultCode::kFound;
489 }
490
491 auto elements_handled = 0;
492
493 RDT* element = &(*start_iter);
494
495 while (start_iter.good() && element->get_timestamp() < end_win_ts) {
496 if ( element->get_timestamp() + element->get_num_frames() * RDT::expected_tick_difference <= start_win_ts) {
497 //TLOG() << "skip processing for current element " << element->get_timestamp() << ", out of readout window.";
498 }
499
500 else if ( element->get_num_frames()>1 &&
501 ((element->get_timestamp() < start_win_ts &&
502 element->get_timestamp() + element->get_num_frames() * RDT::expected_tick_difference > start_win_ts)
503 ||
504 element->get_timestamp() + element->get_num_frames() * RDT::expected_tick_difference >
505 end_win_ts)) {
506 //TLOG() << "We don't need the whole aggregated object (e.g.: superchunk)" ;
507 for (auto frame_iter = element->begin(); frame_iter != element->end(); frame_iter++) {
508 if (get_frame_iterator_timestamp(frame_iter) > (start_win_ts - RDT::expected_tick_difference)&&
509 get_frame_iterator_timestamp(frame_iter) < end_win_ts ) {
510 frag_pieces.emplace_back(
511 std::make_pair<void*, size_t>(static_cast<void*>(&(*frame_iter)), element->get_frame_size()));
512 }
513 }
514 }
515 else {
516 //TLOG() << "Add element " << element->get_timestamp();
517 // We are somewhere in the middle -> the whole aggregated object (e.g.: superchunk) can be copied
518 frag_pieces.emplace_back(
519 std::make_pair<void*, size_t>(static_cast<void*>((*start_iter).begin()), element->get_payload_size()));
520 }
521
522 elements_handled++;
523 ++start_iter;
524 element = &(*start_iter);
525 }
526 }
527 }
528 TLOG_DEBUG(TLVL_WORK_STEPS) << "*** Number of frames retrieved: " << frag_pieces.size();
529 return frag_pieces;
530}
531
532template<class RDT, class LBT>
535{
536 // Prepare response
537 RequestResult rres(ResultCode::kUnknown, dr);
538
539 // Prepare FragmentHeader and empty Fragment pieces list
540 auto frag_header = create_fragment_header(dr);
541 std::vector<std::pair<void*, size_t>> frag_pieces;
542 std::ostringstream oss;
543
544 //bool local_data_not_found_flag = false;
545 if (m_latency_buffer->occupancy() == 0) {
546 if (m_warn_about_empty_buffer) {
547 ers::warning(RequestOnEmptyBuffer(ERS_HERE, m_sourceid, "Data not found"));
548 }
549 frag_header.error_bits |= (0x1 << static_cast<size_t>(daqdataformats::FragmentErrorBits::kDataNotFound));
550 rres.result_code = ResultCode::kNotFound;
551 ++m_num_requests_bad;
552 }
553 else {
554 frag_pieces = get_fragment_pieces(dr.request_information.window_begin, dr.request_information.window_end, rres);
555
556 auto front_element = m_latency_buffer->front(); // NOLINT
557 auto last_element = m_latency_buffer->back(); // NOLINT
558 uint64_t last_ts = front_element->get_timestamp(); // NOLINT(build/unsigned)
559 uint64_t newest_ts = last_element->get_timestamp(); // NOLINT(build/unsigned)
560 TLOG_DEBUG(TLVL_WORK_STEPS) << "Data request for trig/seq_num=" << dr.trigger_number
561 << "." << dr.sequence_number << " and SourceID[" << m_sourceid << "] with"
562 << " Trigger TS=" << dr.trigger_timestamp
563 << " Oldest stored TS=" << last_ts
564 << " Newest stored TS=" << newest_ts
565 << " Start of window TS=" << dr.request_information.window_begin
566 << " End of window TS=" << dr.request_information.window_end
567 << " Latency buffer occupancy=" << m_latency_buffer->occupancy()
568 << " frag_pieces result_code=" << rres.result_code
569 << " number of frag_pieces=" << frag_pieces.size();
570
571 switch (rres.result_code) {
572 case ResultCode::kTooOld:
573 // return empty frag
574 ++m_num_requests_old_window;
575 ++m_num_requests_bad;
576 frag_header.error_bits |= (0x1 << static_cast<size_t>(daqdataformats::FragmentErrorBits::kDataNotFound));
577 break;
578 case ResultCode::kPartiallyOld:
579 ++m_num_requests_old_window;
580 ++m_num_requests_found;
581 frag_header.error_bits |= (0x1 << static_cast<size_t>(daqdataformats::FragmentErrorBits::kIncomplete));
582 frag_header.error_bits |= (0x1 << static_cast<size_t>(daqdataformats::FragmentErrorBits::kDataNotFound));
583 break;
584 case ResultCode::kFound:
585 ++m_num_requests_found;
586 break;
587 case ResultCode::kPartial:
588 frag_header.error_bits |= (0x1 << static_cast<size_t>(daqdataformats::FragmentErrorBits::kIncomplete));
589 ++m_num_requests_delayed;
590 break;
591 case ResultCode::kNotYet:
592 frag_header.error_bits |= (0x1 << static_cast<size_t>(daqdataformats::FragmentErrorBits::kDataNotFound));
593 ++m_num_requests_delayed;
594 break;
595 default:
596 // Unknown result of data search
597 ++m_num_requests_bad;
598 frag_header.error_bits |= (0x1 << static_cast<size_t>(daqdataformats::FragmentErrorBits::kDataNotFound));
599 }
600 }
601 // Create fragment from pieces
602 rres.fragment = std::make_unique<daqdataformats::Fragment>(frag_pieces);
603
604 // Set header
605 rres.fragment->set_header_fields(frag_header);
606
607 return rres;
608}
609
610} // namespace datahandlinglibs
611} // namespace dunedaq
#define ERS_HERE
const dunedaq::appmodel::LatencyBuffer * get_latency_buffer() const
Get "latency_buffer" relationship value.
const dunedaq::appmodel::RequestHandler * get_request_handler() const
Get "request_handler" relationship value.
uint32_t get_detector_id() const
Get "detector_id" attribute value.
const dunedaq::appmodel::DataHandlerConf * get_module_configuration() const
Get "module_configuration" relationship value.
uint32_t get_source_id() const
Get "source_id" attribute value.
uint32_t get_size() const
Get "size" attribute value.
const std::vector< const dunedaq::confmodel::Connection * > & get_outputs() const
Get "outputs" relationship value. Output connections from this module.
void conf(const dunedaq::appmodel::DataHandlerModule *)
virtual void periodic_data_transmission() override
Periodic data transmission - relevant for trigger in particular.
void issue_request(dfmessages::DataRequest datarequest, bool is_retry=false) override
Issue a data request to the request handler.
RequestResult data_request(dfmessages::DataRequest dr) override
void scrap(const appfwk::DAQModule::CommandData_t &) override
void record(const appfwk::DAQModule::CommandData_t &args) override
void stop(const appfwk::DAQModule::CommandData_t &)
void cleanup_check() override
Check if cleanup is necessary and execute it if necessary.
typename dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >::RequestResult RequestResult
std::unique_ptr< daqdataformats::Fragment > create_empty_fragment(const dfmessages::DataRequest &dr)
std::vector< std::pair< void *, size_t > > get_fragment_pieces(uint64_t start_win_ts, uint64_t end_win_ts, RequestResult &rres)
void start(const appfwk::DAQModule::CommandData_t &)
void set_recording_status(Arg_ &&arg, Args_... args)
Base class for any user-defined issue.
Definition Issue.hpp:69
static int64_t now()
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
@ kIncomplete
Only part of the requested data is present in the fragment.
@ kDataNotFound
The requested data was not found at all, so the fragment is empty.
The DUNE-DAQ namespace.
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
void warning(const Issue &issue)
Definition ers.hpp:115
void error(const Issue &issue)
Definition ers.hpp:81
timestamp_t window_end
End of the data collection window.
timestamp_t window_begin
Start of the data collection window.
This message represents a request for data sent to a single component of the DAQ.
sequence_number_t sequence_number
Sequence Number of the request.
trigger_number_t trigger_number
Trigger number the request corresponds to.
timestamp_t trigger_timestamp
Timestamp of trigger.