DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::fdreadoutlibs::DAPHNEEthStreamFrameProcessor Class Reference

#include <DAPHNEEthStreamFrameProcessor.hpp>

Inheritance diagram for dunedaq::fdreadoutlibs::DAPHNEEthStreamFrameProcessor:
[legend]
Collaboration diagram for dunedaq::fdreadoutlibs::DAPHNEEthStreamFrameProcessor:
[legend]

Public Types

using inherited = datahandlinglibs::TaskRawDataProcessorModel<types::DAPHNEEthStreamTypeAdapter>
using frameptr = types::DAPHNEEthStreamTypeAdapter*
using daphneframeptr = dunedaq::fddetdataformats::DAPHNEEthStreamFrame*
using timestamp_t = std::uint64_t
Public Types inherited from dunedaq::opmonlib::MonitorableObject
using NodePtr = std::weak_ptr<MonitorableObject>
using NewNodePtr = std::shared_ptr<MonitorableObject>
using ElementId = std::string

Public Member Functions

 DAPHNEEthStreamFrameProcessor (std::unique_ptr< datahandlinglibs::FrameErrorRegistry > &error_registry, bool post_processing_enabled)
void conf (const appmodel::DataHandlerModule *conf) override
 Set the emulator mode, if active, timestamps of processed packets are overwritten with new ones.
Public Member Functions inherited from dunedaq::datahandlinglibs::TaskRawDataProcessorModel< types::DAPHNEEthStreamTypeAdapter >
 TaskRawDataProcessorModel (std::unique_ptr< FrameErrorRegistry > &error_registry, bool post_processing_enabled)
 ~TaskRawDataProcessorModel ()
void conf (const appmodel::DataHandlerModule *conf) override
 Set the emulator mode, if active, timestamps of processed packets are overwritten with new ones.
void scrap (const appfwk::DAQModule::CommandData_t &) override
 Unconfigure.
void start (const appfwk::DAQModule::CommandData_t &) override
 Start operation.
void stop (const appfwk::DAQModule::CommandData_t &) override
 Stop operation.
void reset_last_daq_time ()
std::uint64_t get_last_daq_time () override
 Get newest timestamp of last seen packet.
void preprocess_item (types::DAPHNEEthStreamTypeAdapter *item) override
 Preprocess one element.
void postprocess_item (const types::DAPHNEEthStreamTypeAdapter *item) override
 Postprocess one element.
void invoke_postprocess_schedule_timeout_policy (std::uint64_t accumilated_timeout_ticks) override
 Handle postprocess timeout event.
void add_preprocess_task (Task &&task)
void add_postprocess_task (Task &&task)
void invoke_all_preprocess_functions (types::DAPHNEEthStreamTypeAdapter *item)
void launch_all_preprocess_functions (types::DAPHNEEthStreamTypeAdapter *item)
Public Member Functions inherited from dunedaq::datahandlinglibs::RawDataProcessorConcept< ReadoutType >
 RawDataProcessorConcept ()
virtual ~RawDataProcessorConcept ()
 RawDataProcessorConcept (const RawDataProcessorConcept &)=delete
 RawDataProcessorConcept is not copy-constructible.
RawDataProcessorConceptoperator= (const RawDataProcessorConcept &)=delete
 RawDataProcessorConcept is not copy-assginable.
 RawDataProcessorConcept (RawDataProcessorConcept &&)=delete
 RawDataProcessorConcept is not move-constructible.
RawDataProcessorConceptoperator= (RawDataProcessorConcept &&)=delete
 RawDataProcessorConcept is not move-assignable.
virtual void preprocess_item (ReadoutType *item)=0
 Preprocess one element.
virtual void postprocess_item (const ReadoutType *item)=0
 Postprocess one element.
Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject (const MonitorableObject &)=delete
MonitorableObjectoperator= (const MonitorableObject &)=delete
 MonitorableObject (MonitorableObject &&)=delete
MonitorableObjectoperator= (MonitorableObject &&)=delete
virtual ~MonitorableObject ()=default
auto get_opmon_id () const noexcept
auto get_opmon_level () const noexcept

