DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::trigger::TPRequestHandler Class Reference

#include <TPRequestHandler.hpp>

Inheritance diagram for dunedaq::trigger::TPRequestHandler:
[legend]
Collaboration diagram for dunedaq::trigger::TPRequestHandler:
[legend]

Public Types

using inherited2 = datahandlinglibs::DefaultSkipListRequestHandler<TriggerPrimitiveTypeAdapter>
 
- Public Types inherited from dunedaq::datahandlinglibs::DefaultSkipListRequestHandler< TriggerPrimitiveTypeAdapter >
using inherited
 
using SkipListAcc
 
using SkipListSkip
 
- Public Types inherited from dunedaq::datahandlinglibs::DefaultRequestHandlerModel< TriggerPrimitiveTypeAdapter, datahandlinglibs::SkipListLatencyBufferModel< TriggerPrimitiveTypeAdapter > >
using RDT
 
using LBT
 
using RequestResult
 
using ResultCode
 
- Public Types inherited from dunedaq::opmonlib::MonitorableObject
using NodePtr = std::weak_ptr<MonitorableObject>
 
using NewNodePtr = std::shared_ptr<MonitorableObject>
 
using ElementId = std::string
 

Public Member Functions

 TPRequestHandler (std::shared_ptr< datahandlinglibs::SkipListLatencyBufferModel< TriggerPrimitiveTypeAdapter > > &latency_buffer, std::unique_ptr< datahandlinglibs::FrameErrorRegistry > &error_registry)
 
void conf (const appmodel::DataHandlerModule *conf) override
 
void start (const appfwk::DAQModule::CommandData_t &args) override
 
void scrap (const appfwk::DAQModule::CommandData_t &args) override
 
void periodic_data_transmission () override
 Periodic data transmission - relevant for trigger in particular.
 
- Public Member Functions inherited from dunedaq::datahandlinglibs::DefaultSkipListRequestHandler< TriggerPrimitiveTypeAdapter >
 DefaultSkipListRequestHandler (std::shared_ptr< datahandlinglibs::SkipListLatencyBufferModel< TriggerPrimitiveTypeAdapter > > &latency_buffer, std::unique_ptr< datahandlinglibs::FrameErrorRegistry > &error_registry)
 
- Public Member Functions inherited from dunedaq::datahandlinglibs::DefaultRequestHandlerModel< TriggerPrimitiveTypeAdapter, datahandlinglibs::SkipListLatencyBufferModel< TriggerPrimitiveTypeAdapter > >
 DefaultRequestHandlerModel (std::shared_ptr< datahandlinglibs::SkipListLatencyBufferModel< TriggerPrimitiveTypeAdapter > > &latency_buffer, std::unique_ptr< FrameErrorRegistry > &error_registry)
 
void stop (const appfwk::DAQModule::CommandData_t &)
 
void record (const appfwk::DAQModule::CommandData_t &args) override
 
void cleanup_check () override
 Check if cleanup is necessary and execute it if necessary.
 
void issue_request (dfmessages::DataRequest datarequest, bool is_retry=false) override
 Issue a data request to the request handler.
 
virtual dunedaq::daqdataformats::timestamp_t get_cutoff_timestamp ()
 
virtual bool supports_cutoff_timestamp ()
 
void reset_oldest_time ()
 
std::uint64_t get_oldest_time () override
 Get oldest timestamp in the buffer.
 
- Public Member Functions inherited from dunedaq::datahandlinglibs::RequestHandlerConcept< TriggerPrimitiveTypeAdapter, datahandlinglibs::SkipListLatencyBufferModel< TriggerPrimitiveTypeAdapter > >
 RequestHandlerConcept ()
 
 RequestHandlerConcept (const RequestHandlerConcept &)=delete
 RequestHandlerConcept is not copy-constructible.
 
 RequestHandlerConcept (RequestHandlerConcept &&)=delete
 RequestHandlerConcept is not move-constructible.
 
virtual ~RequestHandlerConcept ()
 
RequestHandlerConcept & operator= (const RequestHandlerConcept &)=delete
 RequestHandlerConcept is not copy-assignable.
 
RequestHandlerConcept & operator= (RequestHandlerConcept &&)=delete
 RequestHandlerConcept is not move-assignable.
 
- Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject (const MonitorableObject &)=delete
 
MonitorableObject & operator= (const MonitorableObject &)=delete
 
 MonitorableObject (MonitorableObject &&)=delete
 
MonitorableObject & operator= (MonitorableObject &&)=delete
 
virtual ~MonitorableObject ()=default
 
auto get_opmon_id () const noexcept
 
