DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::datahandlinglibs::DefaultSkipListRequestHandler< T > Class Template Reference

#include <DefaultSkipListRequestHandler.hpp>

Inheritance diagram for dunedaq::datahandlinglibs::DefaultSkipListRequestHandler< T >:
[legend]
Collaboration diagram for dunedaq::datahandlinglibs::DefaultSkipListRequestHandler< T >:
[legend]

Public Types

using inherited = datahandlinglibs::DefaultRequestHandlerModel<T, datahandlinglibs::SkipListLatencyBufferModel<T>>
 
using SkipListAcc = typename folly::ConcurrentSkipList<T>::Accessor
 
using SkipListSkip = typename folly::ConcurrentSkipList<T>::Skipper
 
- Public Types inherited from dunedaq::datahandlinglibs::DefaultRequestHandlerModel< T, datahandlinglibs::SkipListLatencyBufferModel< T > >
using RDT
 
using LBT
 
using RequestResult
 
using ResultCode
 
- Public Types inherited from dunedaq::opmonlib::MonitorableObject
using NodePtr = std::weak_ptr<MonitorableObject>
 
using NewNodePtr = std::shared_ptr<MonitorableObject>
 
using ElementId = std::string
 

Public Member Functions

 DefaultSkipListRequestHandler (std::shared_ptr< datahandlinglibs::SkipListLatencyBufferModel< T > > &latency_buffer, std::unique_ptr< datahandlinglibs::FrameErrorRegistry > &error_registry)
 
- Public Member Functions inherited from dunedaq::datahandlinglibs::DefaultRequestHandlerModel< T, datahandlinglibs::SkipListLatencyBufferModel< T > >
 DefaultRequestHandlerModel (std::shared_ptr< datahandlinglibs::SkipListLatencyBufferModel< T > > &latency_buffer, std::unique_ptr< FrameErrorRegistry > &error_registry)
 
void conf (const dunedaq::appmodel::DataHandlerModule *)
 
void scrap (const nlohmann::json &) override
 
void start (const nlohmann::json &)
 
void stop (const nlohmann::json &)
 
void record (const nlohmann::json &args) override
 
void cleanup_check () override
 Check if cleanup is necessary and execute it if necessary.
 
virtual void periodic_data_transmission () override
 Periodic data transmission - relevant for trigger in particular.
 
void issue_request (dfmessages::DataRequest datarequest, bool is_retry=false) override
 Issue a data request to the request handler.
 
virtual dunedaq::daqdataformats::timestamp_t get_cutoff_timestamp ()
 
virtual bool supports_cutoff_timestamp ()
 
- Public Member Functions inherited from dunedaq::datahandlinglibs::RequestHandlerConcept< T, datahandlinglibs::SkipListLatencyBufferModel< T > >
 RequestHandlerConcept ()
 
 RequestHandlerConcept (const RequestHandlerConcept &)=delete
 RequestHandlerConcept is not copy-constructible.
 
 RequestHandlerConcept (RequestHandlerConcept &&)=delete
 RequestHandlerConcept is not move-constructible.
 
virtual ~RequestHandlerConcept ()
 
RequestHandlerConceptoperator= (const RequestHandlerConcept &)=delete
 RequestHandlerConcept is not copy-assginable.
 
RequestHandlerConceptoperator= (RequestHandlerConcept &&)=delete
 RequestHandlerConcept is not move-assignable.
 
- Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject (const MonitorableObject &)=delete
 
MonitorableObjectoperator= (const MonitorableObject &)=delete
 
 MonitorableObject (MonitorableObject &&)=delete
 
MonitorableObjectoperator= (MonitorableObject &&)=delete
 
virtual ~MonitorableObject ()=default
 
auto get_opmon_id () const noexcept
 
auto get_opmon_level () const noexcept
 

Protected Member Functions

void cleanup () override
 
void skip_list_cleanup_request ()
 
- Protected Member Functions inherited from dunedaq::datahandlinglibs::DefaultRequestHandlerModel< T, datahandlinglibs::SkipListLatencyBufferModel< T > >
daqdataformats::FragmentHeader create_fragment_header (const dfmessages::DataRequest &dr)
 
std::unique_ptr< daqdataformats::Fragmentcreate_empty_fragment (const dfmessages::DataRequest &dr)
 
void dump_to_buffer (const void *data, std::size_t size, void *buffer, uint32_t buffer_pos, const std::size_t &buffer_size)
 
void periodic_cleanups ()
 
