namespace datahandlinglibs {

template<class RDT, class RHT, class LBT, class RPT, class IDT>
  if (input->get_data_type() == "DataRequest") {

    m_raw_data_receiver_connection_name = input->UID();

    std::string conn_name = input->UID();
    const char delim = '_';
    std::vector<std::string> words;

    while ((start = conn_name.find_first_not_of(delim, end)) != std::string::npos) {
      end = conn_name.find(delim, start);
      words.push_back(conn_name.substr(start, end - start));
    }

    TLOG_DEBUG(TLVL_WORK_STEPS) << "Initialize connection based on uid: "
      << m_raw_data_receiver_connection_name << " front word: " << words.front();

    std::string cb_prefix("cb");
    if (words.front() == cb_prefix) {
      m_callback_mode = true;
    }

    if (!m_callback_mode) {

      m_raw_receiver_timeout_ms = std::chrono::milliseconds(input->get_recv_timeout_ms());

  if (output->get_data_type() == "TimeSync") {
    m_generate_timesync = true;

    m_timesync_connection_name = output->UID();

  throw ResourceQueueError(ERS_HERE, "raw_input or frag_output", "DataHandlingModel", excpt);

  if (!m_callback_mode && m_raw_data_receiver == nullptr) {

  m_error_registry->set_ers_metadata("DLH of SourceID[" + std::to_string(mcfg->get_source_id()) + "] ");
  m_latency_buffer_impl.reset(new LBT());

  m_request_handler_impl.reset(new RHT(m_latency_buffer_impl, m_error_registry));

  m_request_handler_supports_cutoff_timestamp = m_request_handler_impl->supports_cutoff_timestamp();
  m_fake_trigger = false;
  m_raw_receiver_sleep_us = std::chrono::microseconds::zero();

  m_sourceid.subsystem = RDT::subsystem;

  m_raw_processor_impl->conf(mcfg);
  }
  catch (const std::bad_alloc& be) {

  m_request_handler_impl->conf(mcfg);
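// Hedged illustration (not from the original file): the loop above splits a connection
// UID such as "cb_raw_data_0" on '_' and enables callback mode when the leading token is
// "cb". The same tokenization as a self-contained helper; the name split_uid is
// hypothetical and <string>/<vector> are assumed to be available:
inline std::vector<std::string>
split_uid(const std::string& conn_name, char delim = '_')
{
  std::vector<std::string> words;
  std::size_t start = 0;
  std::size_t end = 0;
  while ((start = conn_name.find_first_not_of(delim, end)) != std::string::npos) {
    end = conn_name.find(delim, start);
    words.push_back(conn_name.substr(start, end - start));
  }
  return words;
}
// Usage sketch: split_uid("cb_raw_data_0").front() == "cb"  -> callback mode requested.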
template<class RDT, class RHT, class LBT, class RPT, class IDT>
  if (m_callback_mode) {

    dmcbr->register_callback<RDT>(m_raw_data_receiver_connection_name, m_consume_callback);

  m_consumer_thread.set_name("consumer", m_sourceid.id);
  if (m_generate_timesync)
    m_timesync_thread.set_name("timesync", m_sourceid.id);
template<class RDT, class RHT, class LBT, class RPT, class IDT>
  m_num_lb_insert_failures = 0;
  m_stats_packet_count = 0;
  m_rawq_timeout_count = 0;

  m_t0 = std::chrono::high_resolution_clock::now();

  TLOG_DEBUG(TLVL_WORK_STEPS) << "Starting threads...";
  m_raw_processor_impl->start(args);
  m_request_handler_impl->start(args);
  if (!m_callback_mode) {

    m_data_request_receiver->add_callback(
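// Hedged illustration: the truncated add_callback(...) call above registers the handler
// for incoming DataRequest messages. A common way to bind a member function as such a
// handler is shown below; this is an assumption about the elided argument, not the
// original code:
//
//   m_data_request_receiver->add_callback(
//     std::bind(&DataHandlingModel::dispatch_requests, this, std::placeholders::_1));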
template<class RDT, class RHT, class LBT, class RPT, class IDT>
  TLOG_DEBUG(TLVL_WORK_STEPS) << "Stopping threads...";

  m_data_request_receiver->remove_callback();

  m_request_handler_impl->stop(args);
  if (m_generate_timesync) {
    while (!m_timesync_thread.get_readiness()) {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
  }

  if (!m_callback_mode) {
    while (!m_consumer_thread.get_readiness()) {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
  }

  TLOG_DEBUG(TLVL_WORK_STEPS) << "Flushing latency buffer with occupancy: " << m_latency_buffer_impl->occupancy();
  m_latency_buffer_impl->flush();
  m_raw_processor_impl->stop(args);
  m_raw_processor_impl->reset_last_daq_time();
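// Hedged illustration: stop() waits for the worker threads by polling their readiness
// flag in 10 ms steps before flushing the latency buffer and stopping the raw processor.
// The same idiom as a small standalone helper (wait_until_ready is an illustrative name,
// not part of this class):
template<typename Predicate>
inline void
wait_until_ready(Predicate ready)
{
  while (!ready()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  }
}
// Usage sketch: wait_until_ready([&] { return m_timesync_thread.get_readiness(); });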
template<class RDT, class RHT, class LBT, class RPT, class IDT>
  auto now = std::chrono::high_resolution_clock::now();
  int new_packets = m_stats_packet_count.exchange(0);
  double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;

  int local_num_lb_insert_failures = m_num_lb_insert_failures.exchange(0);
  if (local_num_lb_insert_failures != 0) {

  this->publish(std::move(ri));
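// Hedged illustration: the opmon snapshot above reads-and-resets the packet counter and
// divides by the wall-clock time elapsed since m_t0, i.e. a consumed-payload rate in Hz.
// Standalone form of that calculation (compute_rate_hz is an illustrative name):
inline double
compute_rate_hz(int new_packets,
                std::chrono::high_resolution_clock::time_point t0,
                std::chrono::high_resolution_clock::time_point now)
{
  double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - t0).count() / 1000000.;
  return seconds > 0. ? new_packets / seconds : 0.;
}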
template<class RDT, class RHT, class LBT, class RPT, class IDT>
  m_raw_processor_impl->preprocess_item(&payload);
  if (m_request_handler_supports_cutoff_timestamp) {
    int64_t diff1 = payload.get_timestamp() - m_request_handler_impl->get_cutoff_timestamp();

      m_request_handler_impl->get_cutoff_timestamp(), diff1,
      (static_cast<double>(diff1) / 62500.0)));

  if (!m_latency_buffer_impl->write(std::move(payload))) {

    m_num_lb_insert_failures++;

  if (m_processing_delay_ticks == 0) {
    m_raw_processor_impl->postprocess_item(m_latency_buffer_impl->back());

  ++m_stats_packet_count;
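// Hedged illustration: the cutoff check above reports how far a payload timestamp lies
// behind the request handler's cutoff, converting clock ticks to milliseconds with the
// factor 62500 ticks/ms (a 62.5 MHz timestamp clock). Standalone form (ticks_to_ms is an
// illustrative name):
inline double
ticks_to_ms(int64_t ticks)
{
  return static_cast<double>(ticks) / 62500.0;
}
// e.g. ticks_to_ms(125000) == 2.0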
template<class RDT, class RHT, class LBT, class RPT, class IDT>
  TLOG_DEBUG(TLVL_WORK_STEPS) << "Consumer thread started...";
  m_rawq_timeout_count = 0;

  m_stats_packet_count = 0;

  bool first_cycle = true;
  auto last_post_proc_time = std::chrono::system_clock::now();
  auto now = last_post_proc_time;
  std::chrono::milliseconds milliseconds;
  RDT processed_element;

  while (m_run_marker.load()) {

    auto opt_payload = m_raw_data_receiver->try_receive(m_raw_receiver_timeout_ms);

      IDT& original = opt_payload.value();

      if constexpr (std::is_same_v<IDT, RDT>) {
        process_item(original);

        auto transformed = transform_payload(original);
        for (auto& i : transformed) {

      ++m_rawq_timeout_count;

      if (m_raw_receiver_sleep_us != std::chrono::microseconds::zero())
        std::this_thread::sleep_for(m_raw_receiver_sleep_us);

    if (m_processing_delay_ticks != 0 && m_latency_buffer_impl->occupancy() > 0) {
      now = std::chrono::system_clock::now();
      milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(now - last_post_proc_time);

      if (milliseconds.count() > 1) {
        last_post_proc_time = now;

        auto tail = m_latency_buffer_impl->back();
        newest_ts = tail->get_timestamp();

        auto head = m_latency_buffer_impl->front();
        processed_element.set_timestamp(head->get_timestamp());

        TLOG() << "***** First pass post processing *****";

        if (newest_ts - processed_element.get_timestamp() > m_processing_delay_ticks) {
          end_win_ts = newest_ts - m_processing_delay_ticks;
          auto start_iter = m_latency_buffer_impl->lower_bound(processed_element, false);
          processed_element.set_timestamp(end_win_ts);
          auto end_iter = m_latency_buffer_impl->lower_bound(processed_element, false);

          for (auto it = start_iter; it != end_iter; ++it) {
            m_raw_processor_impl->postprocess_item(&(*it));

            ++m_stats_packet_count;

  TLOG_DEBUG(TLVL_WORK_STEPS) << "Consumer thread joins... ";
template<class RDT, class RHT, class LBT, class RPT, class IDT>
  m_raw_processor_impl->preprocess_item(&payload);
  if (m_request_handler_supports_cutoff_timestamp) {
    int64_t diff1 = payload.get_timestamp() - m_request_handler_impl->get_cutoff_timestamp();

      m_request_handler_impl->get_cutoff_timestamp(), diff1,
      (static_cast<double>(diff1) / 62500.0)));

  if (!m_latency_buffer_impl->write(std::move(payload))) {
    TLOG_DEBUG(TLVL_TAKE_NOTE) << "***ERROR: Latency buffer is full and data was overwritten!";
    m_num_lb_insert_failures++;

#warning RS FIXME: Post-processing delay feature is not implemented in callback consume!
  m_raw_processor_impl->postprocess_item(m_latency_buffer_impl->back());

  ++m_stats_packet_count;
template<class RDT, class RHT, class LBT, class RPT, class IDT>
  TLOG_DEBUG(TLVL_WORK_STEPS) << "TimeSync thread started...";

  uint64_t msg_seqno = 0;

  auto once_per_run = true;
  size_t zero_timestamp_count = 0;
  size_t duplicate_timestamp_count = 0;
  size_t total_timestamp_count = 0;
  while (m_run_marker.load()) {

    ++total_timestamp_count;

    if (timesyncmsg.daq_time != 0 && timesyncmsg.daq_time != prev_timestamp) {
      prev_timestamp = timesyncmsg.daq_time;
      timesyncmsg.run_number = m_run_number;
      timesyncmsg.sequence_number = ++msg_seqno;
      timesyncmsg.source_pid = m_pid_of_current_process;
      TLOG_DEBUG(TLVL_TIME_SYNCS) << "New timesync: daq=" << timesyncmsg.daq_time
        << " wall=" << timesyncmsg.system_time << " run=" << timesyncmsg.run_number
        << " seqno=" << timesyncmsg.sequence_number << " pid=" << timesyncmsg.source_pid;

      m_timesync_sender->send(std::move(timesyncmsg_copy), std::chrono::milliseconds(500));

      if (m_fake_trigger) {

        ++m_current_fake_trigger_id;

        dr.trigger_timestamp = timesyncmsg.daq_time > 500 * us ? timesyncmsg.daq_time - 500 * us : 0;

        TLOG_DEBUG(TLVL_WORK_STEPS) << "Issuing fake trigger based on timesync. "

        m_request_handler_impl->issue_request(dr);

      if (timesyncmsg.daq_time == 0) { ++zero_timestamp_count; }
      if (timesyncmsg.daq_time == prev_timestamp) { ++duplicate_timestamp_count; }

      TLOG() << "Timesync with DAQ time 0 won't be sent out as it's an invalid sync.";
      once_per_run = false;

  }
  catch (const iomanager::TimeoutExpired& excpt) {

    for (size_t i = 0; i < 10; ++i) {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      if (!m_run_marker.load()) {

  TLOG_DEBUG(TLVL_WORK_STEPS) << "TimeSync thread joins... (timestamp count, zero/same/total = "
    << zero_timestamp_count << "/" << duplicate_timestamp_count << "/" << total_timestamp_count << ")";
template<class RDT, class RHT, class LBT, class RPT, class IDT>
  TLOG_DEBUG(TLVL_QUEUE_POP) << "Received DataRequest"

  m_request_handler_impl->issue_request(data_request);