auto get_opmon_level () const noexcept
 

Private Types

using timestamp_t = std::uint64_t
 

Private Attributes

std::shared_ptr< iomanager::SenderConcept< dunedaq::trigger::TPSet > > m_tpset_sink
 
uint64_t m_run_number
 
uint64_t m_next_tpset_seqno
 
timestamp_t m_oldest_ts =0
 
timestamp_t m_newest_ts =0
 
timestamp_t m_start_win_ts =0
 
timestamp_t m_end_win_ts =0
 
bool m_first_cycle = true
 
uint64_t m_ts_set_sender_offset_ticks = 6250000
 

Additional Inherited Members

- Static Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
static bool publishable_metric (OpMonLevel entry, OpMonLevel system) noexcept
 
- Protected Types inherited from dunedaq::datahandlinglibs::RequestHandlerConcept< TriggerPrimitiveTypeAdapter, datahandlinglibs::SkipListLatencyBufferModel< TriggerPrimitiveTypeAdapter > >
enum  ResultCode
 
- Protected Member Functions inherited from dunedaq::datahandlinglibs::DefaultSkipListRequestHandler< TriggerPrimitiveTypeAdapter >
void cleanup () override
 
void skip_list_cleanup_request ()
 
- Protected Member Functions inherited from dunedaq::datahandlinglibs::DefaultRequestHandlerModel< TriggerPrimitiveTypeAdapter, datahandlinglibs::SkipListLatencyBufferModel< TriggerPrimitiveTypeAdapter > >
daqdataformats::FragmentHeader create_fragment_header (const dfmessages::DataRequest &dr)
 
std::unique_ptr< daqdataformats::Fragment > create_empty_fragment (const dfmessages::DataRequest &dr)
 
void dump_to_buffer (const void *data, std::size_t size, void *buffer, uint32_t buffer_pos, const std::size_t &buffer_size)
 
void periodic_cleanups ()
 
void periodic_data_transmissions ()
 
void check_waiting_requests ()
 
std::vector< std::pair< void *, size_t > > get_fragment_pieces (uint64_t start_win_ts, uint64_t end_win_ts, RequestResult &rres)
 
RequestResult data_request (dfmessages::DataRequest dr) override
 
virtual void generate_opmon_data () override
 
- Protected Member Functions inherited from dunedaq::datahandlinglibs::RequestHandlerConcept< TriggerPrimitiveTypeAdapter, datahandlinglibs::SkipListLatencyBufferModel< TriggerPrimitiveTypeAdapter > >
const std::string & resultCodeAsString (ResultCode rc)
 
- Protected Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject ()=default
 
void register_node (ElementId name, NewNodePtr)
 
