6template<
class RDT,
class LBT>
11 auto reqh_conf =
conf->get_module_configuration()->get_request_handler();
22 for (
auto output :
conf->get_outputs()) {
23 if (output->get_data_type() ==
"Fragment") {
32 auto dr = reqh_conf->get_data_recorder();
48 if (m_pop_limit_pct < 0.0f || m_pop_limit_pct > 1.0f || m_pop_size_pct < 0.0f || m_pop_size_pct > 1.0f) {
58 std::ostringstream oss;
59 oss <<
"RequestHandler configured. " << std::fixed << std::setprecision(2)
65template<
class RDT,
class LBT>
74template<
class RDT,
class LBT>
93 m_t0 = std::chrono::high_resolution_clock::now();
103 if (sender !=
nullptr) {
104 bool is_ready = sender->is_ready_for_sending(std::chrono::milliseconds(100));
105 TLOG_DEBUG(0) <<
"The Fragment sender for " << frag_out_conn <<
" " << (is_ready ?
"is" :
"is not")
106 <<
" ready, my source_id is [" <<
m_sourceid <<
"]";
122template<
class RDT,
class LBT>
128 std::this_thread::sleep_for(std::chrono::milliseconds(10));
131 std::this_thread::sleep_for(std::chrono::milliseconds(10));
134 std::this_thread::sleep_for(std::chrono::milliseconds(10));
140template<
class RDT,
class LBT>
146 int recording_time_sec = 1;
156 TLOG() <<
"Start recording for " << duration <<
" second(s)" << std::endl;
158 auto start_of_recording = std::chrono::high_resolution_clock::now();
159 auto current_time = start_of_recording;
161 RDT element_to_search;
162 while (std::chrono::duration_cast<std::chrono::seconds>(current_time - start_of_recording).count() < duration) {
169 size_t processed_chunks_in_loop = 0;
172 std::unique_lock<std::mutex> lock(
m_cv_mutex);
185 for (; chunk_iter != end && chunk_iter.good() && processed_chunks_in_loop < 1000;) {
188 chunk_iter->get_payload_size())) {
193 processed_chunks_in_loop++;
195 RDT::expected_tick_difference * (*chunk_iter).get_num_frames();
200 current_time = std::chrono::high_resolution_clock::now();
204 TLOG() <<
"Stop recording" << std::endl;
211template<
class RDT,
class LBT>
215 std::unique_lock<std::mutex> lock(
m_cv_mutex);
224template<
class RDT,
class LBT>
229 auto t_req_begin = std::chrono::high_resolution_clock::now();
231 std::unique_lock<std::mutex> lock(
m_cv_mutex);
242 if ((result.result_code == ResultCode::kNotYet || result.result_code == ResultCode::kPartial) &&
m_request_timeout_ms >0 && is_retry ==
false) {
244 <<
" with timestamp=" << result.data_request.trigger_timestamp;
251 << result.fragment->get_trigger_number() <<
"."
252 << result.fragment->get_sequence_number() <<
", run number "
253 << result.fragment->get_run_number() <<
", and DetectorID "
254 << result.fragment->get_detector_id() <<
", and SourceID "
255 << result.fragment->get_element_id() <<
", and size "
256 << result.fragment->get_size() <<
", and result code "
257 << result.result_code;
267 auto t_req_end = std::chrono::high_resolution_clock::now();
268 auto us_req_took = std::chrono::duration_cast<std::chrono::microseconds>(t_req_end - t_req_begin);
279template<
class RDT,
class LBT>
294 int new_pop_reqs = 0;
295 int new_pop_count = 0;
296 int new_occupancy = 0;
299 info.set_min_request_response_time(
m_response_time_min.exchange(std::numeric_limits<int>::max()));
300 auto now = std::chrono::high_resolution_clock::now();
304 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(
now -
m_t0).count() / 1000000.;
306 <<
" Dropped: " << new_pop_count <<
" Occupancy: " << new_occupancy;
308 if (info.num_requests_handled() > 0) {
310 info.set_avg_request_response_time(info.tot_request_response_time() / info.num_requests_handled());
312 <<
" | Avarage response time: " << info.avg_request_response_time() <<
"[us]"
313 <<
" | Periodic sends: " << info.num_periodic_sent();
322 this->
publish(std::move(info));
328 this->
publish(std::move(rinfo));
332template<
class RDT,
class LBT>
333std::unique_ptr<daqdataformats::Fragment>
338 auto fragment = std::make_unique<daqdataformats::Fragment>(std::vector<std::pair<void*, size_t>>());
339 fragment->set_header_fields(frag_header);
343template<
class RDT,
class LBT>
349 std::this_thread::sleep_for(std::chrono::milliseconds(50));
353template<
class RDT,
class LBT>
363template<
class RDT,
class LBT>
368template<
class RDT,
class LBT>
378 for (
size_t i = 0; i < to_pop; ++i) {
395template<
class RDT,
class LBT>
409 uint64_t newest_ts = last_frame ==
nullptr ? std::numeric_limits<uint64_t>::min()
410 : last_frame->get_timestamp();
413 if((*iter).request.request_information.window_end < newest_ts) {
417 else if (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - (*iter).start_time).count() >=
m_request_timeout_ms) {
421 (*iter).request.trigger_number,
422 (*iter).request.sequence_number,
423 (*iter).request.run_number,
424 (*iter).request.request_information.window_begin,
425 (*iter).request.request_information.window_end,
426 (*iter).request.data_destination));
437 std::this_thread::sleep_for(std::chrono::milliseconds(1));
441template<
class RDT,
class LBT>
442std::vector<std::pair<void*, size_t>>
450 std::vector<std::pair<void*, size_t>> frag_pieces;
454 uint64_t last_ts = front_element->get_timestamp();
455 uint64_t newest_ts = last_element->get_timestamp();
457 if (start_win_ts > newest_ts) {
459 rres.result_code = ResultCode::kNotYet;
461 else if (end_win_ts < last_ts ) {
462 rres.result_code = ResultCode::kTooOld;
465 RDT request_element =
RDT();
466 auto start_timestamp = start_win_ts - (request_element.get_num_frames() * RDT::expected_tick_difference);
467 if (start_timestamp < last_ts)
468 start_timestamp = last_ts;
469 request_element.set_timestamp(start_timestamp);
474 if (!start_iter.good()) {
476 rres.result_code = ResultCode::kNotFound;
480 << int64_t(start_win_ts) - int64_t(start_iter->get_timestamp()) ;
481 if (end_win_ts >= newest_ts) {
482 rres.result_code = ResultCode::kPartial;
484 else if (start_win_ts < last_ts) {
485 rres.result_code = ResultCode::kPartiallyOld;
488 rres.result_code = ResultCode::kFound;
491 auto elements_handled = 0;
493 RDT* element = &(*start_iter);
495 while (start_iter.good() && element->get_timestamp() < end_win_ts) {
496 if ( element->get_timestamp() + element->get_num_frames() * RDT::expected_tick_difference <= start_win_ts) {
500 else if ( element->get_num_frames()>1 &&
501 ((element->get_timestamp() < start_win_ts &&
502 element->get_timestamp() + element->get_num_frames() * RDT::expected_tick_difference > start_win_ts)
504 element->get_timestamp() + element->get_num_frames() * RDT::expected_tick_difference >
507 for (
auto frame_iter = element->begin(); frame_iter != element->end(); frame_iter++) {
510 frag_pieces.emplace_back(
511 std::make_pair<void*, size_t>(
static_cast<void*
>(&(*frame_iter)), element->get_frame_size()));
518 frag_pieces.emplace_back(
519 std::make_pair<void*, size_t>(
static_cast<void*
>((*start_iter).begin()), element->get_payload_size()));
524 element = &(*start_iter);
532template<
class RDT,
class LBT>
541 std::vector<std::pair<void*, size_t>> frag_pieces;
542 std::ostringstream oss;
550 rres.result_code = ResultCode::kNotFound;
558 uint64_t last_ts = front_element->get_timestamp();
559 uint64_t newest_ts = last_element->get_timestamp();
563 <<
" Oldest stored TS=" << last_ts
564 <<
" Newest stored TS=" << newest_ts
568 <<
" frag_pieces result_code=" << rres.result_code
569 <<
" number of frag_pieces=" << frag_pieces.size();
571 switch (rres.result_code) {
572 case ResultCode::kTooOld:
576 frag_header.status_bits |=
578 frag_header.status_bits |=
581 case ResultCode::kPartiallyOld:
584 frag_header.status_bits |=
586 frag_header.status_bits |=
589 case ResultCode::kFound:
592 case ResultCode::kPartial:
594 frag_header.status_bits |=
598 case ResultCode::kNotYet:
600 frag_header.status_bits |=
607 frag_header.status_bits |=
612 rres.fragment = std::make_unique<daqdataformats::Fragment>(frag_pieces);
615 rres.fragment->set_header_fields(frag_header);
std::atomic< bool > m_run_marker
int m_fragment_send_timeout_ms
std::unique_ptr< FrameErrorRegistry > & m_error_registry
std::atomic< int > m_response_time_max
std::atomic< int > m_handled_requests
void conf(const dunedaq::appmodel::DataHandlerModule *)
virtual void periodic_data_transmission() override
Periodic data transmission - relevant for trigger in particular.
std::atomic< bool > m_recording
std::vector< std::string > m_frag_out_conn_ids
void issue_request(dfmessages::DataRequest datarequest, bool is_retry=false) override
Issue a data request to the request handler.
virtual void generate_opmon_data() override
std::atomic< bool > m_cleanup_requested
std::atomic< int > m_num_requests_uncategorized
RequestResult data_request(dfmessages::DataRequest dr) override
std::atomic< int > m_bytes_written
std::atomic< int > m_num_requests_found
std::atomic< uint64_t > m_next_timestamp_to_record
utilities::ReusableThread m_cleanup_thread
void scrap(const appfwk::DAQModule::CommandData_t &) override
std::string m_output_file
std::condition_variable m_cv
bool m_warn_about_empty_buffer
void check_waiting_requests()
utilities::ReusableThread m_recording_thread
std::atomic< int > m_pops_count
void record(const appfwk::DAQModule::CommandData_t &args) override
std::atomic< int > m_occupancy
void stop(const appfwk::DAQModule::CommandData_t &)
void cleanup_check() override
Check if cleanup is necessary and execute it if necessary.
bool m_recording_configured
typename dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >::RequestResult RequestResult
BufferedFileWriter m_buffered_writer
std::atomic< int > m_num_buffer_cleanups
size_t m_stream_buffer_size
std::unique_ptr< daqdataformats::Fragment > create_empty_fragment(const dfmessages::DataRequest &dr)
std::atomic< uint64_t > m_num_periodic_send_failed
void periodic_data_transmissions()
std::atomic< uint64_t > m_num_periodic_sent
std::atomic< int > m_payloads_written
std::atomic< int > m_num_requests_timed_out
std::atomic< int > m_num_requests_delayed
std::atomic< int > m_response_time_acc
std::vector< std::pair< void *, size_t > > get_fragment_pieces(uint64_t start_win_ts, uint64_t end_win_ts, RequestResult &rres)
void start(const appfwk::DAQModule::CommandData_t &)
daqdataformats::FragmentHeader create_fragment_header(const dfmessages::DataRequest &dr)
size_t m_num_request_handling_threads
std::shared_ptr< LatencyBufferType > & m_latency_buffer
std::mutex m_waiting_requests_lock
uint32_t m_periodic_data_transmission_ms
std::atomic< int > m_response_time_min
std::atomic< int > m_num_requests_bad
std::vector< RequestElement > m_waiting_requests
daqdataformats::SourceID m_sourceid
std::thread m_waiting_queue_thread
std::atomic< uint64_t > m_oldest_timestamp
std::atomic< int > m_requests_running
unsigned m_pop_limit_size
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
std::atomic< int > m_num_requests_old_window
std::unique_ptr< boost::asio::thread_pool > m_request_handler_thread_pool
utilities::ReusableThread m_periodic_transmission_thread
std::atomic< int > m_pop_reqs
void set_packets_recorded(::uint64_t value)
void set_bytes_recorded(::uint64_t value)
void set_recording_status(Arg_ &&arg, Args_... args)
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
Base class for any user define issue.
#define TLOG_DEBUG(lvl,...)
uint64_t get_frame_iterator_timestamp(T iter)
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
void warning(const Issue &issue)
void error(const Issue &issue)
This message represents a request for data sent to a single component of the DAQ.
sequence_number_t sequence_number
Sequence Number of the request.
std::string data_destination
ComponentRequest request_information
trigger_number_t trigger_number
Trigger number the request corresponds to.
timestamp_t trigger_timestamp
Timestamp of trigger.