Protected Member Functions

void timestamp_check (frameptr)
void frame_error_check (frameptr)
Protected Member Functions inherited from dunedaq::datahandlinglibs::TaskRawDataProcessorModel< types::DAPHNEEthStreamTypeAdapter >
virtual void generate_opmon_data () override
void run_post_processing_thread (std::function< void(const types::DAPHNEEthStreamTypeAdapter *)> &function, folly::ProducerConsumerQueue< const types::DAPHNEEthStreamTypeAdapter * > &queue)
Protected Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject ()=default
void register_node (ElementId name, NewNodePtr)
void publish (google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept

Protected Attributes

timestamp_t m_previous_ts = 0
timestamp_t m_current_ts = 0
bool m_first_ts_fake = true
bool m_first_ts_missmatch = true
bool m_problem_reported = false
std::atomic< int > m_ts_error_ctr { 0 }
Protected Attributes inherited from dunedaq::datahandlinglibs::TaskRawDataProcessorModel< types::DAPHNEEthStreamTypeAdapter >
std::vector< std::function< void(types::DAPHNEEthStreamTypeAdapter *)> > m_preprocess_functions
std::unique_ptr< FrameErrorRegistry > & m_error_registry
bool m_post_processing_enabled
std::atomic< boolm_run_marker
std::vector< std::function< void(const types::DAPHNEEthStreamTypeAdapter *)> > m_post_process_functions
std::vector< std::unique_ptr< folly::ProducerConsumerQueue< const types::DAPHNEEthStreamTypeAdapter * > > > m_items_to_postprocess_queues
std::vector< std::unique_ptr< utilities::ReusableThread > > m_post_process_threads
size_t m_postprocess_queue_sizes
daqdataformats::SourceID m_sourceid
std::atomic< uint64_t > m_last_processed_daq_ts

Additional Inherited Members

Static Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
static bool publishable_metric (OpMonLevel entry, OpMonLevel system) noexcept

Detailed Description

Definition at line 32 of file DAPHNEEthStreamFrameProcessor.hpp.

Member Typedef Documentation

◆ daphneframeptr

◆ frameptr

◆ inherited

◆ timestamp_t

Constructor & Destructor Documentation

◆ DAPHNEEthStreamFrameProcessor()

dunedaq::fdreadoutlibs::DAPHNEEthStreamFrameProcessor::DAPHNEEthStreamFrameProcessor ( std::unique_ptr< datahandlinglibs::FrameErrorRegistry > & error_registry,
bool post_processing_enabled )
inlineexplicit

Definition at line 41 of file DAPHNEEthStreamFrameProcessor.hpp.

42 : datahandlinglibs::TaskRawDataProcessorModel<types::DAPHNEEthStreamTypeAdapter>(error_registry, post_processing_enabled)
43 {}

Member Function Documentation

◆ conf()

void dunedaq::fdreadoutlibs::DAPHNEEthStreamFrameProcessor::conf ( const appmodel::DataHandlerModule * conf)
overridevirtual

Set the emulator mode, if active, timestamps of processed packets are overwritten with new ones.

Implements dunedaq::datahandlinglibs::RawDataProcessorConcept< ReadoutType >.

Definition at line 24 of file DAPHNEEthStreamFrameProcessor.cpp.

25{
27 std::bind(&DAPHNEEthStreamFrameProcessor::timestamp_check, this, std::placeholders::_1));
28 // m_tasklist.push_back( std::bind(&DAPHNEStreamFrameProcessor::frame_error_check, this, std::placeholders::_1) );
30}
TaskRawDataProcessorModel(std::unique_ptr< FrameErrorRegistry > &error_registry, bool post_processing_enabled)
void conf(const appmodel::DataHandlerModule *conf) override
Set the emulator mode, if active, timestamps of processed packets are overwritten with new ones.

◆ frame_error_check()

void dunedaq::fdreadoutlibs::DAPHNEEthStreamFrameProcessor::frame_error_check ( frameptr )
protected

