4namespace datahandlinglibs {
6template<
class RDT,
class LBT>
13 m_sourceid.subsystem = RDT::subsystem;
15 m_pop_limit_pct = reqh_conf->get_pop_limit_pct();
16 m_pop_size_pct = reqh_conf->get_pop_size_pct();
19 m_num_request_handling_threads = reqh_conf->get_handler_threads();
20 m_request_timeout_ms = reqh_conf->get_request_timeout();
23 if (output->get_data_type() ==
"Fragment") {
24 m_fragment_send_timeout_ms = output->get_send_timeout_ms();
27 m_frag_out_conn_ids.push_back(output->UID());
31 if (m_recording_configured ==
false) {
32 auto dr = reqh_conf->get_data_recorder();
34 m_output_file = dr->get_output_file();
35 if (remove(m_output_file.c_str()) == 0) {
36 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Removed existing output file from previous run: " << m_output_file << std::endl;
38 m_stream_buffer_size = dr->get_streaming_buffer_size();
39 m_buffered_writer.open(m_output_file, m_stream_buffer_size, dr->get_compression_algorithm(), dr->get_use_o_direct());
40 m_recording_configured =
true;
44 m_warn_on_timeout = reqh_conf->get_warn_on_timeout();
45 m_warn_about_empty_buffer = reqh_conf->get_warn_on_empty_buffer();
46 m_periodic_data_transmission_ms = reqh_conf->get_periodic_data_transmission_ms();
48 if (m_pop_limit_pct < 0.0f || m_pop_limit_pct > 1.0f || m_pop_size_pct < 0.0f || m_pop_size_pct > 1.0f) {
51 m_pop_limit_size = m_pop_limit_pct * m_buffer_capacity;
52 m_max_requested_elements = m_pop_limit_size - m_pop_limit_size * m_pop_size_pct;
55 m_recording_thread.set_name(
"recording", m_sourceid.id);
56 m_cleanup_thread.set_name(
"cleanup", m_sourceid.id);
57 m_periodic_transmission_thread.set_name(
"periodic", m_sourceid.id);
59 std::ostringstream oss;
60 oss <<
"RequestHandler configured. " << std::fixed << std::setprecision(2)
61 <<
"auto-pop limit: " << m_pop_limit_pct * 100.0f <<
"% "
62 <<
"auto-pop size: " << m_pop_size_pct * 100.0f <<
"% "
63 <<
"max requested elements: " << m_max_requested_elements;
67template<
class RDT,
class LBT>
71 if (m_buffered_writer.is_open()) {
72 m_buffered_writer.close();
76template<
class RDT,
class LBT>
81 m_num_requests_found = 0;
82 m_num_requests_bad = 0;
83 m_num_requests_old_window = 0;
84 m_num_requests_delayed = 0;
85 m_num_requests_uncategorized = 0;
86 m_num_buffer_cleanups = 0;
87 m_num_requests_timed_out = 0;
88 m_handled_requests = 0;
89 m_response_time_acc = 0;
92 m_payloads_written = 0;
95 m_t0 = std::chrono::high_resolution_clock::now();
103 for (
auto frag_out_conn : m_frag_out_conn_ids) {
105 if (sender !=
nullptr) {
106 bool is_ready = sender->is_ready_for_sending(std::chrono::milliseconds(100));
107 TLOG_DEBUG(0) <<
"The Fragment sender for " << frag_out_conn <<
" " << (is_ready ?
"is" :
"is not")
108 <<
" ready, my source_id is [" << m_sourceid <<
"]";
112 m_request_handler_thread_pool = std::make_unique<boost::asio::thread_pool>(m_num_request_handling_threads);
114 m_run_marker.store(
true);
116 if(m_periodic_data_transmission_ms > 0) {
120 m_waiting_queue_thread =
124template<
class RDT,
class LBT>
128 m_run_marker.store(
false);
129 while (!m_recording_thread.get_readiness()) {
130 std::this_thread::sleep_for(std::chrono::milliseconds(10));
132 while (!m_cleanup_thread.get_readiness()) {
133 std::this_thread::sleep_for(std::chrono::milliseconds(10));
135 while (!m_periodic_transmission_thread.get_readiness()) {
136 std::this_thread::sleep_for(std::chrono::milliseconds(10));
138 m_waiting_queue_thread.join();
139 m_request_handler_thread_pool->join();
142template<
class RDT,
class LBT>
148 int recording_time_sec = 1;
149 if (m_recording.load()) {
152 }
else if (!m_buffered_writer.is_open()) {
156 m_recording_thread.set_work(
158 TLOG() <<
"Start recording for " << duration <<
" second(s)" << std::endl;
159 m_recording.exchange(
true);
160 auto start_of_recording = std::chrono::high_resolution_clock::now();
161 auto current_time = start_of_recording;
162 m_next_timestamp_to_record = 0;
163 RDT element_to_search;
164 while (std::chrono::duration_cast<std::chrono::seconds>(current_time - start_of_recording).count() < duration) {
165 if (!m_cleanup_requested || (m_next_timestamp_to_record == 0)) {
166 if (m_next_timestamp_to_record == 0) {
167 auto front = m_latency_buffer->front();
168 m_next_timestamp_to_record = front ==
nullptr ? 0 : front->get_timestamp();
170 element_to_search.set_timestamp(m_next_timestamp_to_record);
171 size_t processed_chunks_in_loop = 0;
174 std::unique_lock<std::mutex> lock(m_cv_mutex);
175 m_cv.wait(lock, [&] {
return !m_cleanup_requested; });
176 m_requests_running++;
179 auto chunk_iter = m_latency_buffer->lower_bound(element_to_search,
true);
180 auto end = m_latency_buffer->end();
182 std::lock_guard<std::mutex> lock(m_cv_mutex);
183 m_requests_running--;
187 for (; chunk_iter != end && chunk_iter.good() && processed_chunks_in_loop < 1000;) {
188 if ((*chunk_iter).get_timestamp() >= m_next_timestamp_to_record) {
189 if (!m_buffered_writer.write(
reinterpret_cast<char*
>(chunk_iter->begin()),
190 chunk_iter->get_payload_size())) {
193 m_payloads_written++;
194 m_bytes_written += chunk_iter->get_payload_size();
195 processed_chunks_in_loop++;
196 m_next_timestamp_to_record = (*chunk_iter).get_timestamp() +
197 RDT::expected_tick_difference * (*chunk_iter).get_num_frames();
202 current_time = std::chrono::high_resolution_clock::now();
204 m_next_timestamp_to_record = std::numeric_limits<uint64_t>::max();
206 TLOG() <<
"Stop recording" << std::endl;
207 m_recording.exchange(
false);
208 m_buffered_writer.flush();
213template<
class RDT,
class LBT>
217 std::unique_lock<std::mutex> lock(m_cv_mutex);
218 if (m_latency_buffer->occupancy() > m_pop_limit_size && !m_cleanup_requested.exchange(
true)) {
219 m_cv.wait(lock, [&] {
return m_requests_running == 0; });
221 m_cleanup_requested =
false;
226template<
class RDT,
class LBT>
230 boost::asio::post(*m_request_handler_thread_pool, [&, datarequest, is_retry]() {
231 auto t_req_begin = std::chrono::high_resolution_clock::now();
233 std::unique_lock<std::mutex> lock(m_cv_mutex);
234 m_cv.wait(lock, [&] {
return !m_cleanup_requested; });
235 m_requests_running++;
238 auto result = data_request(datarequest);
240 std::lock_guard<std::mutex> lock(m_cv_mutex);
241 m_requests_running--;
244 if ((result.result_code == ResultCode::kNotYet || result.result_code == ResultCode::kPartial) && m_request_timeout_ms >0 && is_retry ==
false) {
245 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Re-queue request. "
246 <<
" with timestamp=" << result.data_request.trigger_timestamp;
247 std::lock_guard<std::mutex> wait_lock_guard(m_waiting_requests_lock);
248 m_waiting_requests.push_back(
RequestElement(datarequest, std::chrono::high_resolution_clock::now()));
252 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Sending fragment with trigger/sequence_number "
253 << result.fragment->get_trigger_number() <<
"."
254 << result.fragment->get_sequence_number() <<
", run number "
255 << result.fragment->get_run_number() <<
", and DetectorID "
256 << result.fragment->get_detector_id() <<
", and SourceID "
257 << result.fragment->get_element_id() <<
", and size "
258 << result.fragment->get_size() <<
", and result code "
259 << result.result_code;
262 ->send(std::move(result.fragment), std::chrono::milliseconds(m_fragment_send_timeout_ms));
269 auto t_req_end = std::chrono::high_resolution_clock::now();
270 auto us_req_took = std::chrono::duration_cast<std::chrono::microseconds>(t_req_end - t_req_begin);
271 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Responding to data request took: " << us_req_took.count() <<
"[us]";
272 m_response_time_acc.fetch_add(us_req_took.count());
273 if ( us_req_took.count() > m_response_time_max.load() )
274 m_response_time_max.store(us_req_took.count());
275 if ( us_req_took.count() < m_response_time_min.load() )
276 m_response_time_min.store(us_req_took.count());
277 m_handled_requests++;
281template<
class RDT,
class LBT>
287 info.set_num_requests_handled(m_handled_requests.exchange(0));
288 info.set_num_requests_found(m_num_requests_found.exchange(0));
289 info.set_num_requests_bad(m_num_requests_bad.exchange(0));
290 info.set_num_requests_old_window(m_num_requests_old_window.exchange(0));
291 info.set_num_requests_delayed(m_num_requests_delayed.exchange(0));
292 info.set_num_requests_uncategorized(m_num_requests_uncategorized.exchange(0));
293 info.set_num_requests_timed_out(m_num_requests_timed_out.exchange(0));
294 info.set_num_requests_waiting(m_waiting_requests.size());
296 int new_pop_reqs = 0;
297 int new_pop_count = 0;
298 int new_occupancy = 0;
299 info.set_tot_request_response_time(m_response_time_acc.exchange(0));
300 info.set_max_request_response_time(m_response_time_max.exchange(0));
301 info.set_min_request_response_time(m_response_time_min.exchange(std::numeric_limits<int>::max()));
302 auto now = std::chrono::high_resolution_clock::now();
303 new_pop_reqs = m_pop_reqs.exchange(0);
304 new_pop_count = m_pops_count.exchange(0);
305 new_occupancy = m_occupancy;
306 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(
now - m_t0).count() / 1000000.;
307 TLOG_DEBUG(TLVL_HOUSEKEEPING) <<
"Cleanup request rate: " << new_pop_reqs / seconds / 1. <<
" [Hz]"
308 <<
" Dropped: " << new_pop_count <<
" Occupancy: " << new_occupancy;
310 if (info.num_requests_handled() > 0) {
312 info.set_avg_request_response_time(info.tot_request_response_time() / info.num_requests_handled());
313 TLOG_DEBUG(TLVL_HOUSEKEEPING) <<
"Completed requests: " << info.num_requests_handled()
314 <<
" | Avarage response time: " << info.avg_request_response_time() <<
"[us]"
315 <<
" | Periodic sends: " << info.num_periodic_sent();
320 info.set_num_buffer_cleanups(m_num_buffer_cleanups.exchange(0));
321 info.set_num_periodic_sent(m_num_periodic_sent.exchange(0));
322 info.set_num_periodic_send_failed(m_num_periodic_send_failed.exchange(0));
324 this->publish(std::move(info));
330 this->publish(std::move(rinfo));
334template<
class RDT,
class LBT>
335std::unique_ptr<daqdataformats::Fragment>
338 auto frag_header = create_fragment_header(dr);
340 auto fragment = std::make_unique<daqdataformats::Fragment>(std::vector<std::pair<void*, size_t>>());
341 fragment->set_header_fields(frag_header);
345template<
class RDT,
class LBT>
349 while (m_run_marker.load()) {
351 std::this_thread::sleep_for(std::chrono::milliseconds(50));
355template<
class RDT,
class LBT>
359 while (m_run_marker.load()) {
360 periodic_data_transmission();
361 std::this_thread::sleep_for(std::chrono::milliseconds(m_periodic_data_transmission_ms));
365template<
class RDT,
class LBT>
370template<
class RDT,
class LBT>
374 auto size_guess = m_latency_buffer->occupancy();
375 if (size_guess > m_pop_limit_size) {
377 unsigned to_pop = m_pop_size_pct * m_latency_buffer->occupancy();
380 for (
size_t i = 0; i < to_pop; ++i) {
381 if (m_latency_buffer->front()->get_timestamp() < m_next_timestamp_to_record) {
382 m_latency_buffer->pop(1);
388 m_occupancy = m_latency_buffer->occupancy();
389 m_pops_count += popped;
390 m_error_registry->remove_errors_until(m_latency_buffer->front()->get_timestamp());
392 m_num_buffer_cleanups++;
395template<
class RDT,
class LBT>
403 while (m_run_marker.load()) {
404 if (m_waiting_requests.size() > 0) {
406 std::lock_guard<std::mutex> lock_guard(m_waiting_requests_lock);
408 auto last_frame = m_latency_buffer->back();
409 uint64_t newest_ts = last_frame ==
nullptr ? std::numeric_limits<uint64_t>::min()
410 : last_frame->get_timestamp();
412 for (
auto iter = m_waiting_requests.begin(); iter!= m_waiting_requests.end();) {
413 if((*iter).request.request_information.window_end < newest_ts) {
414 issue_request((*iter).request,
true);
415 iter = m_waiting_requests.erase(iter);
417 else if (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - (*iter).start_time).count() >= m_request_timeout_ms) {
418 issue_request((*iter).request,
true);
419 if (m_warn_on_timeout) {
421 (*iter).request.trigger_number,
422 (*iter).request.sequence_number,
423 (*iter).request.run_number,
424 (*iter).request.request_information.window_begin,
425 (*iter).request.request_information.window_end,
426 (*iter).request.data_destination));
428 m_num_requests_bad++;
429 m_num_requests_timed_out++;
430 iter = m_waiting_requests.erase(iter);
437 std::this_thread::sleep_for(std::chrono::milliseconds(1));
441template<
class RDT,
class LBT>
442std::vector<std::pair<void*, size_t>>
448 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Looking for frags between " << start_win_ts <<
" and " << end_win_ts;
450 std::vector<std::pair<void*, size_t>> frag_pieces;
452 auto front_element = m_latency_buffer->front();
453 auto last_element = m_latency_buffer->back();
454 uint64_t last_ts = front_element->get_timestamp();
455 uint64_t newest_ts = last_element->get_timestamp();
457 if (start_win_ts > newest_ts) {
459 rres.result_code = ResultCode::kNotYet;
461 else if (end_win_ts < last_ts ) {
462 rres.result_code = ResultCode::kTooOld;
465 RDT request_element =
RDT();
466 request_element.set_timestamp(start_win_ts-(request_element.get_num_frames() * RDT::expected_tick_difference));
469 auto start_iter = m_error_registry->has_error(
"MISSING_FRAMES")
470 ? m_latency_buffer->lower_bound(request_element,
true)
471 : m_latency_buffer->lower_bound(request_element,
false);
472 if (!start_iter.good()) {
474 rres.result_code = ResultCode::kNotFound;
477 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Lower bound found " << start_iter->get_timestamp() <<
", --> distance from window: "
478 << int64_t(start_win_ts) - int64_t(start_iter->get_timestamp()) ;
479 if (end_win_ts > newest_ts) {
480 rres.result_code = ResultCode::kPartial;
482 else if (start_win_ts < last_ts) {
483 rres.result_code = ResultCode::kPartiallyOld;
486 rres.result_code = ResultCode::kFound;
489 auto elements_handled = 0;
491 RDT* element = &(*start_iter);
493 while (start_iter.good() && element->get_timestamp() < end_win_ts) {
494 if ( element->get_timestamp() + element->get_num_frames() * RDT::expected_tick_difference <= start_win_ts) {
498 else if ( element->get_num_frames()>1 &&
499 ((element->get_timestamp() < start_win_ts &&
500 element->get_timestamp() + element->get_num_frames() * RDT::expected_tick_difference > start_win_ts)
502 element->get_timestamp() + element->get_num_frames() * RDT::expected_tick_difference >
505 for (
auto frame_iter = element->begin(); frame_iter != element->end(); frame_iter++) {
508 frag_pieces.emplace_back(
509 std::make_pair<void*, size_t>(
static_cast<void*
>(&(*frame_iter)), element->get_frame_size()));
516 frag_pieces.emplace_back(
517 std::make_pair<void*, size_t>(
static_cast<void*
>((*start_iter).begin()), element->get_payload_size()));
522 element = &(*start_iter);
526 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"*** Number of frames retrieved: " << frag_pieces.size();
530template<
class RDT,
class LBT>
538 auto frag_header = create_fragment_header(dr);
539 std::vector<std::pair<void*, size_t>> frag_pieces;
540 std::ostringstream oss;
543 if (m_latency_buffer->occupancy() == 0) {
544 if (m_warn_about_empty_buffer) {
548 rres.result_code = ResultCode::kNotFound;
549 ++m_num_requests_bad;
554 auto front_element = m_latency_buffer->front();
555 auto last_element = m_latency_buffer->back();
556 uint64_t last_ts = front_element->get_timestamp();
557 uint64_t newest_ts = last_element->get_timestamp();
559 <<
"." << dr.
sequence_number <<
" and SourceID[" << m_sourceid <<
"] with"
561 <<
" Oldest stored TS=" << last_ts
562 <<
" Newest stored TS=" << newest_ts
565 <<
" Latency buffer occupancy=" << m_latency_buffer->occupancy()
566 <<
" frag_pieces result_code=" << rres.result_code
567 <<
" number of frag_pieces=" << frag_pieces.size();
569 switch (rres.result_code) {
570 case ResultCode::kTooOld:
572 ++m_num_requests_old_window;
573 ++m_num_requests_bad;
576 case ResultCode::kPartiallyOld:
577 ++m_num_requests_old_window;
578 ++m_num_requests_found;
582 case ResultCode::kFound:
583 ++m_num_requests_found;
585 case ResultCode::kPartial:
587 ++m_num_requests_delayed;
589 case ResultCode::kNotYet:
591 ++m_num_requests_delayed;
595 ++m_num_requests_bad;
600 rres.fragment = std::make_unique<daqdataformats::Fragment>(frag_pieces);
603 rres.fragment->set_header_fields(frag_header);
const dunedaq::appmodel::LatencyBuffer * get_latency_buffer() const
Get "latency_buffer" relationship value.
const dunedaq::appmodel::RequestHandler * get_request_handler() const
Get "request_handler" relationship value.
uint32_t get_detector_id() const
Get "detector_id" attribute value.
const dunedaq::appmodel::DataHandlerConf * get_module_configuration() const
Get "module_configuration" relationship value.
uint32_t get_source_id() const
Get "source_id" attribute value.
uint32_t get_size() const
Get "size" attribute value.
const std::vector< const dunedaq::confmodel::Connection * > & get_outputs() const
Get "outputs" relationship value. Output connections from this module.
void conf(const dunedaq::appmodel::DataHandlerModule *)
virtual void periodic_data_transmission() override
Periodic data transmission - relevant for trigger in particular.
void issue_request(dfmessages::DataRequest datarequest, bool is_retry=false) override
Issue a data request to the request handler.
virtual void generate_opmon_data() override
RequestResult data_request(dfmessages::DataRequest dr) override
void check_waiting_requests()
void record(const nlohmann::json &args) override
void cleanup_check() override
Check if cleanup is necessary and execute it if necessary.
typename dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >::RequestResult RequestResult
std::unique_ptr< daqdataformats::Fragment > create_empty_fragment(const dfmessages::DataRequest &dr)
void periodic_data_transmissions()
std::vector< std::pair< void *, size_t > > get_fragment_pieces(uint64_t start_win_ts, uint64_t end_win_ts, RequestResult &rres)
void scrap(const nlohmann::json &) override
void start(const nlohmann::json &)
void stop(const nlohmann::json &)
void set_packets_recorded(::uint64_t value)
void set_bytes_recorded(::uint64_t value)
void set_recording_status(Arg_ &&arg, Args_... args)
Base class for any user-defined issue.
#define TLOG_DEBUG(lvl,...)
uint64_t get_frame_iterator_timestamp(T iter)
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
void warning(const Issue &issue)
void error(const Issue &issue)
This message represents a request for data sent to a single component of the DAQ.
sequence_number_t sequence_number
Sequence Number of the request.
std::string data_destination
ComponentRequest request_information
trigger_number_t trigger_number
Trigger number the request corresponds to.
timestamp_t trigger_timestamp
Timestamp of trigger.