void periodic_data_transmissions ()
 
void check_waiting_requests ()
 
std::vector< std::pair< void *, size_t > > get_fragment_pieces (uint64_t start_win_ts, uint64_t end_win_ts, RequestResult &rres)
 
RequestResult data_request (dfmessages::DataRequest dr) override
 
virtual void generate_opmon_data () override
 
- Protected Member Functions inherited from dunedaq::datahandlinglibs::RequestHandlerConcept< T, datahandlinglibs::SkipListLatencyBufferModel< T > >
const std::string & resultCodeAsString (ResultCode rc)
 
- Protected Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject ()=default
 
void register_node (ElementId name, NewNodePtr)
 
void publish (google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
 

Private Attributes

std::atomic< int > m_found_requested_count { 0 }
 
std::atomic< int > m_bad_requested_count { 0 }
 

Static Private Attributes

static const constexpr uint64_t m_max_ts_diff = 625000000
 

Additional Inherited Members

- Static Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
static bool publishable_metric (OpMonLevel entry, OpMonLevel system) noexcept
 
- Protected Types inherited from dunedaq::datahandlinglibs::RequestHandlerConcept< T, datahandlinglibs::SkipListLatencyBufferModel< T > >
enum  ResultCode
 
- Protected Attributes inherited from dunedaq::datahandlinglibs::DefaultRequestHandlerModel< T, datahandlinglibs::SkipListLatencyBufferModel< T > >
std::shared_ptr< datahandlinglibs::SkipListLatencyBufferModel< T > > & m_latency_buffer
 
BufferedFileWriter m_buffered_writer
 
utilities::ReusableThread m_recording_thread
 
utilities::ReusableThread m_cleanup_thread
 
utilities::ReusableThread m_periodic_transmission_thread
 
std::map< dfmessages::DataRequest, int > m_request_counter
 
std::size_t m_max_requested_elements
 
std::mutex m_cv_mutex
 
std::condition_variable m_cv
 
std::atomic< bool > m_cleanup_requested
 
std::atomic< int > m_requests_running
 
std::vector< RequestElement > m_waiting_requests
 
std::mutex m_waiting_requests_lock
 
std::unique_ptr< boost::asio::thread_pool > m_request_handler_thread_pool
 
size_t m_num_request_handling_threads
 
std::unique_ptr< FrameErrorRegistry > & m_error_registry
 
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
 
std::atomic< bool > m_run_marker
 
std::thread m_waiting_queue_thread
 
std::atomic< bool > m_recording
 
std::atomic< uint64_t > m_next_timestamp_to_record
 
bool m_configured
 
float m_pop_limit_pct
 
float m_pop_size_pct
 
unsigned m_pop_limit_size
 
size_t m_buffer_capacity
 
daqdataformats::SourceID m_sourceid
 
uint16_t m_detid
 
std::string m_output_file
 
size_t m_stream_buffer_size
 
bool m_recording_configured
 
bool m_warn_on_timeout
 
bool m_warn_about_empty_buffer
 
uint32_t m_periodic_data_transmission_ms
 
std::vector< std::string > m_frag_out_conn_ids
 
std::atomic< int > m_pop_counter
 
std::atomic< int > m_num_buffer_cleanups
 
std::atomic< int > m_pop_reqs
 
std::atomic< int > m_pops_count
 
std::atomic< int > m_occupancy
 
std::atomic< int > m_num_requests_found
 
std::atomic< int > m_num_requests_bad
 
std::atomic< int > m_num_requests_old_window
 
std::atomic< int > m_num_requests_delayed
 
std::atomic< int > m_num_requests_uncategorized
 
std::atomic< int > m_num_requests_timed_out
 
std::atomic< int > m_handled_requests
 
std::atomic< int > m_response_time_acc
 
std::atomic< int > m_response_time_min
 
std::atomic< int > m_response_time_max
 
std::atomic< int > m_payloads_written
 
std::atomic< int > m_bytes_written
 
std::atomic< uint64_t > m_num_periodic_sent
 
std::atomic< uint64_t > m_num_periodic_send_failed
 
int m_fragment_send_timeout_ms
 
- Protected Attributes inherited from dunedaq::datahandlinglibs::RequestHandlerConcept< T, datahandlinglibs::SkipListLatencyBufferModel< T > >
std::map< ResultCode, std::string > ResultCodeStrings
 

Detailed Description

template<class T>
class dunedaq::datahandlinglibs::DefaultSkipListRequestHandler< T >

Definition at line 30 of file DefaultSkipListRequestHandler.hpp.

Member Typedef Documentation

◆ inherited

◆ SkipListAcc

template<class T >
using dunedaq::datahandlinglibs::DefaultSkipListRequestHandler< T >::SkipListAcc = typename folly::ConcurrentSkipList<T>::Accessor

Definition at line 35 of file DefaultSkipListRequestHandler.hpp.

◆ SkipListSkip

template<class T >
using dunedaq::datahandlinglibs::DefaultSkipListRequestHandler< T >::SkipListSkip = typename folly::ConcurrentSkipList<T>::Skipper

Definition at line 36 of file DefaultSkipListRequestHandler.hpp.

Constructor & Destructor Documentation

◆ DefaultSkipListRequestHandler()

template<class T >
dunedaq::datahandlinglibs::DefaultSkipListRequestHandler< T >::DefaultSkipListRequestHandler ( std::shared_ptr< datahandlinglibs::SkipListLatencyBufferModel< T > > & latency_buffer,
std::unique_ptr< datahandlinglibs::FrameErrorRegistry > & error_registry )
inline

Definition at line 39 of file DefaultSkipListRequestHandler.hpp.

41 : datahandlinglibs::DefaultRequestHandlerModel<T, datahandlinglibs::SkipListLatencyBufferModel<T>>(
42 latency_buffer,
43 error_registry)
44 {
45 TLOG_DEBUG(TLVL_WORK_STEPS) << "DefaultSkipListRequestHandler created...";
46 }
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112

Member Function Documentation

◆ cleanup()

◆ skip_list_cleanup_request()

template<class T >
void dunedaq::datahandlinglibs::DefaultSkipListRequestHandler< T >::skip_list_cleanup_request ( )
protected

Definition at line 8 of file DefaultSkipListRequestHandler.hxx.

9{
10 size_t removed_ctr = 0;
11 uint64_t tailts = 0; // oldest // NOLINT(build/unsigned)
12 uint64_t headts = 0; // newest // NOLINT(build/unsigned)
13 {
14 SkipListAcc acc(inherited::m_latency_buffer->get_skip_list());
15 auto tail = acc.last();
16 auto head = acc.first();
17 if (tail && head) {
18 tailts = (*tail).get_timestamp(); // tailptr->get_timestamp();
19 headts = (*head).get_timestamp(); // headptr->get_timestamp();
20 TLOG_DEBUG(TLVL_WORK_STEPS) << "Cleanup REQUEST with "
21 << "Oldest stored TS=" << headts << " "
22 << "Newest stored TS=" << tailts;
23 if (tailts - headts > m_max_ts_diff) { // ts differnce exceeds maximum
25 uint64_t timediff = m_max_ts_diff; // NOLINT(build/unsigned)
26 while (timediff >= m_max_ts_diff) {
27 bool removed = acc.remove(*head);
28 if (!removed) {
29 TLOG_DEBUG(TLVL_WORK_STEPS) << "Unsuccesfull remove from SKL during cleanup: " << removed;
30 } else {
31 ++removed_ctr;
32 }
33 head = acc.first();
34 headts = (*head).get_timestamp();
35 timediff = tailts - headts;
36 }
37 inherited::m_pops_count += removed_ctr;
38 }
39 } else {
40 TLOG_DEBUG(TLVL_WORK_STEPS) << "Didn't manage to get SKL head and tail!";
41 }
42 }
44}
typename folly::ConcurrentSkipList< T >::Accessor SkipListAcc

Member Data Documentation

◆ m_bad_requested_count

template<class T >
std::atomic<int> dunedaq::datahandlinglibs::DefaultSkipListRequestHandler< T >::m_bad_requested_count { 0 }
private

Definition at line 61 of file DefaultSkipListRequestHandler.hpp.

61{ 0 };

◆ m_found_requested_count

template<class T >
std::atomic<int> dunedaq::datahandlinglibs::DefaultSkipListRequestHandler< T >::m_found_requested_count { 0 }
private

Definition at line 60 of file DefaultSkipListRequestHandler.hpp.

60{ 0 };

◆ m_max_ts_diff

template<class T >
const constexpr uint64_t dunedaq::datahandlinglibs::DefaultSkipListRequestHandler< T >::m_max_ts_diff = 625000000
staticconstexprprivate

Definition at line 57 of file DefaultSkipListRequestHandler.hpp.


The documentation for this class was generated from the following files: