DUNE-DAQ
DUNE Trigger and Data Acquisition software
dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType > Class Template Reference

#include <DefaultRequestHandlerModel.hpp>

Inheritance diagram for dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >:
Collaboration diagram for dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >:

Classes

struct  RequestElement
 

Public Types

using RDT = ReadoutType
 
using LBT = LatencyBufferType
 
using RequestResult = typename RequestHandlerConcept< ReadoutType, LatencyBufferType >::RequestResult
 
using ResultCode = typename RequestHandlerConcept< ReadoutType, LatencyBufferType >::ResultCode
 
- Public Types inherited from dunedaq::opmonlib::MonitorableObject
using NodePtr = std::weak_ptr<MonitorableObject>
 
using NewNodePtr = std::shared_ptr<MonitorableObject>
 
using ElementId = std::string
 

Public Member Functions

 DefaultRequestHandlerModel (std::shared_ptr< LatencyBufferType > &latency_buffer, std::unique_ptr< FrameErrorRegistry > &error_registry)
 
void conf (const dunedaq::appmodel::DataHandlerModule *)
 
void scrap (const nlohmann::json &) override
 
void start (const nlohmann::json &)
 
void stop (const nlohmann::json &)
 
void record (const nlohmann::json &args) override
 
void cleanup_check () override
 Check whether cleanup is necessary and execute it if so.
 
virtual void periodic_data_transmission () override
 Periodic data transmission - relevant for trigger in particular.
 
void issue_request (dfmessages::DataRequest datarequest, bool is_retry=false) override
 Issue a data request to the request handler.
 
virtual dunedaq::daqdataformats::timestamp_t get_cutoff_timestamp ()
 
virtual bool supports_cutoff_timestamp ()
 
- Public Member Functions inherited from dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >
 RequestHandlerConcept ()
 
virtual ~RequestHandlerConcept ()
 
 RequestHandlerConcept (const RequestHandlerConcept &)=delete
 RequestHandlerConcept is not copy-constructible.
 
RequestHandlerConcept & operator= (const RequestHandlerConcept &)=delete
 RequestHandlerConcept is not copy-assignable.
 
 RequestHandlerConcept (RequestHandlerConcept &&)=delete
 RequestHandlerConcept is not move-constructible.
 
RequestHandlerConcept & operator= (RequestHandlerConcept &&)=delete
 RequestHandlerConcept is not move-assignable.
 
- Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject (const MonitorableObject &)=delete
 
MonitorableObject & operator= (const MonitorableObject &)=delete
 
 MonitorableObject (MonitorableObject &&)=delete
 
MonitorableObject & operator= (MonitorableObject &&)=delete
 
virtual ~MonitorableObject ()=default
 
auto get_opmon_id () const noexcept
 
auto get_opmon_level () const noexcept
 

Protected Member Functions

daqdataformats::FragmentHeader create_fragment_header (const dfmessages::DataRequest &dr)
 
std::unique_ptr< daqdataformats::Fragment > create_empty_fragment (const dfmessages::DataRequest &dr)
 
void dump_to_buffer (const void *data, std::size_t size, void *buffer, uint32_t buffer_pos, const std::size_t &buffer_size)
 
void periodic_cleanups ()
 
void periodic_data_transmissions ()
 
void cleanup ()
 
void check_waiting_requests ()
 
std::vector< std::pair< void *, size_t > > get_fragment_pieces (uint64_t start_win_ts, uint64_t end_win_ts, RequestResult &rres)
 
RequestResult data_request (dfmessages::DataRequest dr) override
 
virtual void generate_opmon_data () override
 
- Protected Member Functions inherited from dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >
const std::string & resultCodeAsString (ResultCode rc)
 
- Protected Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject ()=default
 
void register_node (ElementId name, NewNodePtr)
 
void publish (google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
 

Protected Attributes

std::shared_ptr< LatencyBufferType > & m_latency_buffer
 
BufferedFileWriter m_buffered_writer
 
utilities::ReusableThread m_recording_thread
 
utilities::ReusableThread m_cleanup_thread
 
utilities::ReusableThread m_periodic_transmission_thread
 
std::map< dfmessages::DataRequest, int > m_request_counter
 
std::size_t m_max_requested_elements
 
std::mutex m_cv_mutex
 
std::condition_variable m_cv
 
std::atomic< bool > m_cleanup_requested = false
 
std::atomic< int > m_requests_running = 0
 
std::vector< RequestElement > m_waiting_requests
 
std::mutex m_waiting_requests_lock
 
std::unique_ptr< boost::asio::thread_pool > m_request_handler_thread_pool
 
size_t m_num_request_handling_threads = 0
 
std::unique_ptr< FrameErrorRegistry > & m_error_registry
 
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
 
std::atomic< bool > m_run_marker = false
 
std::thread m_waiting_queue_thread
 
std::atomic< bool > m_recording = false
 
std::atomic< uint64_t > m_next_timestamp_to_record = std::numeric_limits<uint64_t>::max()
 
bool m_configured
 
float m_pop_limit_pct
 
float m_pop_size_pct
 
unsigned m_pop_limit_size
 
size_t m_buffer_capacity
 
daqdataformats::SourceID m_sourceid
 
uint16_t m_detid
 
std::string m_output_file
 
size_t m_stream_buffer_size = 0
 
bool m_recording_configured = false
 
bool m_warn_on_timeout = true
 
bool m_warn_about_empty_buffer = true
 
uint32_t m_periodic_data_transmission_ms = 0
 
std::vector< std::string > m_frag_out_conn_ids
 
std::atomic< int > m_pop_counter
 
std::atomic< int > m_num_buffer_cleanups { 0 }
 
std::atomic< int > m_pop_reqs
 
std::atomic< int > m_pops_count
 
std::atomic< int > m_occupancy
 
std::atomic< int > m_num_requests_found { 0 }
 
std::atomic< int > m_num_requests_bad { 0 }
 
std::atomic< int > m_num_requests_old_window { 0 }
 
std::atomic< int > m_num_requests_delayed { 0 }
 
std::atomic< int > m_num_requests_uncategorized { 0 }
 
std::atomic< int > m_num_requests_timed_out { 0 }
 
std::atomic< int > m_handled_requests { 0 }
 
std::atomic< int > m_response_time_acc { 0 }
 
std::atomic< int > m_response_time_min { std::numeric_limits<int>::max() }
 
std::atomic< int > m_response_time_max { 0 }
 
std::atomic< int > m_payloads_written { 0 }
 
std::atomic< int > m_bytes_written { 0 }
 
std::atomic< uint64_t > m_num_periodic_sent { 0 }
 
std::atomic< uint64_t > m_num_periodic_send_failed { 0 }
 
int m_fragment_send_timeout_ms
 
- Protected Attributes inherited from dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >
std::map< ResultCode, std::string > ResultCodeStrings
 

Private Attributes

int m_request_timeout_ms
 

Additional Inherited Members

- Static Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
static bool publishable_metric (OpMonLevel entry, OpMonLevel system) noexcept
 
- Protected Types inherited from dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >
enum  ResultCode {
  kFound = 0 , kNotFound , kTooOld , kNotYet ,
  kPartial , kPartiallyOld , kCleanup , kUnknown
}
 

Detailed Description

template<class ReadoutType, class LatencyBufferType>
class dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >

Definition at line 82 of file DefaultRequestHandlerModel.hpp.

Member Typedef Documentation

◆ LBT

template<class ReadoutType , class LatencyBufferType >
using dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::LBT = LatencyBufferType

Definition at line 87 of file DefaultRequestHandlerModel.hpp.

◆ RDT

template<class ReadoutType , class LatencyBufferType >
using dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::RDT = ReadoutType

Definition at line 86 of file DefaultRequestHandlerModel.hpp.

◆ RequestResult

template<class ReadoutType , class LatencyBufferType >
using dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::RequestResult = typename RequestHandlerConcept< ReadoutType, LatencyBufferType >::RequestResult

◆ ResultCode

template<class ReadoutType , class LatencyBufferType >
using dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::ResultCode = typename RequestHandlerConcept< ReadoutType, LatencyBufferType >::ResultCode

Constructor & Destructor Documentation

◆ DefaultRequestHandlerModel()

template<class ReadoutType , class LatencyBufferType >
dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::DefaultRequestHandlerModel ( std::shared_ptr< LatencyBufferType > & latency_buffer,
std::unique_ptr< FrameErrorRegistry > & error_registry )
inline explicit

Definition at line 95 of file DefaultRequestHandlerModel.hpp.

  : m_latency_buffer(latency_buffer)
  // ...
  , m_error_registry(error_registry)
  , m_pop_limit_pct(0.0f)
  , m_pop_size_pct(0.0f)
  // ...
  , m_pop_counter{ 0 }
  , m_pop_reqs(0)
  , m_pops_count(0)
  , m_occupancy(0)
  //, m_response_time_log()
  //, m_response_time_log_lock()
{
  TLOG_DEBUG(TLVL_WORK_STEPS) << "DefaultRequestHandlerModel created...";
}
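
A minimal construction sketch, assuming a concrete readout type and latency buffer (the names MyReadoutType, MyLatencyBuffer and handler_t are hypothetical, and FrameErrorRegistry is assumed to be default-constructible):

// Sketch only: the handler stores references to both objects, so they must outlive it.
using handler_t = dunedaq::datahandlinglibs::DefaultRequestHandlerModel<MyReadoutType, MyLatencyBuffer>;

std::shared_ptr<MyLatencyBuffer> my_buffer = std::make_shared<MyLatencyBuffer>();
std::unique_ptr<dunedaq::datahandlinglibs::FrameErrorRegistry> my_errors =
    std::make_unique<dunedaq::datahandlinglibs::FrameErrorRegistry>();   // assumed default-constructible

handler_t handler(my_buffer, my_errors);
// handler.conf(module_conf);        // configure from a const appmodel::DataHandlerModule*
// handler.start(nlohmann::json{});  // spin up the request-handling threads at run start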

Member Function Documentation

◆ check_waiting_requests()

template<class RDT , class LBT >
void dunedaq::datahandlinglibs::DefaultRequestHandlerModel< RDT, LBT >::check_waiting_requests ( )
protected

Definition at line 397 of file DefaultRequestHandlerModel.hxx.

{
  // At run stop, we wait until all waiting requests have either:
  //
  // 1. been serviced because an item past the end of the window arrived in the buffer
  // 2. timed out by going past m_request_timeout_ms, and returned a partial fragment
  while (m_run_marker.load()) {
    if (m_waiting_requests.size() > 0) {

      std::lock_guard<std::mutex> lock_guard(m_waiting_requests_lock);

      auto last_frame = m_latency_buffer->back(); // NOLINT
      uint64_t newest_ts = last_frame == nullptr ? std::numeric_limits<uint64_t>::min() // NOLINT(build/unsigned)
                                                 : last_frame->get_timestamp();

      for (auto iter = m_waiting_requests.begin(); iter != m_waiting_requests.end();) {
        if ((*iter).request.request_information.window_end < newest_ts) {
          issue_request((*iter).request, true);
          iter = m_waiting_requests.erase(iter);
        }
        else if (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - (*iter).start_time).count() >= m_request_timeout_ms) {
          issue_request((*iter).request, true);
          if (m_warn_on_timeout) {
            ers::warning(dunedaq::datahandlinglibs::VerboseRequestTimedOut(ERS_HERE, m_sourceid,
                                                                           (*iter).request.trigger_number,
                                                                           (*iter).request.sequence_number,
                                                                           (*iter).request.run_number,
                                                                           (*iter).request.request_information.window_begin,
                                                                           (*iter).request.request_information.window_end,
                                                                           (*iter).request.data_destination));
          }
          // ...
          iter = m_waiting_requests.erase(iter);
        }
        else {
          ++iter;
        }
      }
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
}

◆ cleanup()

template<class RDT , class LBT >
void dunedaq::datahandlinglibs::DefaultRequestHandlerModel< RDT, LBT >::cleanup ( )
protected virtual

Implements dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >.

Reimplemented in dunedaq::datahandlinglibs::DefaultSkipListRequestHandler< T >, and dunedaq::datahandlinglibs::DefaultSkipListRequestHandler< TriggerPrimitiveTypeAdapter >.

Definition at line 372 of file DefaultRequestHandlerModel.hxx.

{
  auto size_guess = m_latency_buffer->occupancy();
  if (size_guess > m_pop_limit_size) {
    ++m_pop_reqs;
    unsigned to_pop = m_pop_size_pct * m_latency_buffer->occupancy();

    unsigned popped = 0;
    for (size_t i = 0; i < to_pop; ++i) {
      if (m_latency_buffer->front()->get_timestamp() < m_next_timestamp_to_record) {
        m_latency_buffer->pop(1);
        popped++;
      } else {
        break;
      }
    }
    m_occupancy = m_latency_buffer->occupancy();
    m_pops_count += popped;
    m_error_registry->remove_errors_until(m_latency_buffer->front()->get_timestamp());
  }
  // ...
}
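
A worked example of the auto-pop arithmetic, using illustrative numbers (the threshold m_pop_limit_size is presumably derived from m_pop_limit_pct and the configured buffer capacity in lines elided from the conf() listing below):

// Illustrative values only; not part of the class.
size_t   buffer_capacity = 100000;   // configured latency buffer size
float    pop_limit_pct   = 0.8f;     // auto-pop limit
float    pop_size_pct    = 0.1f;     // auto-pop size
unsigned pop_limit_size  = static_cast<unsigned>(pop_limit_pct * buffer_capacity);  // 80000: cleanup threshold
size_t   occupancy       = 85000;    // current buffer occupancy, above the threshold
unsigned to_pop          = static_cast<unsigned>(pop_size_pct * occupancy);         // 8500 elements popped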

◆ cleanup_check()

template<class RDT , class LBT >
void dunedaq::datahandlinglibs::DefaultRequestHandlerModel< RDT, LBT >::cleanup_check ( )
override virtual

Check whether cleanup is necessary and execute it if so.

Implements dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >.

Definition at line 215 of file DefaultRequestHandlerModel.hxx.

◆ conf()

template<class RDT , class LBT >
void dunedaq::datahandlinglibs::DefaultRequestHandlerModel< RDT, LBT >::conf ( const dunedaq::appmodel::DataHandlerModule * conf)
virtual

Implements dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >.

Reimplemented in dunedaq::datahandlinglibs::ZeroCopyRecordingRequestHandlerModel< ReadoutType, LatencyBufferType >, and dunedaq::trigger::TPRequestHandler.

Definition at line 8 of file DefaultRequestHandlerModel.hxx.

{

  auto reqh_conf = conf->get_module_configuration()->get_request_handler();
  m_sourceid.id = conf->get_source_id();
  m_sourceid.subsystem = RDT::subsystem;
  m_detid = conf->get_detector_id();
  m_pop_limit_pct = reqh_conf->get_pop_limit_pct();
  m_pop_size_pct = reqh_conf->get_pop_size_pct();

  m_buffer_capacity = conf->get_module_configuration()->get_latency_buffer()->get_size();
  m_num_request_handling_threads = reqh_conf->get_handler_threads();
  m_request_timeout_ms = reqh_conf->get_request_timeout();

  for (auto output : conf->get_outputs()) {
    if (output->get_data_type() == "Fragment") {
      m_fragment_send_timeout_ms = output->get_send_timeout_ms();
      // 19-Dec-2024, KAB: store the names/IDs of the Fragment output connections so that
      // we can confirm that they are ready for sending at 'start' time.
      m_frag_out_conn_ids.push_back(output->UID());
    }
  }

  if (m_recording_configured == false) {
    auto dr = reqh_conf->get_data_recorder();
    if (dr != nullptr) {
      m_output_file = dr->get_output_file();
      if (remove(m_output_file.c_str()) == 0) {
        TLOG_DEBUG(TLVL_WORK_STEPS) << "Removed existing output file from previous run: " << m_output_file << std::endl;
      }
      m_stream_buffer_size = dr->get_streaming_buffer_size();
      m_buffered_writer.open(m_output_file, m_stream_buffer_size, dr->get_compression_algorithm(), dr->get_use_o_direct());
      // ...
    }
  }

  m_warn_on_timeout = reqh_conf->get_warn_on_timeout();
  m_warn_about_empty_buffer = reqh_conf->get_warn_on_empty_buffer();
  m_periodic_data_transmission_ms = reqh_conf->get_periodic_data_transmission_ms();

  // ... (validation of the auto-pop percentages)
    ers::error(ConfigurationError(ERS_HERE, m_sourceid, "Auto-pop percentage out of range."));
  } else {
    // ...
  }

  // ...

  std::ostringstream oss;
  oss << "RequestHandler configured. " << std::fixed << std::setprecision(2)
      << "auto-pop limit: " << m_pop_limit_pct * 100.0f << "% "
      << "auto-pop size: " << m_pop_size_pct * 100.0f << "% "
      << "max requested elements: " << m_max_requested_elements;
  TLOG_DEBUG(TLVL_WORK_STEPS) << oss.str();
}

◆ create_empty_fragment()

template<class RDT , class LBT >
std::unique_ptr< daqdataformats::Fragment > dunedaq::datahandlinglibs::DefaultRequestHandlerModel< RDT, LBT >::create_empty_fragment ( const dfmessages::DataRequest & dr)
protected

Definition at line 336 of file DefaultRequestHandlerModel.hxx.

{
  auto frag_header = create_fragment_header(dr);
  frag_header.error_bits |= (0x1 << static_cast<size_t>(daqdataformats::FragmentErrorBits::kDataNotFound));
  auto fragment = std::make_unique<daqdataformats::Fragment>(std::vector<std::pair<void*, size_t>>());
  fragment->set_header_fields(frag_header);
  return fragment;
}

◆ create_fragment_header()

template<class ReadoutType , class LatencyBufferType >
daqdataformats::FragmentHeader dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::create_fragment_header ( const dfmessages::DataRequest & dr)
inline protected

Definition at line 163 of file DefaultRequestHandlerModel.hpp.

{
  daqdataformats::FragmentHeader fh;
  fh.size = sizeof(fh);
  fh.trigger_number = dr.trigger_number;
  fh.trigger_timestamp = dr.trigger_timestamp;
  fh.window_begin = dr.request_information.window_begin;
  fh.window_end = dr.request_information.window_end;
  fh.run_number = dr.run_number;
  fh.fragment_type = static_cast<daqdataformats::fragment_type_t>(ReadoutType::fragment_type);
  fh.sequence_number = dr.sequence_number;
  fh.detector_id = m_detid;
  fh.element_id = { m_sourceid.subsystem, m_sourceid.id };
  return fh;
}

◆ data_request()

Implements dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >.

Definition at line 532 of file DefaultRequestHandlerModel.hxx.

{
  // Prepare response
  RequestResult rres(ResultCode::kUnknown, dr);

  // Prepare FragmentHeader and empty Fragment pieces list
  auto frag_header = create_fragment_header(dr);
  std::vector<std::pair<void*, size_t>> frag_pieces;
  std::ostringstream oss;

  //bool local_data_not_found_flag = false;
  if (m_latency_buffer->occupancy() == 0) {
    // ...
      ers::warning(RequestOnEmptyBuffer(ERS_HERE, m_sourceid, "Data not found"));
    }
    frag_header.error_bits |= (0x1 << static_cast<size_t>(daqdataformats::FragmentErrorBits::kDataNotFound));
    rres.result_code = ResultCode::kNotFound;
    // ...
  }
  else {
    frag_pieces = get_fragment_pieces(dr.request_information.window_begin, dr.request_information.window_end, rres);

    auto front_element = m_latency_buffer->front(); // NOLINT
    auto last_element = m_latency_buffer->back(); // NOLINT
    uint64_t last_ts = front_element->get_timestamp(); // NOLINT(build/unsigned)
    uint64_t newest_ts = last_element->get_timestamp(); // NOLINT(build/unsigned)
    TLOG_DEBUG(TLVL_WORK_STEPS) << "Data request for trig/seq_num=" << dr.trigger_number
                                << "." << dr.sequence_number << " and SourceID[" << m_sourceid << "] with"
                                << " Trigger TS=" << dr.trigger_timestamp
                                << " Oldest stored TS=" << last_ts
                                << " Newest stored TS=" << newest_ts
                                << " Start of window TS=" << dr.request_information.window_begin
                                << " End of window TS=" << dr.request_information.window_end
                                << " Latency buffer occupancy=" << m_latency_buffer->occupancy()
                                << " frag_pieces result_code=" << rres.result_code
                                << " number of frag_pieces=" << frag_pieces.size();

    switch (rres.result_code) {
      case ResultCode::kTooOld:
        // return empty frag
        // ...
        frag_header.error_bits |= (0x1 << static_cast<size_t>(daqdataformats::FragmentErrorBits::kDataNotFound));
        break;
      case ResultCode::kPartiallyOld:
        // ...
        frag_header.error_bits |= (0x1 << static_cast<size_t>(daqdataformats::FragmentErrorBits::kIncomplete));
        frag_header.error_bits |= (0x1 << static_cast<size_t>(daqdataformats::FragmentErrorBits::kDataNotFound));
        break;
      case ResultCode::kFound:
        // ...
        break;
      case ResultCode::kPartial:
        frag_header.error_bits |= (0x1 << static_cast<size_t>(daqdataformats::FragmentErrorBits::kIncomplete));
        // ...
        break;
      case ResultCode::kNotYet:
        frag_header.error_bits |= (0x1 << static_cast<size_t>(daqdataformats::FragmentErrorBits::kDataNotFound));
        // ...
        break;
      default:
        // Unknown result of data search
        // ...
        frag_header.error_bits |= (0x1 << static_cast<size_t>(daqdataformats::FragmentErrorBits::kDataNotFound));
    }
  }
  // Create fragment from pieces
  rres.fragment = std::make_unique<daqdataformats::Fragment>(frag_pieces);

  // Set header
  rres.fragment->set_header_fields(frag_header);

  return rres;
}

◆ dump_to_buffer()

template<class ReadoutType , class LatencyBufferType >
void dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::dump_to_buffer ( const void * data,
std::size_t size,
void * buffer,
uint32_t buffer_pos,
const std::size_t & buffer_size )
inline protected

Definition at line 184 of file DefaultRequestHandlerModel.hpp.

{
  auto bytes_to_copy = size;
  while (bytes_to_copy > 0) {
    auto n = std::min(bytes_to_copy, buffer_size - buffer_pos);
    std::memcpy(static_cast<char*>(buffer) + buffer_pos, static_cast<const char*>(data), n);
    buffer_pos += n;
    bytes_to_copy -= n;
    if (buffer_pos == buffer_size) {
      buffer_pos = 0;
    }
  }
}
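
A sketch of how the wrap-around copy behaves (dump_to_buffer() is protected, so this would run inside the handler or a derived class; sizes and offsets are illustrative):

// Sketch only: copy a payload into a fixed-size circular staging buffer.
std::vector<char> staging(4096);        // circular staging buffer
uint32_t write_pos = 4000;              // current write offset into 'staging'
std::vector<char> payload(200, 0x2a);   // 200 bytes of data to stage

dump_to_buffer(payload.data(), payload.size(),
               staging.data(), write_pos, staging.size());
// The first 96 bytes fill offsets 4000..4095; the write position then wraps to 0
// and the remaining 104 bytes are written starting at offset 0 of the staging buffer.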

◆ generate_opmon_data()

template<class RDT , class LBT >
void dunedaq::datahandlinglibs::DefaultRequestHandlerModel< RDT, LBT >::generate_opmon_data ( )
override protected virtual

Hook for customisable publication. The function can throw; exceptions will be caught by the monitoring thread.

Reimplemented from dunedaq::opmonlib::MonitorableObject.

Definition at line 283 of file DefaultRequestHandlerModel.hxx.

{
  opmon::RequestHandlerInfo info;

  info.set_num_requests_handled(m_handled_requests.exchange(0));
  info.set_num_requests_found(m_num_requests_found.exchange(0));
  info.set_num_requests_bad(m_num_requests_bad.exchange(0));
  info.set_num_requests_old_window(m_num_requests_old_window.exchange(0));
  info.set_num_requests_delayed(m_num_requests_delayed.exchange(0));
  info.set_num_requests_uncategorized(m_num_requests_uncategorized.exchange(0));
  info.set_num_requests_timed_out(m_num_requests_timed_out.exchange(0));
  info.set_num_requests_waiting(m_waiting_requests.size());

  int new_pop_reqs = 0;
  int new_pop_count = 0;
  int new_occupancy = 0;
  info.set_tot_request_response_time(m_response_time_acc.exchange(0));
  info.set_max_request_response_time(m_response_time_max.exchange(0));
  info.set_min_request_response_time(m_response_time_min.exchange(std::numeric_limits<int>::max()));
  auto now = std::chrono::high_resolution_clock::now();
  new_pop_reqs = m_pop_reqs.exchange(0);
  new_pop_count = m_pops_count.exchange(0);
  new_occupancy = m_occupancy;
  double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
  TLOG_DEBUG(TLVL_HOUSEKEEPING) << "Cleanup request rate: " << new_pop_reqs / seconds / 1. << " [Hz]"
                                << " Dropped: " << new_pop_count << " Occupancy: " << new_occupancy;

  if (info.num_requests_handled() > 0) {

    info.set_avg_request_response_time(info.tot_request_response_time() / info.num_requests_handled());
    TLOG_DEBUG(TLVL_HOUSEKEEPING) << "Completed requests: " << info.num_requests_handled()
                                  << " | Avarage response time: " << info.avg_request_response_time() << "[us]"
                                  << " | Periodic sends: " << info.num_periodic_sent();
  }

  m_t0 = now;

  info.set_num_buffer_cleanups(m_num_buffer_cleanups.exchange(0));
  info.set_num_periodic_sent(m_num_periodic_sent.exchange(0));
  info.set_num_periodic_send_failed(m_num_periodic_send_failed.exchange(0));

  this->publish(std::move(info));

  opmon::RecordingInfo rinfo;
  rinfo.set_recording_status(m_recording ? "Y" : "N");
  rinfo.set_packets_recorded(m_payloads_written.exchange(0));
  rinfo.set_bytes_recorded(m_bytes_written.exchange(0));
  this->publish(std::move(rinfo));
}
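
Most of these counters are read with std::atomic::exchange, which returns the accumulated value and resets it to zero in one step, so each published interval reports only the activity since the previous publication. A stand-alone sketch of that pattern (the names below are illustrative, not part of the class):

#include <atomic>
#include <iostream>

// Counter incremented on the hot path, published and reset periodically.
std::atomic<int> requests_handled{0};

void publish_interval() {
  // Atomically read and zero the counter, so nothing is lost or double-counted
  // between two publications.
  int handled = requests_handled.exchange(0);
  std::cout << "requests handled this interval: " << handled << "\n";
}

int main() {
  requests_handled.fetch_add(3);   // work done during the interval
  publish_interval();              // prints 3 and resets
  publish_interval();              // prints 0
}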

◆ get_cutoff_timestamp()

template<class ReadoutType , class LatencyBufferType >
virtual dunedaq::daqdataformats::timestamp_t dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::get_cutoff_timestamp ( )
inline virtual

Definition at line 157 of file DefaultRequestHandlerModel.hpp.

{ return 0; }

◆ get_fragment_pieces()

template<class RDT , class LBT >
std::vector< std::pair< void *, size_t > > dunedaq::datahandlinglibs::DefaultRequestHandlerModel< RDT, LBT >::get_fragment_pieces ( uint64_t start_win_ts,
uint64_t end_win_ts,
RequestResult & rres )
protected

Definition at line 443 of file DefaultRequestHandlerModel.hxx.

{

  TLOG_DEBUG(TLVL_WORK_STEPS) << "Looking for frags between " << start_win_ts << " and " << end_win_ts;

  std::vector<std::pair<void*, size_t>> frag_pieces;
  // Data availability is calculated here
  auto front_element = m_latency_buffer->front(); // NOLINT
  auto last_element = m_latency_buffer->back(); // NOLINT
  uint64_t last_ts = front_element->get_timestamp(); // NOLINT(build/unsigned)
  uint64_t newest_ts = last_element->get_timestamp(); // NOLINT(build/unsigned)

  if (start_win_ts > newest_ts) {
    // No element is as small as the start window -> request is far in the future
    rres.result_code = ResultCode::kNotYet; // give it another chance
  }
  else if (end_win_ts < last_ts) {
    rres.result_code = ResultCode::kTooOld;
  }
  else {
    RDT request_element = RDT();
    request_element.set_timestamp(start_win_ts - (request_element.get_num_frames() * RDT::expected_tick_difference));
    //request_element.set_timestamp(start_win_ts);

    auto start_iter = m_error_registry->has_error("MISSING_FRAMES")
                        ? m_latency_buffer->lower_bound(request_element, true)
                        : m_latency_buffer->lower_bound(request_element, false);
    if (!start_iter.good()) {
      // Accessor problem
      rres.result_code = ResultCode::kNotFound;
    }
    else {
      TLOG_DEBUG(TLVL_WORK_STEPS) << "Lower bound found " << start_iter->get_timestamp() << ", --> distance from window: "
                                  << int64_t(start_win_ts) - int64_t(start_iter->get_timestamp());
      if (end_win_ts > newest_ts) {
        rres.result_code = ResultCode::kPartial;
      }
      else if (start_win_ts < last_ts) {
        rres.result_code = ResultCode::kPartiallyOld;
      }
      else {
        rres.result_code = ResultCode::kFound;
      }

      auto elements_handled = 0;

      RDT* element = &(*start_iter);

      while (start_iter.good() && element->get_timestamp() < end_win_ts) {
        if (element->get_timestamp() + element->get_num_frames() * RDT::expected_tick_difference <= start_win_ts) {
          //TLOG() << "skip processing for current element " << element->get_timestamp() << ", out of readout window.";
        }

        else if (element->get_num_frames() > 1 &&
                 ((element->get_timestamp() < start_win_ts &&
                   element->get_timestamp() + element->get_num_frames() * RDT::expected_tick_difference > start_win_ts)
                  ||
                  element->get_timestamp() + element->get_num_frames() * RDT::expected_tick_difference >
                    end_win_ts)) {
          //TLOG() << "We don't need the whole aggregated object (e.g.: superchunk)" ;
          for (auto frame_iter = element->begin(); frame_iter != element->end(); frame_iter++) {
            if (get_frame_iterator_timestamp(frame_iter) > (start_win_ts - RDT::expected_tick_difference) &&
                get_frame_iterator_timestamp(frame_iter) < end_win_ts) {
              frag_pieces.emplace_back(
                std::make_pair<void*, size_t>(static_cast<void*>(&(*frame_iter)), element->get_frame_size()));
            }
          }
        }
        else {
          //TLOG() << "Add element " << element->get_timestamp();
          // We are somewhere in the middle -> the whole aggregated object (e.g.: superchunk) can be copied
          frag_pieces.emplace_back(
            std::make_pair<void*, size_t>(static_cast<void*>((*start_iter).begin()), element->get_payload_size()));
        }

        elements_handled++;
        ++start_iter;
        element = &(*start_iter);
      }
    }
  }
  TLOG_DEBUG(TLVL_WORK_STEPS) << "*** Number of frames retrieved: " << frag_pieces.size();
  return frag_pieces;
}
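
The result code is essentially a classification of how the requested window [start_win_ts, end_win_ts] overlaps the span of timestamps currently held in the buffer. A simplified stand-alone sketch of that classification, following the same ordering of checks as above (kNotFound, which signals an iterator problem, is left out):

#include <cstdint>
#include <iostream>

enum class ResultCode { kFound, kTooOld, kNotYet, kPartial, kPartiallyOld };

// Simplified classification of a request window against the buffered span.
ResultCode classify(uint64_t start_win_ts, uint64_t end_win_ts,
                    uint64_t oldest_ts, uint64_t newest_ts) {
  if (start_win_ts > newest_ts) return ResultCode::kNotYet;        // data has not arrived yet
  if (end_win_ts < oldest_ts)   return ResultCode::kTooOld;        // data already popped
  if (end_win_ts > newest_ts)   return ResultCode::kPartial;       // tail of the window still missing
  if (start_win_ts < oldest_ts) return ResultCode::kPartiallyOld;  // head of the window already popped
  return ResultCode::kFound;                                       // window fully contained in the buffer
}

int main() {
  // Buffer holds timestamps 1000..2000; a window 900..1100 is partially old.
  std::cout << (classify(900, 1100, 1000, 2000) == ResultCode::kPartiallyOld) << "\n";
}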

◆ issue_request()

template<class RDT , class LBT >
void dunedaq::datahandlinglibs::DefaultRequestHandlerModel< RDT, LBT >::issue_request ( dfmessages::DataRequest datarequest,
bool is_retry = false )
override virtual

Issue a data request to the request handler.

Implements dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >.

Reimplemented in dunedaq::datahandlinglibs::EmptyFragmentRequestHandlerModel< ReadoutType, LatencyBufferType >.

Definition at line 228 of file DefaultRequestHandlerModel.hxx.

{
  boost::asio::post(*m_request_handler_thread_pool, [&, datarequest, is_retry]() { // start a thread from pool
    auto t_req_begin = std::chrono::high_resolution_clock::now();
    {
      std::unique_lock<std::mutex> lock(m_cv_mutex);
      m_cv.wait(lock, [&] { return !m_cleanup_requested; });
      // ...
    }
    m_cv.notify_all();
    auto result = data_request(datarequest);
    {
      std::lock_guard<std::mutex> lock(m_cv_mutex);
      // ...
    }
    m_cv.notify_all();
    if ((result.result_code == ResultCode::kNotYet || result.result_code == ResultCode::kPartial) && m_request_timeout_ms > 0 && is_retry == false) {
      TLOG_DEBUG(TLVL_WORK_STEPS) << "Re-queue request. "
                                  << " with timestamp=" << result.data_request.trigger_timestamp;
      std::lock_guard<std::mutex> wait_lock_guard(m_waiting_requests_lock);
      m_waiting_requests.push_back(RequestElement(datarequest, std::chrono::high_resolution_clock::now()));
    }
    else {
      try { // Send to fragment connection
        TLOG_DEBUG(TLVL_WORK_STEPS) << "Sending fragment with trigger/sequence_number "
                                    << result.fragment->get_trigger_number() << "."
                                    << result.fragment->get_sequence_number() << ", run number "
                                    << result.fragment->get_run_number() << ", and DetectorID "
                                    << result.fragment->get_detector_id() << ", and SourceID "
                                    << result.fragment->get_element_id() << ", and size "
                                    << result.fragment->get_size() << ", and result code "
                                    << result.result_code;
        // Send fragment
        // ...
          ->send(std::move(result.fragment), std::chrono::milliseconds(m_fragment_send_timeout_ms));

      } catch (const ers::Issue& excpt) {
        ers::warning(CannotWriteToQueue(ERS_HERE, m_sourceid, datarequest.data_destination, excpt));
      }
    }

    auto t_req_end = std::chrono::high_resolution_clock::now();
    auto us_req_took = std::chrono::duration_cast<std::chrono::microseconds>(t_req_end - t_req_begin);
    TLOG_DEBUG(TLVL_WORK_STEPS) << "Responding to data request took: " << us_req_took.count() << "[us]";
    m_response_time_acc.fetch_add(us_req_took.count());
    if (us_req_took.count() > m_response_time_max.load())
      m_response_time_max.store(us_req_took.count());
    if (us_req_took.count() < m_response_time_min.load())
      m_response_time_min.store(us_req_took.count());
    // ...
  });
}
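
A minimal sketch of issuing a request against a configured and started handler (field names are taken from the listings above; the values, the handler instance and the connection name are illustrative):

// Sketch only: 'handler' is a configured, started DefaultRequestHandlerModel instance.
dfmessages::DataRequest dr;
dr.trigger_number    = 42;
dr.sequence_number   = 0;
dr.run_number        = 1;
dr.trigger_timestamp = 1000000;
dr.request_information.window_begin = 999000;
dr.request_information.window_end   = 1001000;
dr.data_destination  = "fragments_to_trb";   // hypothetical connection name

handler.issue_request(dr);   // serviced asynchronously on the handler's thread pool;
                             // the resulting Fragment is sent towards dr.data_destination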

◆ periodic_cleanups()

template<class RDT , class LBT >
void dunedaq::datahandlinglibs::DefaultRequestHandlerModel< RDT, LBT >::periodic_cleanups ( )
protected

Definition at line 347 of file DefaultRequestHandlerModel.hxx.

{
  while (m_run_marker.load()) {
    cleanup_check();
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
  }
}

◆ periodic_data_transmission()

template<class RDT , class LBT >
void dunedaq::datahandlinglibs::DefaultRequestHandlerModel< RDT, LBT >::periodic_data_transmission ( )
override virtual

Periodic data transmission - relevant for trigger in particular.

Implements dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >.

Reimplemented in dunedaq::trigger::TPRequestHandler.

Definition at line 367 of file DefaultRequestHandlerModel.hxx.

{}

◆ periodic_data_transmissions()

template<class RDT , class LBT >
void dunedaq::datahandlinglibs::DefaultRequestHandlerModel< RDT, LBT >::periodic_data_transmissions ( )
protected

Definition at line 357 of file DefaultRequestHandlerModel.hxx.

{
  while (m_run_marker.load()) {
    periodic_data_transmission();
    std::this_thread::sleep_for(std::chrono::milliseconds(m_periodic_data_transmission_ms));
  }
}

◆ record()

template<class RDT , class LBT >
void dunedaq::datahandlinglibs::DefaultRequestHandlerModel< RDT, LBT >::record ( const nlohmann::json & args)
override virtual

Implements dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >.

Reimplemented in dunedaq::datahandlinglibs::ZeroCopyRecordingRequestHandlerModel< ReadoutType, LatencyBufferType >.

Definition at line 144 of file DefaultRequestHandlerModel.hxx.

{
  //auto conf = args.get<readoutconfig::RecordingParams>();
  //FIXME: how do we pass the duration or recording?
  int recording_time_sec = 1;
  if (m_recording.load()) {
    ers::error(CommandError(ERS_HERE, m_sourceid, "A recording is still running, no new recording was started!"));
    return;
  } else if (!m_buffered_writer.is_open()) {
    ers::error(CommandError(ERS_HERE, m_sourceid, "DLH is not configured for recording"));
    return;
  }
  m_recording_thread.set_work(
    [&](int duration) {
      TLOG() << "Start recording for " << duration << " second(s)" << std::endl;
      m_recording.exchange(true);
      auto start_of_recording = std::chrono::high_resolution_clock::now();
      auto current_time = start_of_recording;
      // ...
      RDT element_to_search;
      while (std::chrono::duration_cast<std::chrono::seconds>(current_time - start_of_recording).count() < duration) {
        // ...
          auto front = m_latency_buffer->front();
          m_next_timestamp_to_record = front == nullptr ? 0 : front->get_timestamp();
        }
        element_to_search.set_timestamp(m_next_timestamp_to_record);
        size_t processed_chunks_in_loop = 0;

        {
          std::unique_lock<std::mutex> lock(m_cv_mutex);
          m_cv.wait(lock, [&] { return !m_cleanup_requested; });
          // ...
        }
        m_cv.notify_all();
        auto chunk_iter = m_latency_buffer->lower_bound(element_to_search, true);
        auto end = m_latency_buffer->end();
        {
          std::lock_guard<std::mutex> lock(m_cv_mutex);
          // ...
        }
        m_cv.notify_all();

        for (; chunk_iter != end && chunk_iter.good() && processed_chunks_in_loop < 1000;) {
          if ((*chunk_iter).get_timestamp() >= m_next_timestamp_to_record) {
            if (!m_buffered_writer.write(reinterpret_cast<char*>(chunk_iter->begin()), // NOLINT
                                         chunk_iter->get_payload_size())) {
              ers::warning(CannotWriteToFile(ERS_HERE, m_output_file));
            }
            // ...
            m_bytes_written += chunk_iter->get_payload_size();
            processed_chunks_in_loop++;
            m_next_timestamp_to_record = (*chunk_iter).get_timestamp() +
                                         RDT::expected_tick_difference * (*chunk_iter).get_num_frames();
          }
          ++chunk_iter;
        }
      }
      current_time = std::chrono::high_resolution_clock::now();
      }
      m_next_timestamp_to_record = std::numeric_limits<uint64_t>::max(); // NOLINT (build/unsigned)

      TLOG() << "Stop recording" << std::endl;
      m_recording.exchange(false);
      // ...
    },
    recording_time_sec);
}

◆ scrap()

◆ start()

template<class RDT , class LBT >
void dunedaq::datahandlinglibs::DefaultRequestHandlerModel< RDT, LBT >::start ( const nlohmann::json & )
virtual

Implements dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >.

Reimplemented in dunedaq::trigger::TPRequestHandler.

Definition at line 78 of file DefaultRequestHandlerModel.hxx.

{
  // Reset opmon variables
  // ...
  m_pop_reqs = 0;
  m_pops_count = 0;
  // ...

  m_t0 = std::chrono::high_resolution_clock::now();

  // 19-Dec-2024, KAB: check that Fragment senders are ready to send. This is done so
  // that the IOManager infrastructure fetches the necessary connection details from
  // the ConnectivityService at 'start' time, instead of the first time that the sender
  // is used to send data. This avoids delays in the sending of the first fragment in
  // the first data-taking run in a DAQ session. Such delays can lead to undesirable
  // system behavior like trigger inhibits.
  for (auto frag_out_conn : m_frag_out_conn_ids) {
    // ...
    if (sender != nullptr) {
      bool is_ready = sender->is_ready_for_sending(std::chrono::milliseconds(100));
      TLOG_DEBUG(0) << "The Fragment sender for " << frag_out_conn << " " << (is_ready ? "is" : "is not")
                    << " ready, my source_id is [" << m_sourceid << "]";
    }
  }

  m_request_handler_thread_pool = std::make_unique<boost::asio::thread_pool>(m_num_request_handling_threads);

  m_run_marker.store(true);
  // ...
  }

  // ...
}

◆ stop()

template<class RDT , class LBT >
void dunedaq::datahandlinglibs::DefaultRequestHandlerModel< RDT, LBT >::stop ( const nlohmann::json & )
virtual

Implements dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >.

Definition at line 126 of file DefaultRequestHandlerModel.hxx.

{
  m_run_marker.store(false);
  // ...
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  }
  // ...
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  }
  // ...
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  }
  // ...
}

◆ supports_cutoff_timestamp()

template<class ReadoutType , class LatencyBufferType >
virtual bool dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::supports_cutoff_timestamp ( )
inline virtual

Definition at line 158 of file DefaultRequestHandlerModel.hpp.

{ return false; }

Member Data Documentation

◆ m_buffer_capacity

template<class ReadoutType , class LatencyBufferType >
size_t dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_buffer_capacity
protected

Definition at line 266 of file DefaultRequestHandlerModel.hpp.

◆ m_buffered_writer

template<class ReadoutType , class LatencyBufferType >
BufferedFileWriter dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_buffered_writer
protected

Definition at line 228 of file DefaultRequestHandlerModel.hpp.

◆ m_bytes_written

template<class ReadoutType , class LatencyBufferType >
std::atomic<int> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_bytes_written { 0 }
protected

Definition at line 294 of file DefaultRequestHandlerModel.hpp.


◆ m_cleanup_requested

template<class ReadoutType , class LatencyBufferType >
std::atomic<bool> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_cleanup_requested = false
protected

Definition at line 241 of file DefaultRequestHandlerModel.hpp.

◆ m_cleanup_thread

template<class ReadoutType , class LatencyBufferType >
utilities::ReusableThread dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_cleanup_thread
protected

Definition at line 231 of file DefaultRequestHandlerModel.hpp.

◆ m_configured

template<class ReadoutType , class LatencyBufferType >
bool dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_configured
protected

Definition at line 262 of file DefaultRequestHandlerModel.hpp.

◆ m_cv

template<class ReadoutType , class LatencyBufferType >
std::condition_variable dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_cv
protected

Definition at line 240 of file DefaultRequestHandlerModel.hpp.

◆ m_cv_mutex

template<class ReadoutType , class LatencyBufferType >
std::mutex dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_cv_mutex
protected

Definition at line 239 of file DefaultRequestHandlerModel.hpp.

◆ m_detid

template<class ReadoutType , class LatencyBufferType >
uint16_t dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_detid
protected

Definition at line 268 of file DefaultRequestHandlerModel.hpp.

◆ m_error_registry

template<class ReadoutType , class LatencyBufferType >
std::unique_ptr<FrameErrorRegistry>& dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_error_registry
protected

Definition at line 251 of file DefaultRequestHandlerModel.hpp.

◆ m_frag_out_conn_ids

template<class ReadoutType , class LatencyBufferType >
std::vector<std::string> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_frag_out_conn_ids
protected

Definition at line 275 of file DefaultRequestHandlerModel.hpp.

◆ m_fragment_send_timeout_ms

template<class ReadoutType , class LatencyBufferType >
int dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_fragment_send_timeout_ms
protected

Definition at line 304 of file DefaultRequestHandlerModel.hpp.

◆ m_handled_requests

template<class ReadoutType , class LatencyBufferType >
std::atomic<int> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_handled_requests { 0 }
protected

Definition at line 289 of file DefaultRequestHandlerModel.hpp.


◆ m_latency_buffer

template<class ReadoutType , class LatencyBufferType >
std::shared_ptr<LatencyBufferType>& dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_latency_buffer
protected

Definition at line 225 of file DefaultRequestHandlerModel.hpp.

◆ m_max_requested_elements

template<class ReadoutType , class LatencyBufferType >
std::size_t dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_max_requested_elements
protected

Definition at line 238 of file DefaultRequestHandlerModel.hpp.

◆ m_next_timestamp_to_record

template<class ReadoutType , class LatencyBufferType >
std::atomic<uint64_t> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_next_timestamp_to_record = std::numeric_limits<uint64_t>::max()
protected

Definition at line 259 of file DefaultRequestHandlerModel.hpp.

◆ m_num_buffer_cleanups

template<class ReadoutType , class LatencyBufferType >
std::atomic<int> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_num_buffer_cleanups { 0 }
protected

Definition at line 279 of file DefaultRequestHandlerModel.hpp.


◆ m_num_periodic_send_failed

template<class ReadoutType , class LatencyBufferType >
std::atomic<uint64_t> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_num_periodic_send_failed { 0 }
protected

Definition at line 296 of file DefaultRequestHandlerModel.hpp.


◆ m_num_periodic_sent

template<class ReadoutType , class LatencyBufferType >
std::atomic<uint64_t> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_num_periodic_sent { 0 }
protected

Definition at line 295 of file DefaultRequestHandlerModel.hpp.


◆ m_num_request_handling_threads

template<class ReadoutType , class LatencyBufferType >
size_t dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_num_request_handling_threads = 0
protected

Definition at line 248 of file DefaultRequestHandlerModel.hpp.

◆ m_num_requests_bad

template<class ReadoutType , class LatencyBufferType >
std::atomic<int> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_num_requests_bad { 0 }
protected

Definition at line 284 of file DefaultRequestHandlerModel.hpp.


◆ m_num_requests_delayed

template<class ReadoutType , class LatencyBufferType >
std::atomic<int> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_num_requests_delayed { 0 }
protected

Definition at line 286 of file DefaultRequestHandlerModel.hpp.


◆ m_num_requests_found

template<class ReadoutType , class LatencyBufferType >
std::atomic<int> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_num_requests_found { 0 }
protected

Definition at line 283 of file DefaultRequestHandlerModel.hpp.


◆ m_num_requests_old_window

template<class ReadoutType , class LatencyBufferType >
std::atomic<int> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_num_requests_old_window { 0 }
protected

Definition at line 285 of file DefaultRequestHandlerModel.hpp.


◆ m_num_requests_timed_out

template<class ReadoutType , class LatencyBufferType >
std::atomic<int> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_num_requests_timed_out { 0 }
protected

Definition at line 288 of file DefaultRequestHandlerModel.hpp.


◆ m_num_requests_uncategorized

template<class ReadoutType , class LatencyBufferType >
std::atomic<int> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_num_requests_uncategorized { 0 }
protected

Definition at line 287 of file DefaultRequestHandlerModel.hpp.


◆ m_occupancy

template<class ReadoutType , class LatencyBufferType >
std::atomic<int> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_occupancy
protected

Definition at line 282 of file DefaultRequestHandlerModel.hpp.

◆ m_output_file

template<class ReadoutType , class LatencyBufferType >
std::string dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_output_file
protected

Definition at line 269 of file DefaultRequestHandlerModel.hpp.

◆ m_payloads_written

template<class ReadoutType , class LatencyBufferType >
std::atomic<int> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_payloads_written { 0 }
protected

Definition at line 293 of file DefaultRequestHandlerModel.hpp.


◆ m_periodic_data_transmission_ms

template<class ReadoutType , class LatencyBufferType >
uint32_t dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_periodic_data_transmission_ms = 0
protected

Definition at line 274 of file DefaultRequestHandlerModel.hpp.

◆ m_periodic_transmission_thread

template<class ReadoutType , class LatencyBufferType >
utilities::ReusableThread dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_periodic_transmission_thread
protected

Definition at line 232 of file DefaultRequestHandlerModel.hpp.

◆ m_pop_counter

template<class ReadoutType , class LatencyBufferType >
std::atomic<int> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_pop_counter
protected

Definition at line 278 of file DefaultRequestHandlerModel.hpp.

◆ m_pop_limit_pct

template<class ReadoutType , class LatencyBufferType >
float dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_pop_limit_pct
protected

Definition at line 263 of file DefaultRequestHandlerModel.hpp.

◆ m_pop_limit_size

template<class ReadoutType , class LatencyBufferType >
unsigned dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_pop_limit_size
protected

Definition at line 265 of file DefaultRequestHandlerModel.hpp.

◆ m_pop_reqs

template<class ReadoutType , class LatencyBufferType >
std::atomic<int> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_pop_reqs
protected

Definition at line 280 of file DefaultRequestHandlerModel.hpp.

◆ m_pop_size_pct

template<class ReadoutType , class LatencyBufferType >
float dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_pop_size_pct
protected

Definition at line 264 of file DefaultRequestHandlerModel.hpp.

◆ m_pops_count

template<class ReadoutType , class LatencyBufferType >
std::atomic<int> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_pops_count
protected

Definition at line 281 of file DefaultRequestHandlerModel.hpp.

◆ m_recording

template<class ReadoutType , class LatencyBufferType >
std::atomic<bool> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_recording = false
protected

Definition at line 258 of file DefaultRequestHandlerModel.hpp.

◆ m_recording_configured

template<class ReadoutType , class LatencyBufferType >
bool dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_recording_configured = false
protected

Definition at line 271 of file DefaultRequestHandlerModel.hpp.

◆ m_recording_thread

template<class ReadoutType , class LatencyBufferType >
utilities::ReusableThread dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_recording_thread
protected

Definition at line 229 of file DefaultRequestHandlerModel.hpp.

◆ m_request_counter

template<class ReadoutType , class LatencyBufferType >
std::map<dfmessages::DataRequest, int> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_request_counter
protected

Definition at line 235 of file DefaultRequestHandlerModel.hpp.

◆ m_request_handler_thread_pool

template<class ReadoutType , class LatencyBufferType >
std::unique_ptr<boost::asio::thread_pool> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_request_handler_thread_pool
protected

Definition at line 247 of file DefaultRequestHandlerModel.hpp.

◆ m_request_timeout_ms

template<class ReadoutType , class LatencyBufferType >
int dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_request_timeout_ms
private

Definition at line 306 of file DefaultRequestHandlerModel.hpp.

◆ m_requests_running

template<class ReadoutType , class LatencyBufferType >
std::atomic<int> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_requests_running = 0
protected

Definition at line 242 of file DefaultRequestHandlerModel.hpp.

◆ m_response_time_acc

template<class ReadoutType , class LatencyBufferType >
std::atomic<int> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_response_time_acc { 0 }
protected

Definition at line 290 of file DefaultRequestHandlerModel.hpp.


◆ m_response_time_max

template<class ReadoutType , class LatencyBufferType >
std::atomic<int> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_response_time_max { 0 }
protected

Definition at line 292 of file DefaultRequestHandlerModel.hpp.


◆ m_response_time_min

template<class ReadoutType , class LatencyBufferType >
std::atomic<int> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_response_time_min { std::numeric_limits<int>::max() }
protected

Definition at line 291 of file DefaultRequestHandlerModel.hpp.


◆ m_run_marker

template<class ReadoutType , class LatencyBufferType >
std::atomic<bool> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_run_marker = false
protected

Definition at line 255 of file DefaultRequestHandlerModel.hpp.

◆ m_sourceid

template<class ReadoutType , class LatencyBufferType >
daqdataformats::SourceID dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_sourceid
protected

Definition at line 267 of file DefaultRequestHandlerModel.hpp.

◆ m_stream_buffer_size

template<class ReadoutType , class LatencyBufferType >
size_t dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_stream_buffer_size = 0
protected

Definition at line 270 of file DefaultRequestHandlerModel.hpp.

◆ m_t0

template<class ReadoutType , class LatencyBufferType >
std::chrono::time_point<std::chrono::high_resolution_clock> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_t0
protected

Definition at line 252 of file DefaultRequestHandlerModel.hpp.

◆ m_waiting_queue_thread

template<class ReadoutType , class LatencyBufferType >
std::thread dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_waiting_queue_thread
protected

Definition at line 257 of file DefaultRequestHandlerModel.hpp.

◆ m_waiting_requests

template<class ReadoutType , class LatencyBufferType >
std::vector<RequestElement> dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_waiting_requests
protected

Definition at line 243 of file DefaultRequestHandlerModel.hpp.

◆ m_waiting_requests_lock

template<class ReadoutType , class LatencyBufferType >
std::mutex dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_waiting_requests_lock
protected

Definition at line 244 of file DefaultRequestHandlerModel.hpp.

◆ m_warn_about_empty_buffer

template<class ReadoutType , class LatencyBufferType >
bool dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_warn_about_empty_buffer = true
protected

Definition at line 273 of file DefaultRequestHandlerModel.hpp.

◆ m_warn_on_timeout

template<class ReadoutType , class LatencyBufferType >
bool dunedaq::datahandlinglibs::DefaultRequestHandlerModel< ReadoutType, LatencyBufferType >::m_warn_on_timeout = true
protected

Definition at line 272 of file DefaultRequestHandlerModel.hpp.


The documentation for this class was generated from the following files: