// conf(const dunedaq::appmodel::DataHandlerModule*): apply the request-handler configuration.
template<class RDT, class LBT>
void
// ...
{
  // ...
  m_sourceid.subsystem = RDT::subsystem;
  // ...
  m_num_request_handling_threads = reqh_conf->get_handler_threads();
  m_request_timeout_ms = reqh_conf->get_request_timeout();
  // ... (for each configured output connection)
  if (output->get_data_type() == "Fragment") {
    m_fragment_send_timeout_ms = output->get_send_timeout_ms();
    // ...
    m_frag_out_conn_ids.push_back(output->UID());
  }
  // ...
  if (m_recording_configured == false) {
    auto dr = reqh_conf->get_data_recorder();
    // ...
    m_output_file = dr->get_output_file();
    if (remove(m_output_file.c_str()) == 0) {
      TLOG_DEBUG(TLVL_WORK_STEPS) << "Removed existing output file from previous run: " << m_output_file << std::endl;
    }
    m_stream_buffer_size = dr->get_streaming_buffer_size();
    m_buffered_writer.open(
      m_output_file, m_stream_buffer_size, dr->get_compression_algorithm(), dr->get_use_o_direct());
    m_recording_configured = true;
  }
  // ...
  m_warn_on_timeout = reqh_conf->get_warn_on_timeout();
  m_warn_about_empty_buffer = reqh_conf->get_warn_on_empty_buffer();
  m_periodic_data_transmission_ms = reqh_conf->get_periodic_data_transmission_ms();

  m_recording_thread.set_name("recording", m_sourceid.id);
  m_cleanup_thread.set_name("cleanup", m_sourceid.id);
  m_periodic_transmission_thread.set_name("periodic", m_sourceid.id);

  std::ostringstream oss;
  oss << "RequestHandler configured. ";
  // ...
}
// scrap(const appfwk::DAQModule::CommandData_t&): close the recording output if it is open.
template<class RDT, class LBT>
void
// ...
{
  if (m_buffered_writer.is_open()) {
    m_buffered_writer.close();
  }
}
// start(const appfwk::DAQModule::CommandData_t&): reset per-run counters and start the worker infrastructure.
template<class RDT, class LBT>
void
// ...
{
  // ...
  m_num_requests_found = 0;
  m_num_requests_bad = 0;
  m_num_requests_old_window = 0;
  m_num_requests_delayed = 0;
  m_num_requests_uncategorized = 0;
  m_num_buffer_cleanups = 0;
  m_num_requests_timed_out = 0;
  m_handled_requests = 0;
  m_response_time_acc = 0;
  // ...
  m_payloads_written = 0;
  // ...
  m_t0 = std::chrono::high_resolution_clock::now();
  // ...
  for (auto frag_out_conn : m_frag_out_conn_ids) {
    // ... (look up the Fragment sender for this connection)
    if (sender != nullptr) {
      bool is_ready = sender->is_ready_for_sending(std::chrono::milliseconds(100));
      TLOG_DEBUG(0) << "The Fragment sender for " << frag_out_conn << " " << (is_ready ? "is" : "is not")
                    << " ready, my source_id is [" << m_sourceid << "]";
    }
  }
  // ...
  m_request_handler_thread_pool = std::make_unique<boost::asio::thread_pool>(m_num_request_handling_threads);
  // ...
  m_run_marker.store(true);
  // ...
  if (m_periodic_data_transmission_ms > 0) {
    // ...
  }
  // ...
}
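// Start-up ordering above: per-run counters are zeroed and m_t0 is taken before any
// request can arrive; each Fragment sender is probed with is_ready_for_sending(100 ms)
// purely to log its readiness; the boost::asio thread pool is sized by
// m_num_request_handling_threads; m_run_marker is set to true only after the pool
// exists; and the periodic transmission path is started only when a positive period
// is configured.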
// stop(const appfwk::DAQModule::CommandData_t&): stop the workers and drain the request thread pool.
template<class RDT, class LBT>
void
// ...
{
  m_run_marker.store(false);
  while (!m_recording_thread.get_readiness()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  }
  while (!m_cleanup_thread.get_readiness()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  }
  while (!m_periodic_transmission_thread.get_readiness()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  }
  TLOG() << "Latency buffer occupancy at stop: " << m_latency_buffer->occupancy();
  m_waiting_queue_thread.join();
  m_request_handler_thread_pool->join();
  // ...
}
// record(const appfwk::DAQModule::CommandData_t&): reports a CommandError when the DLH is not configured for recording.
template<class RDT, class LBT>
void
// ...
{
  // ...
  ers::error(datahandlinglibs::CommandError(ERS_HERE, m_sourceid, "DLH is not configured for recording"));
  // ...
}
// cleanup_check(): trigger a buffer cleanup once no request is in flight.
template<class RDT, class LBT>
void
// ...
{
  std::unique_lock<std::mutex> lock(m_cv_mutex);
  if (!m_pop_list.empty() && !m_cleanup_requested.exchange(true)) {
    m_cv.wait(lock, [&] { return m_requests_running == 0; });
    // ...
    m_cleanup_requested = false;
    // ...
  }
}
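// Illustrative sketch (not part of this file) of the gating pattern used above: a
// cleaner thread claims the "cleanup requested" flag, waits until in-flight workers
// drain to zero, does its work, then clears the flag and wakes waiters. All names
// below (example, g_mutex, g_cv, g_workers_running, g_cleanup_requested) are hypothetical.
#include <atomic>
#include <condition_variable>
#include <mutex>

namespace example {
std::mutex g_mutex;
std::condition_variable g_cv;
int g_workers_running = 0;                     // guarded by g_mutex
std::atomic<bool> g_cleanup_requested{ false };

void try_cleanup()
{
  std::unique_lock<std::mutex> lock(g_mutex);
  // Only one thread wins the exchange; others skip this cleanup round.
  if (!g_cleanup_requested.exchange(true)) {
    g_cv.wait(lock, [] { return g_workers_running == 0; });
    // ... prune the shared buffer while no worker touches it ...
    g_cleanup_requested = false;
    g_cv.notify_all();                         // let blocked workers resume
  }
}

void worker_enter()
{
  std::unique_lock<std::mutex> lock(g_mutex);
  g_cv.wait(lock, [] { return !g_cleanup_requested; });
  ++g_workers_running;
}

void worker_exit()
{
  std::lock_guard<std::mutex> lock(g_mutex);
  --g_workers_running;
  g_cv.notify_all();                           // wake a waiting cleaner
}
} // namespace example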
// issue_request(dfmessages::DataRequest datarequest, bool is_retry): schedule request handling on the thread pool.
template<class RDT, class LBT>
void
// ...
{
  boost::asio::post(*m_request_handler_thread_pool, [&, datarequest, is_retry]() {
    auto t_req_begin = std::chrono::high_resolution_clock::now();
    {
      std::unique_lock<std::mutex> lock(m_cv_mutex);
      m_cv.wait(lock, [&] { return !m_cleanup_requested; });
      m_requests_running++;
    }
    // ...
    auto result = data_request(datarequest);
    {
      std::lock_guard<std::mutex> lock(m_cv_mutex);
      m_requests_running--;
    }
    // ...
    if ((result.result_code == ResultCode::kNotYet || result.result_code == ResultCode::kPartial) &&
        m_request_timeout_ms > 0 && is_retry == false) {
      TLOG_DEBUG(TLVL_WORK_STEPS) << "Re-queue request. "
                                  << " with timestamp=" << result.data_request.trigger_timestamp;
      std::lock_guard<std::mutex> wait_lock_guard(m_waiting_requests_lock);
      m_waiting_requests.push_back(RequestElement(datarequest, std::chrono::high_resolution_clock::now()));
    } else {
      TLOG_DEBUG(TLVL_WORK_STEPS) << "Sending fragment with trigger/sequence_number "
                                  << result.fragment->get_trigger_number() << "."
                                  << result.fragment->get_sequence_number() << ", run number "
                                  << result.fragment->get_run_number() << ", and DetectorID "
                                  << result.fragment->get_detector_id() << ", and SourceID "
                                  << result.fragment->get_element_id() << ", and size " << result.fragment->get_size()
                                  << ", and result code " << result.result_code;
      // ... (look up the Fragment sender for the request's data destination, then:)
        ->send(std::move(result.fragment), std::chrono::milliseconds(m_fragment_send_timeout_ms));
      // ...
    }
    auto t_req_end = std::chrono::high_resolution_clock::now();
    auto us_req_took = std::chrono::duration_cast<std::chrono::microseconds>(t_req_end - t_req_begin);
    TLOG_DEBUG(TLVL_WORK_STEPS) << "Responding to data request took: " << us_req_took.count() << "[us]";
    m_response_time_acc.fetch_add(us_req_took.count());
    if (us_req_took.count() > m_response_time_max.load())
      m_response_time_max.store(us_req_took.count());
    if (us_req_took.count() < m_response_time_min.load())
      m_response_time_min.store(us_req_took.count());
    m_handled_requests++;
  });
}
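// Illustrative sketch (not part of this file): the dispatch pattern used by
// issue_request() above -- hand each request to a boost::asio::thread_pool and time
// its handling, letting the pool serialize shutdown via join(). Names are hypothetical.
#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>
#include <chrono>
#include <iostream>

void run_example_dispatch()
{
  boost::asio::thread_pool pool(4); // handler threads, analogous to m_num_request_handling_threads

  for (int request_id = 0; request_id < 8; ++request_id) {
    boost::asio::post(pool, [request_id]() {
      auto t_begin = std::chrono::steady_clock::now();
      // ... handle the request (look up data, build and send a fragment) ...
      auto took = std::chrono::duration_cast<std::chrono::microseconds>(
        std::chrono::steady_clock::now() - t_begin);
      std::cout << "request " << request_id << " took " << took.count() << " us\n";
    });
  }

  pool.join(); // same call the stop() method above relies on
}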
// generate_opmon_data(): publish operational-monitoring counters and reset them for the next interval.
template<class RDT, class LBT>
void
// ...
{
  // ...
  info.set_num_requests_handled(m_handled_requests.exchange(0));
  info.set_num_requests_found(m_num_requests_found.exchange(0));
  info.set_num_requests_bad(m_num_requests_bad.exchange(0));
  info.set_num_requests_old_window(m_num_requests_old_window.exchange(0));
  info.set_num_requests_delayed(m_num_requests_delayed.exchange(0));
  info.set_num_requests_uncategorized(m_num_requests_uncategorized.exchange(0));
  info.set_num_requests_timed_out(m_num_requests_timed_out.exchange(0));
  info.set_num_requests_waiting(m_waiting_requests.size());
  // ...
  int new_pop_reqs = 0;
  int new_pop_count = 0;
  int new_occupancy = 0;
  info.set_tot_request_response_time(m_response_time_acc.exchange(0));
  info.set_max_request_response_time(m_response_time_max.exchange(0));
  info.set_min_request_response_time(m_response_time_min.exchange(std::numeric_limits<int>::max()));
  auto now = std::chrono::high_resolution_clock::now();
  new_pop_reqs = m_pop_reqs.exchange(0);
  new_pop_count = m_pops_count.exchange(0);
  new_occupancy = m_occupancy;
  double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
  TLOG_DEBUG(TLVL_HOUSEKEEPING) << "Cleanup request rate: " << new_pop_reqs / seconds / 1. << " [Hz]"
                                << " Dropped: " << new_pop_count << " Occupancy: " << new_occupancy;
  // ...
  if (info.num_requests_handled() > 0) {
    // ...
    info.set_avg_request_response_time(info.tot_request_response_time() / info.num_requests_handled());
    TLOG_DEBUG(TLVL_HOUSEKEEPING) << "Completed requests: " << info.num_requests_handled()
                                  << " | Average response time: " << info.avg_request_response_time() << "[us]"
                                  << " | Periodic sends: " << info.num_periodic_sent();
  }
  // ...
  info.set_num_buffer_cleanups(m_num_buffer_cleanups.exchange(0));
  info.set_num_periodic_sent(m_num_periodic_sent.exchange(0));
  info.set_num_periodic_send_failed(m_num_periodic_send_failed.exchange(0));
  // ...
  this->publish(std::move(info));
  // ...
  this->publish(std::move(rinfo));
  // ...
}
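// Illustrative sketch (not part of this file): the counter-snapshot pattern used in
// generate_opmon_data() above -- atomics are read-and-reset with exchange(), so each
// publication reports only the activity since the previous one. Names are hypothetical.
#include <atomic>
#include <cstdint>
#include <limits>

struct ExampleMetrics
{
  std::atomic<uint64_t> handled{ 0 };
  std::atomic<int64_t> response_time_acc{ 0 };
  std::atomic<int64_t> response_time_min{ std::numeric_limits<int64_t>::max() };

  struct Snapshot { uint64_t handled; int64_t total_us; int64_t min_us; };

  Snapshot take_snapshot()
  {
    Snapshot s;
    s.handled = handled.exchange(0);
    s.total_us = response_time_acc.exchange(0);
    // Reset the minimum to "infinity" so the next interval starts fresh.
    s.min_us = response_time_min.exchange(std::numeric_limits<int64_t>::max());
    return s;
  }
};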
// create_empty_fragment(const dfmessages::DataRequest& dr): build a Fragment with a header but no payload.
template<class RDT, class LBT>
std::unique_ptr<daqdataformats::Fragment>
// ...
{
  auto frag_header = create_fragment_header(dr);
  // ...
  auto fragment = std::make_unique<daqdataformats::Fragment>(std::vector<std::pair<void*, size_t>>());
  fragment->set_header_fields(frag_header);
  // ...
  return fragment;
}
// Worker loop: repeats while m_run_marker is set, pausing 50 ms between iterations.
template<class RDT, class LBT>
void
// ...
{
  while (m_run_marker.load()) {
    // ...
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
  }
}
// periodic_data_transmissions(): invoke periodic_data_transmission() at the configured interval while running.
template<class RDT, class LBT>
void
// ...
{
  while (m_run_marker.load()) {
    periodic_data_transmission();
    std::this_thread::sleep_for(std::chrono::milliseconds(m_periodic_data_transmission_ms));
  }
}
template<class RDT, class LBT>
// ...
// Buffer cleanup: pop latency-buffer records whose timestamps were already consumed by requests.
template<class RDT, class LBT>
void
// ...
{
  // ...
  std::lock_guard<std::mutex> lk(m_pop_list_mutex);
  while (!m_pop_list.empty() && m_latency_buffer->occupancy() > 1) {
    auto ts = m_latency_buffer->front()->get_timestamp();
    if (m_pop_list.count(ts)) {
      m_latency_buffer->pop(1);
      // ...
      m_pop_list.erase(ts);
    }
    // ...
  }
  // ...
  m_occupancy = m_latency_buffer->occupancy();
  m_pops_count += popped; // popped: number of records removed above
  m_error_registry->remove_errors_until(m_latency_buffer->front()->get_timestamp());
  // ...
  m_oldest_timestamp = m_latency_buffer->front()->get_timestamp();
  m_num_buffer_cleanups++;
}
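// Cleanup policy above: only records whose timestamps appear in m_pop_list (i.e. data
// already handed out in fragments) are popped, and only from the front of the buffer;
// afterwards the cached occupancy, pop counters, error registry and oldest-timestamp
// bookkeeping are brought up to date.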
// check_waiting_requests(): retry delayed requests once data arrives or their timeout expires.
template<class RDT, class LBT>
void
// ...
{
  // ...
  while (m_run_marker.load()) {
    if (m_waiting_requests.size() > 0) {
      // ...
      std::lock_guard<std::mutex> lock_guard(m_waiting_requests_lock);
      // ...
      auto last_frame = m_latency_buffer->back();
      uint64_t newest_ts = last_frame == nullptr ? std::numeric_limits<uint64_t>::min()
                                                 : last_frame->get_timestamp();
      // ...
      for (auto iter = m_waiting_requests.begin(); iter != m_waiting_requests.end();) {
        if ((*iter).request.request_information.window_end <= newest_ts) {
          issue_request((*iter).request, true);
          iter = m_waiting_requests.erase(iter);
        } else if (std::chrono::duration_cast<std::chrono::milliseconds>(
                     std::chrono::high_resolution_clock::now() - /* time the request was queued */)
                     .count() >= m_request_timeout_ms) {
          issue_request((*iter).request, true);
          if (m_warn_on_timeout) {
            ers::warning(dunedaq::datahandlinglibs::VerboseRequestTimedOut(ERS_HERE,
              (*iter).request.trigger_number,
              (*iter).request.sequence_number,
              (*iter).request.run_number,
              (*iter).request.request_information.window_begin,
              (*iter).request.request_information.window_end,
              (*iter).request.data_destination));
          }
          m_num_requests_bad++;
          m_num_requests_timed_out++;
          iter = m_waiting_requests.erase(iter);
        }
        // ...
      }
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
}
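// Retry policy above: a waiting request is re-issued with is_retry = true either as
// soon as the newest buffered timestamp reaches its window_end, or once it has waited
// at least m_request_timeout_ms, in which case it is also counted as bad/timed-out and,
// if m_warn_on_timeout is set, reported via a VerboseRequestTimedOut warning.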
// get_fragment_pieces(uint64_t start_win_ts, uint64_t end_win_ts, RequestResult& rres):
// collect payload pointers from the latency buffer that fall inside the requested window.
template<class RDT, class LBT>
std::vector<std::pair<void*, size_t>>
// ...
{
  TLOG_DEBUG(TLVL_WORK_STEPS) << "Looking for frags between " << start_win_ts << " and " << end_win_ts;
  // ...
  std::vector<std::pair<void*, size_t>> frag_pieces;
  // ...
  auto front_element = m_latency_buffer->front();
  auto last_element = m_latency_buffer->back();
  uint64_t last_ts = front_element->get_timestamp();
  uint64_t newest_ts = last_element->get_timestamp();
  // ...
  if (start_win_ts > newest_ts) {
    // ...
    rres.result_code = ResultCode::kNotYet;
  } else if (end_win_ts <= last_ts) {
    rres.result_code = ResultCode::kTooOld;
  } else {
    RDT request_element = RDT();
    auto start_timestamp = start_win_ts;
    request_element.set_timestamp(start_timestamp);
    // ...
    auto start_iter = m_latency_buffer->begin();
    // ...
    if (!start_iter.good()) {
      // ...
      rres.result_code = ResultCode::kNotFound;
    } else {
      TLOG_DEBUG(TLVL_WORK_STEPS) << "Lower bound found " << start_iter->get_timestamp()
                                  << ", --> distance from window: "
                                  << int64_t(start_win_ts) - int64_t(start_iter->get_timestamp());
      // ...
      rres.result_code = ResultCode::kFound;
      // ...
      auto elements_handled = 0;
      // ...
      RDT* element = &(*start_iter);
      // ...
      while (start_iter.good() && element->get_timestamp() <= end_win_ts) {
        std::lock_guard<std::mutex> lk(m_pop_list_mutex);
        if (m_pop_list.count(element->get_timestamp())) {
          TLOG_DEBUG(50) << "skip processing for current element " << element->get_timestamp()
                         << ", already included in trigger.";
        } else if (element->get_timestamp() < start_win_ts) {
          TLOG_DEBUG(50) << "skip processing for current element " << element->get_timestamp()
                         << ", out of readout window.";
        } else {
          // ...
          frag_pieces.emplace_back(
            std::make_pair<void*, size_t>(static_cast<void*>((*start_iter).begin()), element->get_payload_size()));
          // ...
          m_pop_list.insert(element->get_timestamp());
        }
        // ... (advance start_iter to the next element)
        element = &(*start_iter);
      }
    }
  }
  // ...
  TLOG_DEBUG(TLVL_WORK_STEPS) << "*** Number of frames retrieved: " << frag_pieces.size();
  // ...
  return frag_pieces;
}
// data_request(dfmessages::DataRequest dr): serve a request from the latency buffer and build the Fragment
// (returns a RequestResult).
template<class RDT, class LBT>
// ...
{
  // ...
  auto frag_header = create_fragment_header(dr);
  std::vector<std::pair<void*, size_t>> frag_pieces;
  std::ostringstream oss;
  // ...
  if (m_latency_buffer->occupancy() == 0) {
    if (m_warn_about_empty_buffer) {
      // ...
    }
    rres.result_code = ResultCode::kNotFound;
    ++m_num_requests_bad;
  } else {
    // ...
    auto front_element = m_latency_buffer->front();
    auto last_element = m_latency_buffer->back();
    uint64_t last_ts = front_element->get_timestamp();
    uint64_t newest_ts = last_element->get_timestamp();
    // ...
        << " and SourceID[" << m_sourceid << "] with"
        << " Trigger TS=" << dr.trigger_timestamp
        << " Oldest stored TS=" << last_ts << " Newest stored TS=" << newest_ts
    // ...
        << " Latency buffer occupancy=" << m_latency_buffer->occupancy()
        << " frag_pieces result_code=" << rres.result_code
        << " number of frag_pieces=" << frag_pieces.size();
    // ...
    switch (rres.result_code) {
      case ResultCode::kTooOld:
        // ...
        ++m_num_requests_old_window;
        ++m_num_requests_bad;
        // ...
      case ResultCode::kPartiallyOld:
        ++m_num_requests_old_window;
        ++m_num_requests_found;
        // ...
      case ResultCode::kFound:
        ++m_num_requests_found;
        // ...
      case ResultCode::kPartial:
        // ...
        ++m_num_requests_delayed;
        // ...
      case ResultCode::kNotYet:
        // ...
        ++m_num_requests_delayed;
        // ...
        ++m_num_requests_bad;
        // ...
    }
    // ...
    rres.fragment = std::make_unique<daqdataformats::Fragment>(frag_pieces);
    // ...
    rres.fragment->set_header_fields(frag_header);
    // ...
  }
  // ...
}
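// Outcome mapping above: kTooOld counts as old-window and bad, kPartiallyOld as
// old-window and found, kFound as found, kPartial and kNotYet as delayed, and a
// further (elided) case increments the bad counter; whatever pieces were collected
// are then wrapped into a daqdataformats::Fragment whose header fields come from
// create_fragment_header(dr).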