8#ifndef DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_SNBRequestHandlerModel_HPP_
9#define DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_SNBRequestHandlerModel_HPP_
33#include <boost/asio.hpp>
36#include <folly/concurrency/UnboundedQueue.h>
76 return iter->get_timestamp();
79template<
class ReadoutType,
class LatencyBufferType>
84 using RDT = ReadoutType;
85 using LBT = LatencyBufferType;
94 std::unique_ptr<datahandlinglibs::FrameErrorRegistry>& error_registry)
109 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"SNBRequestHandlerModel created...";
116 const std::chrono::time_point<std::chrono::high_resolution_clock>& tp_value)
123 std::chrono::time_point<std::chrono::high_resolution_clock>
start_time;
130 void scrap(
const appfwk::DAQModule::CommandData_t& )
override;
133 void start(
const appfwk::DAQModule::CommandData_t& );
136 void stop(
const appfwk::DAQModule::CommandData_t& );
139 void record(
const appfwk::DAQModule::CommandData_t& args)
override;
167 fh.
size =
sizeof(fh);
188 const std::size_t& buffer_size)
190 auto bytes_to_copy =
size;
191 while (bytes_to_copy > 0) {
192 auto n = std::min(bytes_to_copy, buffer_size - buffer_pos);
193 std::memcpy(
static_cast<char*
>(buffer) + buffer_pos,
static_cast<const char*
>(data), n);
196 if (buffer_pos == buffer_size) {
252 std::chrono::time_point<std::chrono::high_resolution_clock>
m_t0;
std::atomic< bool > m_recording
void start(const appfwk::DAQModule::CommandData_t &)
std::unique_ptr< datahandlinglibs::FrameErrorRegistry > & m_error_registry
typename dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >::RequestResult RequestResult
std::set< uint64_t > m_pop_list
std::string m_output_file
RequestResult data_request(dfmessages::DataRequest dr) override
virtual void generate_opmon_data() override
std::atomic< int > m_response_time_acc
std::shared_ptr< LatencyBufferType > & m_latency_buffer
virtual dunedaq::daqdataformats::timestamp_t get_cutoff_timestamp()
std::vector< std::string > m_frag_out_conn_ids
std::condition_variable m_cv
void conf(const dunedaq::appmodel::DataHandlerModule *)
bool m_warn_about_empty_buffer
typename dunedaq::datahandlinglibs::RequestHandlerConcept< ReadoutType, LatencyBufferType >::ResultCode ResultCode
virtual bool supports_cutoff_timestamp()
std::atomic< int > m_pop_counter
std::unique_ptr< daqdataformats::Fragment > create_empty_fragment(const dfmessages::DataRequest &dr)
bool m_recording_configured
std::atomic< bool > m_run_marker
void issue_request(dfmessages::DataRequest datarequest, bool is_retry=false) override
Issue a data request to the request handler.
std::atomic< int > m_payloads_written
void dump_to_buffer(const void *data, std::size_t size, void *buffer, uint32_t buffer_pos, const std::size_t &buffer_size)
std::atomic< int > m_response_time_max
std::atomic< int > m_requests_running
std::mutex m_pop_list_mutex
daqdataformats::FragmentHeader create_fragment_header(const dfmessages::DataRequest &dr)
void scrap(const appfwk::DAQModule::CommandData_t &) override
std::atomic< int > m_num_requests_bad
void cleanup_check() override
Check whether cleanup is necessary and, if so, execute it.
void check_waiting_requests()
std::atomic< int > m_num_requests_timed_out
int m_fragment_send_timeout_ms
utilities::ReusableThread m_periodic_transmission_thread
std::atomic< int > m_num_buffer_cleanups
std::atomic< int > m_response_time_min
std::uint64_t get_oldest_time() override
Get the oldest timestamp in the buffer.
std::atomic< uint64_t > m_oldest_timestamp
utilities::ReusableThread m_recording_thread
std::atomic< uint64_t > m_num_periodic_sent
std::thread m_waiting_queue_thread
void stop(const appfwk::DAQModule::CommandData_t &)
std::atomic< int > m_num_requests_delayed
std::atomic< bool > m_cleanup_requested
std::map< dfmessages::DataRequest, int > m_request_counter
std::atomic< int > m_num_requests_found
std::vector< std::pair< void *, size_t > > get_fragment_pieces(uint64_t start_win_ts, uint64_t end_win_ts, RequestResult &rres)
SNBRequestHandlerModel(std::shared_ptr< LatencyBufferType > &latency_buffer, std::unique_ptr< datahandlinglibs::FrameErrorRegistry > &error_registry)
std::atomic< uint64_t > m_num_periodic_send_failed
std::atomic< int > m_pop_reqs
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
std::atomic< int > m_num_requests_uncategorized
utilities::ReusableThread m_cleanup_thread
std::atomic< uint64_t > m_next_timestamp_to_record
std::atomic< int > m_handled_requests
void record(const appfwk::DAQModule::CommandData_t &args) override
datahandlinglibs::BufferedFileWriter m_buffered_writer
size_t m_num_request_handling_threads
size_t m_stream_buffer_size
std::atomic< int > m_occupancy
std::atomic< int > m_pops_count
uint32_t m_periodic_data_transmission_ms
std::unique_ptr< boost::asio::thread_pool > m_request_handler_thread_pool
std::vector< RequestElement > m_waiting_requests
void periodic_data_transmissions()
virtual void periodic_data_transmission() override
Periodic data transmission; relevant for the trigger in particular.
daqdataformats::SourceID m_sourceid
std::atomic< int > m_bytes_written
std::atomic< int > m_num_requests_old_window
std::mutex m_waiting_requests_lock
#define TLOG_DEBUG(lvl,...)
uint64_t get_frame_iterator_timestamp(T iter)
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size
This message represents a request for data sent to a single component of the DAQ.
sequence_number_t sequence_number
Sequence number of the request.
ComponentRequest request_information
trigger_number_t trigger_number
Trigger number the request corresponds to.
timestamp_t trigger_timestamp
Timestamp of the trigger.
run_number_t run_number
The current run number.
RequestElement(const dfmessages::DataRequest &data_request, const std::chrono::time_point< std::chrono::high_resolution_clock > &tp_value)
std::chrono::time_point< std::chrono::high_resolution_clock > start_time
dfmessages::DataRequest request