9#ifndef DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_DEFAULTREQUESTHANDLERMODEL_HPP_
10#define DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_DEFAULTREQUESTHANDLERMODEL_HPP_
34#include <boost/asio.hpp>
37#include <folly/concurrency/UnboundedQueue.h>
60namespace datahandlinglibs {
77 return iter->get_timestamp();
81template<
class ReadoutType,
class LatencyBufferType>
86 using RDT = ReadoutType;
87 using LBT = LatencyBufferType;
96 std::unique_ptr<FrameErrorRegistry>& error_registry)
114 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"DefaultRequestHandlerModel created...";
121 const std::chrono::time_point<std::chrono::high_resolution_clock>& tp_value)
127 std::chrono::time_point<std::chrono::high_resolution_clock>
start_time;
134 void scrap(
const nlohmann::json& )
override;
137 void start(
const nlohmann::json& );
140 void stop(
const nlohmann::json& );
143 void record(
const nlohmann::json& args)
override;
166 fh.
size =
sizeof(fh);
185 void* buffer, uint32_t buffer_pos,
186 const std::size_t& buffer_size)
188 auto bytes_to_copy =
size;
189 while (bytes_to_copy > 0) {
190 auto n = std::min(bytes_to_copy, buffer_size - buffer_pos);
191 std::memcpy(
static_cast<char*
>(buffer) + buffer_pos,
static_cast<const char*
>(data), n);
194 if (buffer_pos == buffer_size) {
252 std::chrono::time_point<std::chrono::high_resolution_clock>
m_t0;
virtual bool supports_cutoff_timestamp()
std::atomic< bool > m_run_marker
std::map< dfmessages::DataRequest, int > m_request_counter
std::size_t m_max_requested_elements
int m_fragment_send_timeout_ms
std::unique_ptr< FrameErrorRegistry > & m_error_registry
std::atomic< int > m_response_time_max
std::atomic< int > m_handled_requests
void conf(const dunedaq::appmodel::DataHandlerModule *)
virtual void periodic_data_transmission() override
Periodic data transmission - relevant for trigger in particular.
std::atomic< bool > m_recording
std::vector< std::string > m_frag_out_conn_ids
void issue_request(dfmessages::DataRequest datarequest, bool is_retry=false) override
Issue a data request to the request handler.
virtual void generate_opmon_data() override
std::atomic< bool > m_cleanup_requested
std::atomic< int > m_num_requests_uncategorized
RequestResult data_request(dfmessages::DataRequest dr) override
std::atomic< int > m_bytes_written
std::atomic< int > m_pop_counter
std::atomic< int > m_num_requests_found
std::atomic< uint64_t > m_next_timestamp_to_record
typename dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >::ResultCode ResultCode
utilities::ReusableThread m_cleanup_thread
std::string m_output_file
std::condition_variable m_cv
bool m_warn_about_empty_buffer
void dump_to_buffer(const void *data, std::size_t size, void *buffer, uint32_t buffer_pos, const std::size_t &buffer_size)
void check_waiting_requests()
utilities::ReusableThread m_recording_thread
DefaultRequestHandlerModel(std::shared_ptr< LatencyBufferType > &latency_buffer, std::unique_ptr< FrameErrorRegistry > &error_registry)
std::atomic< int > m_pops_count
void record(const nlohmann::json &args) override
std::atomic< int > m_occupancy
void cleanup_check() override
Check if cleanup is necessary and execute it if necessary.
bool m_recording_configured
typename dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >::RequestResult RequestResult
BufferedFileWriter m_buffered_writer
std::atomic< int > m_num_buffer_cleanups
size_t m_stream_buffer_size
std::unique_ptr< daqdataformats::Fragment > create_empty_fragment(const dfmessages::DataRequest &dr)
std::atomic< uint64_t > m_num_periodic_send_failed
void periodic_data_transmissions()
std::atomic< uint64_t > m_num_periodic_sent
std::atomic< int > m_payloads_written
std::atomic< int > m_num_requests_timed_out
std::atomic< int > m_num_requests_delayed
std::atomic< int > m_response_time_acc
std::vector< std::pair< void *, size_t > > get_fragment_pieces(uint64_t start_win_ts, uint64_t end_win_ts, RequestResult &rres)
daqdataformats::FragmentHeader create_fragment_header(const dfmessages::DataRequest &dr)
size_t m_num_request_handling_threads
std::shared_ptr< LatencyBufferType > & m_latency_buffer
std::mutex m_waiting_requests_lock
uint32_t m_periodic_data_transmission_ms
std::atomic< int > m_response_time_min
virtual dunedaq::daqdataformats::timestamp_t get_cutoff_timestamp()
std::atomic< int > m_num_requests_bad
std::vector< RequestElement > m_waiting_requests
void scrap(const nlohmann::json &) override
daqdataformats::SourceID m_sourceid
void start(const nlohmann::json &)
std::thread m_waiting_queue_thread
std::atomic< int > m_requests_running
unsigned m_pop_limit_size
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
std::atomic< int > m_num_requests_old_window
void stop(const nlohmann::json &)
std::unique_ptr< boost::asio::thread_pool > m_request_handler_thread_pool
utilities::ReusableThread m_periodic_transmission_thread
std::atomic< int > m_pop_reqs
#define TLOG_DEBUG(lvl,...)
uint64_t get_frame_iterator_timestamp(T iter)
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size
dfmessages::DataRequest request
RequestElement(const dfmessages::DataRequest &data_request, const std::chrono::time_point< std::chrono::high_resolution_clock > &tp_value)
std::chrono::time_point< std::chrono::high_resolution_clock > start_time
This message represents a request for data sent to a single component of the DAQ.
sequence_number_t sequence_number
Sequence Number of the request.
ComponentRequest request_information
trigger_number_t trigger_number
Trigger number the request corresponds to.
timestamp_t trigger_timestamp
Timestamp of trigger.
run_number_t run_number
The current run number.