void publish (google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
 
- Protected Attributes inherited from dunedaq::datahandlinglibs::DefaultRequestHandlerModel< TriggerPrimitiveTypeAdapter, datahandlinglibs::SkipListLatencyBufferModel< TriggerPrimitiveTypeAdapter > >
std::shared_ptr< datahandlinglibs::SkipListLatencyBufferModel< TriggerPrimitiveTypeAdapter > > & m_latency_buffer
 
BufferedFileWriter m_buffered_writer
 
utilities::ReusableThread m_recording_thread
 
utilities::ReusableThread m_cleanup_thread
 
utilities::ReusableThread m_periodic_transmission_thread
 
std::map< dfmessages::DataRequest, int > m_request_counter
 
std::size_t m_max_requested_elements
 
std::mutex m_cv_mutex
 
std::condition_variable m_cv
 
std::atomic< bool > m_cleanup_requested
 
std::atomic< int > m_requests_running
 
std::vector< RequestElement > m_waiting_requests
 
std::mutex m_waiting_requests_lock
 
std::unique_ptr< boost::asio::thread_pool > m_request_handler_thread_pool
 
size_t m_num_request_handling_threads
 
std::unique_ptr< FrameErrorRegistry > & m_error_registry
 
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
 
std::atomic< bool > m_run_marker
 
std::thread m_waiting_queue_thread
 
std::atomic< bool > m_recording
 
std::atomic< uint64_t > m_next_timestamp_to_record
 
bool m_configured
 
float m_pop_limit_pct
 
float m_pop_size_pct
 
unsigned m_pop_limit_size
 
size_t m_buffer_capacity
 
daqdataformats::SourceID m_sourceid
 
uint16_t m_detid
 
std::string m_output_file
 
size_t m_stream_buffer_size
 
bool m_recording_configured
 
bool m_warn_on_timeout
 
bool m_warn_about_empty_buffer
 
uint32_t m_periodic_data_transmission_ms
 
std::vector< std::string > m_frag_out_conn_ids
 
std::atomic< int > m_pop_counter
 
std::atomic< int > m_num_buffer_cleanups
 
std::atomic< int > m_pop_reqs
 
std::atomic< int > m_pops_count
 
std::atomic< int > m_occupancy
 
std::atomic< int > m_num_requests_found
 
std::atomic< int > m_num_requests_bad
 
std::atomic< int > m_num_requests_old_window
 
std::atomic< int > m_num_requests_delayed
 
std::atomic< int > m_num_requests_uncategorized
 
std::atomic< int > m_num_requests_timed_out
 
std::atomic< int > m_handled_requests
 
std::atomic< int > m_response_time_acc
 
std::atomic< int > m_response_time_min
 
std::atomic< int > m_response_time_max
 
std::atomic< int > m_payloads_written
 
std::atomic< int > m_bytes_written
 
std::atomic< uint64_t > m_num_periodic_sent
 
std::atomic< uint64_t > m_num_periodic_send_failed
 
std::atomic< uint64_t > m_oldest_timestamp
 
int m_fragment_send_timeout_ms
 
- Protected Attributes inherited from dunedaq::datahandlinglibs::RequestHandlerConcept< TriggerPrimitiveTypeAdapter, datahandlinglibs::SkipListLatencyBufferModel< TriggerPrimitiveTypeAdapter > >
std::map< ResultCode, std::string > ResultCodeStrings
 

Detailed Description

Definition at line 48 of file TPRequestHandler.hpp.

Member Typedef Documentation

◆ inherited2

◆ timestamp_t

using dunedaq::trigger::TPRequestHandler::timestamp_t = std::uint64_t
private

Definition at line 70 of file TPRequestHandler.hpp.

Constructor & Destructor Documentation

◆ TPRequestHandler()

dunedaq::trigger::TPRequestHandler::TPRequestHandler ( std::shared_ptr< datahandlinglibs::SkipListLatencyBufferModel< TriggerPrimitiveTypeAdapter > > & latency_buffer,
std::unique_ptr< datahandlinglibs::FrameErrorRegistry > & error_registry )
inline

Definition at line 55 of file TPRequestHandler.hpp.

57 : datahandlinglibs::DefaultSkipListRequestHandler<TriggerPrimitiveTypeAdapter>(
58 latency_buffer,
59 error_registry)
60 {
61 TLOG_DEBUG(TLVL_WORK_STEPS) << "TPRequestHandler created...";
62 }
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112

Member Function Documentation

◆ conf()

void dunedaq::trigger::TPRequestHandler::conf ( const appmodel::DataHandlerModule * conf)
override virtual

Reimplemented from dunedaq::datahandlinglibs::DefaultRequestHandlerModel< TriggerPrimitiveTypeAdapter, datahandlinglibs::SkipListLatencyBufferModel< TriggerPrimitiveTypeAdapter > >.

Definition at line 11 of file TPRequestHandler.cpp.

11 {
12
13 for (auto output : conf->get_outputs()) {
14 if (output->get_data_type() == "TPSet") {
15 try {
17 } catch (const ers::Issue& excpt) {
18 throw datahandlinglibs::ResourceQueueError(ERS_HERE, "tp queue", "DefaultRequestHandlerModel", excpt);
19 }
20 }
21 }
23}
#define ERS_HERE
static std::shared_ptr< IOManager > get()
Definition IOManager.hpp:40
A set of TPs or TAs in a given time window, defined by its start and end times.
Definition Set.hpp:26
std::shared_ptr< iomanager::SenderConcept< dunedaq::trigger::TPSet > > m_tpset_sink
void conf(const appmodel::DataHandlerModule *conf) override
Base class for any user-defined issue.
Definition Issue.hpp:69

◆ periodic_data_transmission()

void dunedaq::trigger::TPRequestHandler::periodic_data_transmission ( )
override virtual

Periodic data transmission - relevant for trigger in particular.

Reimplemented from dunedaq::datahandlinglibs::DefaultRequestHandlerModel< TriggerPrimitiveTypeAdapter, datahandlinglibs::SkipListLatencyBufferModel< TriggerPrimitiveTypeAdapter > >.

Definition at line 49 of file TPRequestHandler.cpp.

49 {
50
51 if (m_tpset_sink == nullptr) return;
52
54
55 {
56 std::unique_lock<std::mutex> lock(m_cv_mutex);
57 m_cv.wait(lock, [&] { return !m_cleanup_requested; });
59 }
60 m_cv.notify_all();
61 if(m_latency_buffer->occupancy() != 0) {
62 // Prepare response
63 RequestResult rres(ResultCode::kUnknown, dr);
64 std::vector<std::pair<void*, size_t>> frag_pieces;
65
66 // Get the newest TP
67 SkipListAcc acc(inherited2::m_latency_buffer->get_skip_list());
68 auto tail = acc.last();
69 auto head = acc.first();
70 m_newest_ts = (*tail).get_timestamp();
71 m_oldest_ts = (*head).get_timestamp();
72
73 if (m_first_cycle) {
75 m_first_cycle = false;
76 }
80 auto num_tps = frag_pieces.size();
81 trigger::TPSet tpset;
82 tpset.run_number = m_run_number;
84 tpset.origin = m_sourceid;
85 tpset.start_time = m_start_win_ts; // provisory timestamp, will be filled with first TP
86 tpset.end_time = m_end_win_ts; // provisory timestamp, will be filled with last TP
87 tpset.seqno = m_next_tpset_seqno++; // NOLINT(runtime/increment_decrement)
88 // reserve the space for efficiency
89 if (num_tps > 0) {
90 tpset.objects.reserve(frag_pieces.size());
91 bool first_tp = true;
92 for( auto f : frag_pieces) {
93 trgdataformats::TriggerPrimitive tp = *(static_cast<trgdataformats::TriggerPrimitive*>(f.first));
94
95 if(first_tp) {
96 tpset.start_time = tp.time_start;
97 first_tp = false;
98 }
99 tpset.end_time = tp.time_start;
100 tpset.objects.emplace_back(std::move(tp));
101 }
102 }
103 if(!m_tpset_sink->try_send(std::move(tpset), iomanager::Sender::s_no_block)) {
106 }
108
109 //remember what we sent for the next loop
111 }
112 }
113 {
114 std::lock_guard<std::mutex> lock(m_cv_mutex);
116 }
117 m_cv.notify_all();
118 return;
119}
typename dunedaq::datahandlinglibs::RequestHandlerConcept< TriggerPrimitiveTypeAdapter, datahandlinglibs::SkipListLatencyBufferModel< TriggerPrimitiveTypeAdapter > >::RequestResult RequestResult
typename folly::ConcurrentSkipList< TriggerPrimitiveTypeAdapter >::Accessor SkipListAcc
static constexpr timeout_t s_no_block
Definition Sender.hpp:26
Set< trgdataformats::TriggerPrimitive > TPSet
Definition TPSet.hpp:20
void warning(const Issue &issue)
Definition ers.hpp:115
This message represents a request for data sent to a single component of the DAQ.

