DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::flxlibs::ElinkModel< TargetPayloadType > Class Template Reference

#include <ElinkModel.hpp>

Inheritance diagram for dunedaq::flxlibs::ElinkModel< TargetPayloadType >:
[legend]
Collaboration diagram for dunedaq::flxlibs::ElinkModel< TargetPayloadType >:
[legend]

Public Types

using err_sink_t = iomanager::SenderConcept<felix::packetformat::chunk>
 
using inherited = ElinkConcept
 
using data_t = nlohmann::json
 
using sink_cb_t = std::shared_ptr<std::function<void(TargetPayloadType&&)>>
 
- Public Types inherited from dunedaq::opmonlib::MonitorableObject
using NodePtr = std::weak_ptr<MonitorableObject>
 
using NewNodePtr = std::shared_ptr<MonitorableObject>
 
using ElementId = std::string
 

Public Member Functions

 ElinkModel ()
 ElinkModel Constructor.
 
 ~ElinkModel ()
 
std::shared_ptr< err_sink_t > & get_error_sink ()
 
void init (const size_t block_queue_capacity)
 
void conf (size_t block_size, bool is_32b_trailers)
 
void start ()
 
void stop ()
 
void set_running (bool should_run)
 
bool queue_in_block_address (uint64_t block_addr)
 
void acquire_callback () override
 
- Public Member Functions inherited from dunedaq::flxlibs::ElinkConcept
 ElinkConcept ()
 
virtual ~ElinkConcept ()
 
 ElinkConcept (const ElinkConcept &)=delete
 ElinkConcept is not copy-constructible.
 
ElinkConcept & operator= (const ElinkConcept &)=delete
 ElinkConcept is not copy-assignable.
 
 ElinkConcept (ElinkConcept &&)=delete
 ElinkConcept is not move-constructible.
 
ElinkConcept & operator= (ElinkConcept &&)=delete
 ElinkConcept is not move-assignable.
 
DefaultParserImpl & get_parser ()
 
void set_sink_config (const appmodel::DataMoveCallbackConf *sink_conf)
 
void set_ids (int card, int slr, int id, int tag)
 
- Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject (const MonitorableObject &)=delete
 
MonitorableObject & operator= (const MonitorableObject &)=delete
 
 MonitorableObject (MonitorableObject &&)=delete
 
MonitorableObject & operator= (MonitorableObject &&)=delete
 
virtual ~MonitorableObject ()=default
 
auto get_opmon_id () const noexcept
 
auto get_opmon_level () const noexcept
 

Public Attributes

bool m_callback_is_acquired { false }
 
sink_cb_t m_sink_callback
 
- Public Attributes inherited from dunedaq::flxlibs::ElinkConcept
const appmodel::DataMoveCallbackConf * m_sink_conf
 

Protected Member Functions

void generate_opmon_data () override
 
- Protected Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject ()=default
 
void register_node (ElementId name, NewNodePtr)
 
void publish (google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
 

Private Types

using UniqueBlockAddrQueue = std::unique_ptr<folly::ProducerConsumerQueue<uint64_t>>
 

Private Member Functions

void process_elink ()
 

Private Attributes

std::atomic< bool > m_run_marker
 
bool m_configured { false }
 
bool m_sink_is_set { false }
 
std::shared_ptr< err_sink_t > m_error_sink_queue
 
UniqueBlockAddrQueue m_block_addr_queue
 
utilities::ReusableThread m_parser_thread
 

Static Private Attributes

static const std::string m_parser_thread_name = "elinkp"
 

Additional Inherited Members

- Static Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
static bool publishable_metric (OpMonLevel entry, OpMonLevel system) noexcept
 
- Protected Attributes inherited from dunedaq::flxlibs::ElinkConcept
DefaultParserImpl m_parser_impl
 
std::unique_ptr< felix::packetformat::BlockParser< DefaultParserImpl > > m_parser
 
int m_card_id
 
int m_logical_unit
 
int m_link_id
 
int m_link_tag
 
std::string m_elink_str
 
std::string m_elink_source_tid
 
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
 

Detailed Description

template<class TargetPayloadType>
class dunedaq::flxlibs::ElinkModel< TargetPayloadType >

Definition at line 36 of file ElinkModel.hpp.

Member Typedef Documentation

◆ data_t

template<class TargetPayloadType >
using dunedaq::flxlibs::ElinkModel< TargetPayloadType >::data_t = nlohmann::json

Definition at line 41 of file ElinkModel.hpp.

◆ err_sink_t

template<class TargetPayloadType >
using dunedaq::flxlibs::ElinkModel< TargetPayloadType >::err_sink_t = iomanager::SenderConcept<felix::packetformat::chunk>

Definition at line 39 of file ElinkModel.hpp.

◆ inherited

template<class TargetPayloadType >
using dunedaq::flxlibs::ElinkModel< TargetPayloadType >::inherited = ElinkConcept

Definition at line 40 of file ElinkModel.hpp.

◆ sink_cb_t

template<class TargetPayloadType >
using dunedaq::flxlibs::ElinkModel< TargetPayloadType >::sink_cb_t = std::shared_ptr<std::function<void(TargetPayloadType&&)>>

Definition at line 129 of file ElinkModel.hpp.

◆ UniqueBlockAddrQueue

template<class TargetPayloadType >
using dunedaq::flxlibs::ElinkModel< TargetPayloadType >::UniqueBlockAddrQueue = std::unique_ptr<folly::ProducerConsumerQueue<uint64_t>>
private

Definition at line 183 of file ElinkModel.hpp.

Constructor & Destructor Documentation

◆ ElinkModel()

template<class TargetPayloadType >
dunedaq::flxlibs::ElinkModel< TargetPayloadType >::ElinkModel ( )
inline

ElinkModel Constructor.

Parameters
name: Instance name for this ElinkModel instance

Definition at line 47 of file ElinkModel.hpp.

48 : ElinkConcept()
49 , m_run_marker{ false }
51 {}
utilities::ReusableThread m_parser_thread
std::atomic< bool > m_run_marker

◆ ~ElinkModel()

template<class TargetPayloadType >
dunedaq::flxlibs::ElinkModel< TargetPayloadType >::~ElinkModel ( )
inline

Definition at line 52 of file ElinkModel.hpp.

52{}

Member Function Documentation

◆ acquire_callback()

template<class TargetPayloadType >
void dunedaq::flxlibs::ElinkModel< TargetPayloadType >::acquire_callback ( )
inline override virtual

Implements dunedaq::flxlibs::ElinkConcept.

Definition at line 115 of file ElinkModel.hpp.

116 {
118 TLOG_DEBUG(5) << "SourceModel callback is already acquired!";
119 } else {
120 // Getting DataMoveCBRegistry
122 m_sink_callback = dmcbr->get_callback<TargetPayloadType>(inherited::m_sink_conf);
124 }
125 }
static std::shared_ptr< DataMoveCallbackRegistry > get()
const appmodel::DataMoveCallbackConf * m_sink_conf
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112

◆ conf()

template<class TargetPayloadType >
void dunedaq::flxlibs::ElinkModel< TargetPayloadType >::conf ( size_t block_size,
bool is_32b_trailers )
inline virtual

Implements dunedaq::flxlibs::ElinkConcept.

Definition at line 61 of file ElinkModel.hpp.

62 {
63 if (m_configured) {
64 TLOG_DEBUG(5) << "ElinkModel is already configured!";
65 } else {
67 // if (inconsistency)
68 // ers::fatal(ElinkConfigurationInconsistency(ERS_HERE, m_num_links));
69
70 m_parser->configure(block_size, is_32b_trailers); // unsigned bsize, bool trailer_is_32bit
71 m_configured = true;
72 }
73 }
std::unique_ptr< felix::packetformat::BlockParser< DefaultParserImpl > > m_parser
void set_name(const std::string &name, int tid)

◆ generate_opmon_data()

template<class TargetPayloadType >
void dunedaq::flxlibs::ElinkModel< TargetPayloadType >::generate_opmon_data ( )
inline override protected virtual

Hook for customisable publication. The function can throw; any exception will be caught by the monitoring thread.

Reimplemented from dunedaq::opmonlib::MonitorableObject.

Definition at line 133 of file ElinkModel.hpp.

133 {
134
135 opmon::CardReaderInfo info;
136 auto now = std::chrono::high_resolution_clock::now();
137 auto& stats = m_parser_impl.get_stats();
138
139 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
140
141 info.set_num_short_chunks_processed( stats.short_ctr.exchange(0) );
142 info.set_num_chunks_processed( stats.chunk_ctr.exchange(0) );
143 info.set_num_subchunks_processed(stats.subchunk_ctr.exchange(0));
144 info.set_num_blocks_processed(stats.block_ctr.exchange(0));
145
146 info.set_rate_blocks_processed(info.num_blocks_processed() / seconds / 1000. );
147 info.set_rate_chunks_processed(info.num_chunks_processed() / seconds / 1000. );
148
149 info.set_num_short_chunks_processed_with_error(stats.error_short_ctr.exchange(0));
150 info.set_num_chunks_processed_with_error(stats.error_chunk_ctr.exchange(0));
151 info.set_num_subchunks_processed_with_error(stats.error_subchunk_ctr.exchange(0));
152 info.set_num_blocks_processed_with_error(stats.error_block_ctr.exchange(0));
153 info.set_num_subchunk_crc_errors(stats.subchunk_crc_error_ctr.exchange(0));
154 info.set_num_subchunk_trunc_errors(stats.subchunk_trunc_error_ctr.exchange(0));
155 info.set_num_subchunk_errors(stats.subchunk_error_ctr.exchange(0));
156
157
158 TLOG_DEBUG(2) << inherited::m_elink_str // Move to TLVL_TAKE_NOTE from readout
159 << " Parser stats ->"
160 << " Blocks: " << info.num_blocks_processed() << " Block rate: " << info.rate_blocks_processed()
161 << " [kHz]"
162 << " Chunks: " << info.num_chunks_processed() << " Chunk rate: " << info.rate_chunks_processed()
163 << " [kHz]"
164 << " Shorts: " << info.num_short_chunks_processed() << " Subchunks:" << info.num_subchunks_processed()
165 << " Error Chunks: " << info.num_chunks_processed_with_error()
166 << " Error Shorts: " << info.num_short_chunks_processed_with_error()
167 << " Error Subchunks: " << info.num_subchunks_processed_with_error()
168 << " Error Block: " << info.num_blocks_processed_with_error();
169
170 m_t0 = now;
171
172 publish( std::move(info),
173 { { "card", std::to_string(m_card_id) },
174 { "logical_unit", std::to_string(m_logical_unit) },
175 { "link", std::to_string(m_link_id) },
176 { "tag", std::to_string(m_link_tag) } } );
177
178 }
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
static int64_t now()

◆ get_error_sink()

template<class TargetPayloadType >
std::shared_ptr< err_sink_t > & dunedaq::flxlibs::ElinkModel< TargetPayloadType >::get_error_sink ( )
inline

Definition at line 54 of file ElinkModel.hpp.

54{ return m_error_sink_queue; }
std::shared_ptr< err_sink_t > m_error_sink_queue

◆ init()

template<class TargetPayloadType >
void dunedaq::flxlibs::ElinkModel< TargetPayloadType >::init ( const size_t block_queue_capacity)
inline virtual

Implements dunedaq::flxlibs::ElinkConcept.

Definition at line 56 of file ElinkModel.hpp.

57 {
58 m_block_addr_queue = std::make_unique<folly::ProducerConsumerQueue<uint64_t>>(block_queue_capacity); // NOLINT
59 }
UniqueBlockAddrQueue m_block_addr_queue

◆ process_elink()

template<class TargetPayloadType >
void dunedaq::flxlibs::ElinkModel< TargetPayloadType >::process_elink ( )
inline private

Definition at line 199 of file ElinkModel.hpp.

200 {
201 while (m_run_marker.load()) {
202 uint64_t block_addr; // NOLINT
203 if (m_block_addr_queue->read(block_addr)) { // read success
204 const auto* block = const_cast<felix::packetformat::block*>(
205 felix::packetformat::block_from_bytes(reinterpret_cast<const char*>(block_addr)) // NOLINT
206 );
207 m_parser->process(block);
208 } else { // couldn't read from queue
209 std::this_thread::sleep_for(std::chrono::milliseconds(10));
210 }
211 }
212 }

◆ queue_in_block_address()

template<class TargetPayloadType >
bool dunedaq::flxlibs::ElinkModel< TargetPayloadType >::queue_in_block_address ( uint64_t block_addr)
inline virtual

Implements dunedaq::flxlibs::ElinkConcept.

Definition at line 106 of file ElinkModel.hpp.

107 {
108 if (m_block_addr_queue->write(block_addr)) { // ok write
109 return true;
110 } else { // failed write
111 return false;
112 }
113 }

◆ set_running()

template<class TargetPayloadType >
void dunedaq::flxlibs::ElinkModel< TargetPayloadType >::set_running ( bool should_run)
inline

Definition at line 100 of file ElinkModel.hpp.

101 {
102 bool was_running = m_run_marker.exchange(should_run);
103 TLOG_DEBUG(5) << "Active state was toggled from " << was_running << " to " << should_run;
104 }

◆ start()

template<class TargetPayloadType >
void dunedaq::flxlibs::ElinkModel< TargetPayloadType >::start ( )
inline virtual

Implements dunedaq::flxlibs::ElinkConcept.

Definition at line 75 of file ElinkModel.hpp.

76 {
77 m_t0 = std::chrono::high_resolution_clock::now();
78 if (!m_run_marker.load()) {
79 set_running(true);
81 TLOG() << "Started ElinkModel of link " << inherited::m_link_id << "...";
82 } else {
83 TLOG_DEBUG(5) << "ElinkModel of link " << inherited::m_link_id << " is already running!";
84 }
85 }
void set_running(bool should_run)
bool set_work(Function &&f, Args &&... args)
#define TLOG(...)
Definition macro.hpp:22

◆ stop()

template<class TargetPayloadType >
void dunedaq::flxlibs::ElinkModel< TargetPayloadType >::stop ( )
inline virtual

Implements dunedaq::flxlibs::ElinkConcept.

Definition at line 87 of file ElinkModel.hpp.

88 {
89 if (m_run_marker.load()) {
90 set_running(false);
92 std::this_thread::sleep_for(std::chrono::milliseconds(10));
93 }
94 TLOG_DEBUG(5) << "Stopped ElinkModel of link " << m_link_id << "!";
95 } else {
96 TLOG_DEBUG(5) << "ElinkModel of link " << m_link_id << " is already stopped!";
97 }
98 }

Member Data Documentation

◆ m_block_addr_queue

template<class TargetPayloadType >
UniqueBlockAddrQueue dunedaq::flxlibs::ElinkModel< TargetPayloadType >::m_block_addr_queue
private

Definition at line 194 of file ElinkModel.hpp.

◆ m_callback_is_acquired

template<class TargetPayloadType >
bool dunedaq::flxlibs::ElinkModel< TargetPayloadType >::m_callback_is_acquired { false }

Definition at line 128 of file ElinkModel.hpp.

128{ false };

◆ m_configured

template<class TargetPayloadType >
bool dunedaq::flxlibs::ElinkModel< TargetPayloadType >::m_configured { false }
private

Definition at line 187 of file ElinkModel.hpp.

187{ false };

◆ m_error_sink_queue

template<class TargetPayloadType >
std::shared_ptr<err_sink_t> dunedaq::flxlibs::ElinkModel< TargetPayloadType >::m_error_sink_queue
private

Definition at line 191 of file ElinkModel.hpp.

◆ m_parser_thread

template<class TargetPayloadType >
utilities::ReusableThread dunedaq::flxlibs::ElinkModel< TargetPayloadType >::m_parser_thread
private

Definition at line 198 of file ElinkModel.hpp.

◆ m_parser_thread_name

template<class TargetPayloadType >
const std::string dunedaq::flxlibs::ElinkModel< TargetPayloadType >::m_parser_thread_name = "elinkp"
inline static private

Definition at line 197 of file ElinkModel.hpp.

◆ m_run_marker

template<class TargetPayloadType >
std::atomic<bool> dunedaq::flxlibs::ElinkModel< TargetPayloadType >::m_run_marker
private

Definition at line 186 of file ElinkModel.hpp.

◆ m_sink_callback

template<class TargetPayloadType >
sink_cb_t dunedaq::flxlibs::ElinkModel< TargetPayloadType >::m_sink_callback

Definition at line 130 of file ElinkModel.hpp.

◆ m_sink_is_set

template<class TargetPayloadType >
bool dunedaq::flxlibs::ElinkModel< TargetPayloadType >::m_sink_is_set { false }
private

Definition at line 190 of file ElinkModel.hpp.

190{ false };

The documentation for this class was generated from the following file: