3#include <folly/coro/BlockingWait.h>
4#include <folly/coro/Timeout.h>
11template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
19 if (input->get_data_type() ==
"DataRequest") {
24 if (output->get_data_type() ==
"TimeSync") {
25 m_generate_timesync =
true;
27 m_timesync_connection_name = output->UID();
32 throw datahandlinglibs::ResourceQueueError(
ERS_HERE,
"raw_input or frag_output",
"SNBDataHandlingModel", excpt);
39 m_error_registry->set_ers_metadata(
"DLH of SourceID[" + std::to_string(mcfg->
get_source_id()) +
"] ");
40 m_latency_buffer_impl.reset(
new LBT());
42 m_request_handler_impl.reset(
new RHT(m_latency_buffer_impl, m_error_registry));
50 m_request_handler_supports_cutoff_timestamp = m_request_handler_impl->supports_cutoff_timestamp();
51 m_fake_trigger =
false;
53 m_sourceid.subsystem = RDT::subsystem;
59 m_raw_processor_impl->conf(mcfg);
64 }
catch (
const std::bad_alloc& be) {
66 datahandlinglibs::ConfigurationError(
ERS_HERE, m_sourceid,
"Latency Buffer can't be allocated with size!"));
68 m_request_handler_impl->conf(mcfg);
71template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
81 dmcbr->register_callback<
IDT>(m_raw_data_callback_conf, m_consume_callback);
84 if (m_generate_timesync) {
85 m_timesync_thread.set_name(
"timesync", m_sourceid.id);
87 if (m_processing_delay_ticks) {
88 m_postprocess_scheduler_thread.set_name(
"pprocsched", m_sourceid.id);
89 m_timekeeper = std::make_unique<folly::ThreadWheelTimekeeper>();
93template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
102 m_num_lb_insert_failures = 0;
103 m_stats_packet_count = 0;
104 m_rawq_timeout_count = 0;
105 m_num_post_processing_delay_max_waits = 0;
107 m_t0 = std::chrono::high_resolution_clock::now();
111 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Starting threads...";
112 m_raw_processor_impl->start(args);
113 m_request_handler_impl->start(args);
114 if (m_generate_timesync) {
117 if (m_processing_delay_ticks) {
122 m_data_request_receiver->add_callback(
126template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
130 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Stoppping threads...";
133 m_data_request_receiver->remove_callback();
135 m_request_handler_impl->stop(args);
136 if (m_generate_timesync) {
137 while (!m_timesync_thread.get_readiness()) {
138 std::this_thread::sleep_for(std::chrono::milliseconds(10));
141 if (m_processing_delay_ticks) {
143 while (!m_postprocess_scheduler_thread.get_readiness()) {
144 std::this_thread::sleep_for(std::chrono::milliseconds(10));
147 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Flushing latency buffer with occupancy: " << m_latency_buffer_impl->occupancy();
148 m_latency_buffer_impl->flush();
149 m_raw_processor_impl->stop(args);
150 m_raw_processor_impl->reset_last_daq_time();
153template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
163 auto now = std::chrono::high_resolution_clock::now();
164 int new_packets = m_stats_packet_count.exchange(0);
165 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(
now - m_t0).count() / 1000000.;
169 int local_num_lb_insert_failures = m_num_lb_insert_failures.exchange(0);
170 if (local_num_lb_insert_failures != 0) {
171 ers::warning(datahandlinglibs::NonZeroLatencyBufferInsertFailures(
184 this->publish(std::move(ri));
187template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
191 if constexpr (std::is_same_v<IDT, RDT>) {
192 process_item(std::move(payload));
194 auto transformed = transform_payload(payload);
195 for (
auto& i : transformed) {
196 process_item(std::move(i));
201template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
205 transform_and_process(std::move(payload));
208template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
212 m_raw_processor_impl->preprocess_item(&payload);
213 if (m_request_handler_supports_cutoff_timestamp) {
214 int64_t diff1 = payload.get_timestamp() - m_request_handler_impl->get_cutoff_timestamp();
220 payload.get_timestamp(),
221 m_request_handler_impl->get_cutoff_timestamp(),
223 (
static_cast<double>(diff1) / 62500.0)));
226 while (m_latency_buffer_impl->isFull()) {
227 std::this_thread::sleep_for(std::chrono::milliseconds(1));
229 if (!m_latency_buffer_impl->write(std::move(payload))) {
232 m_num_lb_insert_failures++;
236 if (m_processing_delay_ticks == 0) {
237 m_raw_processor_impl->postprocess_item(m_latency_buffer_impl->back());
240 ++m_stats_packet_count;
246template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
250 folly::coro::blockingWait(postprocess_schedule());
253template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
254folly::coro::Task<void>
261 bool first_cycle =
true;
262 auto last_post_proc_time = std::chrono::system_clock::now();
263 auto now = last_post_proc_time;
264 std::chrono::milliseconds milliseconds;
265 RDT processed_element;
270 while (m_run_marker.load()) {
272 co_await folly::coro::timeout(
273 m_baton.operator
co_await(), std::chrono::milliseconds{ m_post_processing_delay_max_wait }, m_timekeeper.get());
275 }
catch (
const folly::FutureTimeout&) {
276 ++m_num_post_processing_delay_max_waits;
279 if (m_latency_buffer_impl->occupancy() == 0) {
283 now = std::chrono::system_clock::now();
284 milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(
now - last_post_proc_time);
286 if (
static_cast<uint64_t
>(milliseconds.count()) <= m_post_processing_delay_min_wait) {
290 last_post_proc_time =
now;
293 auto tail = m_latency_buffer_impl->back();
294 newest_ts = tail->get_timestamp();
297 auto head = m_latency_buffer_impl->front();
298 processed_element.set_timestamp(head->get_timestamp());
303 if (newest_ts - processed_element.get_timestamp() > m_processing_delay_ticks) {
304 end_win_ts = newest_ts - m_processing_delay_ticks;
305 auto start_iter = m_latency_buffer_impl->lower_bound(processed_element,
false);
306 processed_element.set_timestamp(end_win_ts);
307 auto end_iter = m_latency_buffer_impl->lower_bound(processed_element,
false);
309 for (
auto it = start_iter; it != end_iter; ++it) {
310 m_raw_processor_impl->postprocess_item(&(*it));
313 ++m_stats_packet_count;
319template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
323 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"TimeSync thread started...";
326 uint64_t msg_seqno = 0;
328 auto once_per_run =
true;
329 size_t zero_timestamp_count = 0;
330 size_t duplicate_timestamp_count = 0;
331 size_t total_timestamp_count = 0;
332 while (m_run_marker.load()) {
335 ++total_timestamp_count;
339 if (timesyncmsg.daq_time != 0 && timesyncmsg.daq_time != prev_timestamp) {
340 prev_timestamp = timesyncmsg.daq_time;
341 timesyncmsg.run_number = m_run_number;
342 timesyncmsg.sequence_number = ++msg_seqno;
343 timesyncmsg.source_id = m_sourceid.id;
344 TLOG_DEBUG(TLVL_TIME_SYNCS) <<
"New timesync: daq=" << timesyncmsg.daq_time
345 <<
" wall=" << timesyncmsg.system_time <<
" run=" << timesyncmsg.run_number
346 <<
" seqno=" << timesyncmsg.sequence_number
347 <<
" source_id=" << timesyncmsg.source_id;
350 m_timesync_sender->send(std::move(timesyncmsg_copy), std::chrono::milliseconds(500));
353 datahandlinglibs::TimeSyncTransmissionFailed(
ERS_HERE, m_sourceid, m_timesync_connection_name, excpt));
356 if (m_fake_trigger) {
358 ++m_current_fake_trigger_id;
360 dr.
trigger_timestamp = timesyncmsg.daq_time > 500 * us ? timesyncmsg.daq_time - 500 * us : 0;
370 m_request_handler_impl->issue_request(dr);
376 if (timesyncmsg.daq_time == 0) {
377 ++zero_timestamp_count;
379 if (timesyncmsg.daq_time == prev_timestamp) {
380 ++duplicate_timestamp_count;
383 TLOG() <<
"Timesync with DAQ time 0 won't be sent out as it's an invalid sync.";
384 once_per_run =
false;
387 }
catch (
const iomanager::TimeoutExpired& excpt) {
391 for (
size_t i = 0; i < 10; ++i) {
392 std::this_thread::sleep_for(std::chrono::milliseconds(10));
393 if (!m_run_marker.load()) {
399 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"TimeSync thread joins... (timestamp count, zero/same/total = "
400 << zero_timestamp_count <<
"/" << duplicate_timestamp_count <<
"/"
401 << total_timestamp_count <<
")";
404template<
class RDT,
class RHT,
class LBT,
class RPT,
class IDT>
413 TLOG_DEBUG(TLVL_QUEUE_POP) <<
"Received DataRequest" <<
" for trig/seq_number " << data_request.
trigger_number <<
"."
420 m_request_handler_impl->issue_request(data_request);
const dunedaq::appmodel::LatencyBuffer * get_latency_buffer() const
Get "latency_buffer" relationship value.
uint64_t get_post_processing_delay_ticks() const
Get "post_processing_delay_ticks" attribute value. Number of clock tick by which post processing of i...
const dunedaq::appmodel::DataProcessor * get_data_processor() const
Get "data_processor" relationship value.
uint64_t get_post_processing_delay_max_wait() const
Get "post_processing_delay_max_wait" attribute value. Maximum wait time (ms) before post processing c...
uint64_t get_post_processing_delay_min_wait() const
Get "post_processing_delay_min_wait" attribute value. Minimum time (ms) between consecutive post proc...
const dunedaq::appmodel::RequestHandler * get_request_handler() const
Get "request_handler" relationship value.
const dunedaq::appmodel::DataHandlerConf * get_module_configuration() const
Get "module_configuration" relationship value.
const dunedaq::appmodel::DataMoveCallbackConf * get_raw_data_callback() const
Get "raw_data_callback" relationship value. Configuration for raw data callback.
uint32_t get_source_id() const
Get "source_id" attribute value.
bool get_post_processing_enabled() const
Get "post_processing_enabled" attribute value.
const std::string & UID() const noexcept
const std::vector< const dunedaq::confmodel::Connection * > & get_inputs() const
Get "inputs" relationship value. List of connections to/from this module.
const std::vector< const dunedaq::confmodel::Connection * > & get_outputs() const
Get "outputs" relationship value. Output connections from this module.
static std::shared_ptr< DataMoveCallbackRegistry > get()
void set_sum_payloads(::uint64_t value)
void set_num_lb_insert_failures(::uint64_t value)
void set_rate_payloads_consumed(double value)
void set_sum_requests(::uint64_t value)
void set_num_post_processing_delay_max_waits(::uint64_t value)
void set_last_daq_timestamp(::uint64_t value)
void set_oldest_timestamp(::uint64_t value)
void set_num_data_input_timeouts(::uint64_t value)
::uint64_t num_payloads() const
void set_num_payloads(::uint64_t value)
void set_num_requests(::uint64_t value)
void set_newest_timestamp(::uint64_t value)
void start(const appfwk::DAQModule::CommandData_t &args)
void process_item(RDT &&payload)
void init(const appmodel::DataHandlerModule *modconf)
Forward calls from the appfwk.
virtual void generate_opmon_data() override
std::uint64_t timestamp_t
folly::coro::Task< void > postprocess_schedule()
void transform_and_process(IDT &&payload)
void conf(const appfwk::DAQModule::CommandData_t &args)
void stop(const appfwk::DAQModule::CommandData_t &args)
void dispatch_requests(dfmessages::DataRequest &data_request)
void run_postprocess_scheduler()
void run_timesync()
Function that will be run in its own thread and sends periodic timesync messages by pushing them to t...
void consume_callback(IDT &&payload)
Base class for any user define issue.
#define TLOG_DEBUG(lvl,...)
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
static std::shared_ptr< iomanager::ReceiverConcept< Datatype > > get_iom_receiver(iomanager::ConnectionId const &id)
void warning(const Issue &issue)
void error(const Issue &issue)
This message represents a request for data sent to a single component of the DAQ.
sequence_number_t sequence_number
Sequence Number of the request.
std::string data_destination
ComponentRequest request_information
trigger_number_t trigger_number
Trigger number the request corresponds to.
timestamp_t trigger_timestamp
Timestamp of trigger.
run_number_t run_number
The current run number.
A synthetic message used to ensure that all elements of a DAQ system are synchronized.