◆ scrap()

void dunedaq::trigger::TPRequestHandler::scrap ( const appfwk::DAQModule::CommandData_t & args)
override virtual

◆ start()

void dunedaq::trigger::TPRequestHandler::start ( const appfwk::DAQModule::CommandData_t & args)
override virtual

Member Data Documentation

◆ m_end_win_ts

timestamp_t dunedaq::trigger::TPRequestHandler::m_end_win_ts =0
private

Definition at line 78 of file TPRequestHandler.hpp.

◆ m_first_cycle

bool dunedaq::trigger::TPRequestHandler::m_first_cycle = true
private

Definition at line 79 of file TPRequestHandler.hpp.

◆ m_newest_ts

timestamp_t dunedaq::trigger::TPRequestHandler::m_newest_ts =0
private

Definition at line 76 of file TPRequestHandler.hpp.

◆ m_next_tpset_seqno

uint64_t dunedaq::trigger::TPRequestHandler::m_next_tpset_seqno
private

Definition at line 73 of file TPRequestHandler.hpp.

◆ m_oldest_ts

timestamp_t dunedaq::trigger::TPRequestHandler::m_oldest_ts =0
private

Definition at line 75 of file TPRequestHandler.hpp.

◆ m_run_number

uint64_t dunedaq::trigger::TPRequestHandler::m_run_number
private

Definition at line 72 of file TPRequestHandler.hpp.

◆ m_start_win_ts

timestamp_t dunedaq::trigger::TPRequestHandler::m_start_win_ts =0
private

Definition at line 77 of file TPRequestHandler.hpp.

◆ m_tpset_sink

std::shared_ptr<iomanager::SenderConcept<dunedaq::trigger::TPSet> > dunedaq::trigger::TPRequestHandler::m_tpset_sink
private

Definition at line 71 of file TPRequestHandler.hpp.

◆ m_ts_set_sender_offset_ticks

uint64_t dunedaq::trigger::TPRequestHandler::m_ts_set_sender_offset_ticks = 6250000
private

Definition at line 80 of file TPRequestHandler.hpp.


The documentation for this class was generated from the following files: