6template<
class RDT,
class LBT>
11 auto reqh_conf =
conf->get_module_configuration()->get_request_handler();
20 for (
auto output :
conf->get_outputs()) {
21 if (output->get_data_type() ==
"Fragment") {
30 auto dr = reqh_conf->get_data_recorder();
51 std::ostringstream oss;
52 oss <<
"RequestHandler configured. ";
56template<
class RDT,
class LBT>
65template<
class RDT,
class LBT>
84 m_t0 = std::chrono::high_resolution_clock::now();
94 if (sender !=
nullptr) {
95 bool is_ready = sender->is_ready_for_sending(std::chrono::milliseconds(100));
96 TLOG_DEBUG(0) <<
"The Fragment sender for " << frag_out_conn <<
" " << (is_ready ?
"is" :
"is not")
97 <<
" ready, my source_id is [" <<
m_sourceid <<
"]";
112template<
class RDT,
class LBT>
118 std::this_thread::sleep_for(std::chrono::milliseconds(10));
121 std::this_thread::sleep_for(std::chrono::milliseconds(10));
124 std::this_thread::sleep_for(std::chrono::milliseconds(10));
131template<
class RDT,
class LBT>
139template<
class RDT,
class LBT>
143 std::unique_lock<std::mutex> lock(
m_cv_mutex);
152template<
class RDT,
class LBT>
157 auto t_req_begin = std::chrono::high_resolution_clock::now();
159 std::unique_lock<std::mutex> lock(
m_cv_mutex);
170 if ((result.result_code == ResultCode::kNotYet || result.result_code == ResultCode::kPartial) &&
173 <<
" with timestamp=" << result.data_request.trigger_timestamp;
179 << result.fragment->get_trigger_number() <<
"."
180 << result.fragment->get_sequence_number() <<
", run number "
181 << result.fragment->get_run_number() <<
", and DetectorID "
182 << result.fragment->get_detector_id() <<
", and SourceID "
183 << result.fragment->get_element_id() <<
", and size " << result.fragment->get_size()
184 <<
", and result code " << result.result_code;
195 auto t_req_end = std::chrono::high_resolution_clock::now();
196 auto us_req_took = std::chrono::duration_cast<std::chrono::microseconds>(t_req_end - t_req_begin);
207template<
class RDT,
class LBT>
222 int new_pop_reqs = 0;
223 int new_pop_count = 0;
224 int new_occupancy = 0;
227 info.set_min_request_response_time(
m_response_time_min.exchange(std::numeric_limits<int>::max()));
228 auto now = std::chrono::high_resolution_clock::now();
232 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(
now -
m_t0).count() / 1000000.;
234 <<
" Dropped: " << new_pop_count <<
" Occupancy: " << new_occupancy;
236 if (info.num_requests_handled() > 0) {
238 info.set_avg_request_response_time(info.tot_request_response_time() / info.num_requests_handled());
240 <<
" | Avarage response time: " << info.avg_request_response_time() <<
"[us]"
241 <<
" | Periodic sends: " << info.num_periodic_sent();
250 this->
publish(std::move(info));
256 this->
publish(std::move(rinfo));
259template<
class RDT,
class LBT>
260std::unique_ptr<daqdataformats::Fragment>
265 auto fragment = std::make_unique<daqdataformats::Fragment>(std::vector<std::pair<void*, size_t>>());
266 fragment->set_header_fields(frag_header);
270template<
class RDT,
class LBT>
276 std::this_thread::sleep_for(std::chrono::milliseconds(50));
280template<
class RDT,
class LBT>
290template<
class RDT,
class LBT>
296template<
class RDT,
class LBT>
327template<
class RDT,
class LBT>
341 uint64_t newest_ts = last_frame ==
nullptr ? std::numeric_limits<uint64_t>::min()
342 : last_frame->get_timestamp();
345 if ((*iter).request.request_information.window_end <= newest_ts) {
348 }
else if (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() -
354 dunedaq::datahandlinglibs::VerboseRequestTimedOut(
ERS_HERE,
356 (*iter).request.trigger_number,
357 (*iter).request.sequence_number,
358 (*iter).request.run_number,
359 (*iter).request.request_information.window_begin,
360 (*iter).request.request_information.window_end,
361 (*iter).request.data_destination));
371 std::this_thread::sleep_for(std::chrono::milliseconds(1));
375template<
class RDT,
class LBT>
376std::vector<std::pair<void*, size_t>>
382 std::vector<std::pair<void*, size_t>> frag_pieces;
386 uint64_t last_ts = front_element->get_timestamp();
387 uint64_t newest_ts = last_element->get_timestamp();
389 if (start_win_ts > newest_ts || newest_ts < end_win_ts) {
391 rres.result_code = ResultCode::kNotYet;
392 }
else if (end_win_ts <= last_ts) {
393 rres.result_code = ResultCode::kTooOld;
395 RDT request_element =
RDT();
396 auto start_timestamp = start_win_ts;
397 request_element.set_timestamp(start_timestamp);
401 if (!start_iter.good()) {
403 rres.result_code = ResultCode::kNotFound;
406 <<
", --> distance from window: "
407 << int64_t(start_win_ts) - int64_t(start_iter->get_timestamp());
409 rres.result_code = ResultCode::kFound;
411 auto elements_handled = 0;
413 RDT* element = &(*start_iter);
415 while (start_iter.good() && element->get_timestamp() <= end_win_ts) {
417 if (
m_pop_list.count(element->get_timestamp())) {
418 TLOG_DEBUG(50) <<
"skip processing for current element " << element->get_timestamp()
419 <<
", already included in trigger.";
420 }
else if (element->get_timestamp() < start_win_ts) {
421 TLOG_DEBUG(51) <<
"skip processing for current element " << element->get_timestamp()
422 <<
", out of readout window.";
428 frag_pieces.emplace_back(
429 std::make_pair<void*, size_t>(
static_cast<void*
>((*start_iter).begin()), element->get_payload_size()));
436 element = &(*start_iter);
444template<
class RDT,
class LBT>
453 std::vector<std::pair<void*, size_t>> frag_pieces;
454 std::ostringstream oss;
462 rres.result_code = ResultCode::kNotFound;
469 uint64_t last_ts = front_element->get_timestamp();
470 uint64_t newest_ts = last_element->get_timestamp();
478 <<
" number of frag_pieces=" << frag_pieces.size();
480 switch (rres.result_code) {
481 case ResultCode::kTooOld:
486 frag_header.status_bits |=
489 case ResultCode::kPartiallyOld:
493 frag_header.status_bits |=
496 case ResultCode::kFound:
499 case ResultCode::kPartial:
501 frag_header.status_bits |=
505 case ResultCode::kNotYet:
507 frag_header.status_bits |=
518 rres.fragment = std::make_unique<daqdataformats::Fragment>(frag_pieces);
521 rres.fragment->set_header_fields(frag_header);
void set_packets_recorded(::uint64_t value)
void set_bytes_recorded(::uint64_t value)
void set_recording_status(Arg_ &&arg, Args_... args)
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
std::atomic< bool > m_recording
void start(const appfwk::DAQModule::CommandData_t &)
std::unique_ptr< datahandlinglibs::FrameErrorRegistry > & m_error_registry
std::set< uint64_t > m_pop_list
std::string m_output_file
RequestResult data_request(dfmessages::DataRequest dr) override
virtual void generate_opmon_data() override
std::atomic< int > m_response_time_acc
std::shared_ptr< LatencyBufferType > & m_latency_buffer
std::vector< std::string > m_frag_out_conn_ids
std::condition_variable m_cv
void conf(const dunedaq::appmodel::DataHandlerModule *)
bool m_warn_about_empty_buffer
std::unique_ptr< daqdataformats::Fragment > create_empty_fragment(const dfmessages::DataRequest &dr)
bool m_recording_configured
std::atomic< bool > m_run_marker
void issue_request(dfmessages::DataRequest datarequest, bool is_retry=false) override
Issue a data request to the request handler.
std::atomic< int > m_payloads_written
std::atomic< int > m_response_time_max
std::atomic< int > m_requests_running
std::mutex m_pop_list_mutex
daqdataformats::FragmentHeader create_fragment_header(const dfmessages::DataRequest &dr)
void scrap(const appfwk::DAQModule::CommandData_t &) override
std::atomic< int > m_num_requests_bad
void cleanup_check() override
Check if cleanup is necessary and execute it if necessary.
void check_waiting_requests()
std::atomic< int > m_num_requests_timed_out
int m_fragment_send_timeout_ms
utilities::ReusableThread m_periodic_transmission_thread
std::atomic< int > m_num_buffer_cleanups
typename dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >::RequestResult RequestResult
std::atomic< int > m_response_time_min
std::atomic< uint64_t > m_oldest_timestamp
utilities::ReusableThread m_recording_thread
std::atomic< uint64_t > m_num_periodic_sent
std::thread m_waiting_queue_thread
void stop(const appfwk::DAQModule::CommandData_t &)
std::atomic< int > m_num_requests_delayed
std::atomic< bool > m_cleanup_requested
std::atomic< int > m_num_requests_found
std::vector< std::pair< void *, size_t > > get_fragment_pieces(uint64_t start_win_ts, uint64_t end_win_ts, RequestResult &rres)
std::atomic< uint64_t > m_num_periodic_send_failed
std::atomic< int > m_pop_reqs
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
std::atomic< int > m_num_requests_uncategorized
utilities::ReusableThread m_cleanup_thread
std::atomic< int > m_handled_requests
void record(const appfwk::DAQModule::CommandData_t &args) override
datahandlinglibs::BufferedFileWriter m_buffered_writer
size_t m_num_request_handling_threads
size_t m_stream_buffer_size
std::atomic< int > m_occupancy
std::atomic< int > m_pops_count
uint32_t m_periodic_data_transmission_ms
std::unique_ptr< boost::asio::thread_pool > m_request_handler_thread_pool
std::vector< RequestElement > m_waiting_requests
void periodic_data_transmissions()
virtual void periodic_data_transmission() override
Periodic data transmission - relevant for trigger in particular.
daqdataformats::SourceID m_sourceid
std::atomic< int > m_bytes_written
std::atomic< int > m_num_requests_old_window
std::mutex m_waiting_requests_lock
Base class for any user define issue.
#define TLOG_DEBUG(lvl,...)
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
void warning(const Issue &issue)
void error(const Issue &issue)
This message represents a request for data sent to a single component of the DAQ.
sequence_number_t sequence_number
Sequence Number of the request.
std::string data_destination
ComponentRequest request_information
trigger_number_t trigger_number
Trigger number the request corresponds to.
timestamp_t trigger_timestamp
Timestamp of trigger.