Pipeline Stage 2.: Check DAPHNE headers for error flags

Definition at line 80 of file DAPHNEEthStreamFrameProcessor.cpp.

81{
82 // check error fields
83}

◆ timestamp_check()

void dunedaq::fdreadoutlibs::DAPHNEEthStreamFrameProcessor::timestamp_check ( frameptr fp)
protected

Pipeline Stage 1.: Check proper timestamp increments in DAPHNE frame

Definition at line 36 of file DAPHNEEthStreamFrameProcessor.cpp.

37{
38/* Let Source Emulator deal with this
39 // If EMU data, emulate perfectly incrementing timestamp
40 if (inherited::m_emulator_mode) { // emulate perfectly incrementing timestamp
41 uint64_t ts_next = m_previous_ts + 64; // NOLINT(build/unsigned)
42 auto df = reinterpret_cast<daphneframeptr>(((uint8_t*)fp)); // NOLINT
43 for (unsigned int i = 0; i < fp->get_num_frames(); ++i) { // NOLINT(build/unsigned)
44 //auto wfh = const_cast<dunedaq::fddetdataformats::WIB2Header*>(wf->get_wib_header());
45 df->set_timestamp(ts_next);
46 ts_next += 64;
47 df++;
48 }
49 }
50*/
51 // Acquire timestamp
52 m_current_ts = fp->get_timestamp();
53 uint64_t k_clock_frequency = 62500000; // NOLINT(build/unsigned)
55 uint16_t frame_tick_difference = tick_difference * fp->get_num_frames();
56 TLOG_DEBUG(TLVL_FRAME_RECEIVED) << "Received DAPHNEEthStream frame timestamp value of " << m_current_ts << " ticks (..." << std::fixed << std::setprecision(8) << (static_cast<double>(m_current_ts % (k_clock_frequency*1000)) / static_cast<double>(k_clock_frequency)) << " sec)"; // NOLINT
57
58 // Check timestamp
59 // RS warning : not fixed rate!
60 if (m_current_ts - m_previous_ts != frame_tick_difference) {
62 }
63
64 if (m_ts_error_ctr > 1000) {
65 if (!m_problem_reported) {
66 TLOG() << "*** Data Integrity ERROR *** Timestamp continuity is completely broken! "
67 << "Something is wrong with the FE source or with the configuration!";
68 m_problem_reported = true;
69 }
70 }
71
74}
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
@ TLVL_FRAME_RECEIVED

Member Data Documentation

◆ m_current_ts

timestamp_t dunedaq::fdreadoutlibs::DAPHNEEthStreamFrameProcessor::m_current_ts = 0
protected

Definition at line 61 of file DAPHNEEthStreamFrameProcessor.hpp.

◆ m_first_ts_fake

bool dunedaq::fdreadoutlibs::DAPHNEEthStreamFrameProcessor::m_first_ts_fake = true
protected

Definition at line 62 of file DAPHNEEthStreamFrameProcessor.hpp.

◆ m_first_ts_missmatch

bool dunedaq::fdreadoutlibs::DAPHNEEthStreamFrameProcessor::m_first_ts_missmatch = true
protected

Definition at line 63 of file DAPHNEEthStreamFrameProcessor.hpp.

◆ m_previous_ts

timestamp_t dunedaq::fdreadoutlibs::DAPHNEEthStreamFrameProcessor::m_previous_ts = 0
protected

Definition at line 60 of file DAPHNEEthStreamFrameProcessor.hpp.

◆ m_problem_reported

bool dunedaq::fdreadoutlibs::DAPHNEEthStreamFrameProcessor::m_problem_reported = false
protected

Definition at line 64 of file DAPHNEEthStreamFrameProcessor.hpp.

◆ m_ts_error_ctr

std::atomic<int> dunedaq::fdreadoutlibs::DAPHNEEthStreamFrameProcessor::m_ts_error_ctr { 0 }
protected

Definition at line 65 of file DAPHNEEthStreamFrameProcessor.hpp.

65{ 0 };

The documentation for this class was generated from the following files: