DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
ElinkModel.hpp
Go to the documentation of this file.
1
8#ifndef FLXLIBS_SRC_ELINKMODEL_HPP_
9#define FLXLIBS_SRC_ELINKMODEL_HPP_
10
11#include "ElinkConcept.hpp"
12
14
15#include "packetformat/block_format.hpp"
16
17//#include "appfwk/DAQModuleHelper.hpp"
19#include "iomanager/Sender.hpp"
20#include "logging/Logging.hpp"
22
24
25#include <folly/ProducerConsumerQueue.h>
26#include <nlohmann/json.hpp>
27
28#include <atomic>
29#include <memory>
30#include <mutex>
31#include <string>
32
33namespace dunedaq::flxlibs {
34
// Class template; TargetPayloadType is the payload type used for the sink callback
// (see sink_cb_t and acquire_callback() further down).
// NOTE(review): the class-head line (listing line 36) is absent from this extract. The
// `ElinkConcept()` base initializer and the `inherited::` qualifier used by the methods
// suggest the class derives from ElinkConcept — confirm against the original header.
35template<class TargetPayloadType>
37{
38public:
// Alias for the JSON type from nlohmann/json.
41 using data_t = nlohmann::json;
42
// Constructor initializer list: default-constructs the ElinkConcept base and starts with
// the run marker cleared (not running).
// NOTE(review): listing lines 47 (the constructor declarator) and 50 (possibly another
// member initializer) are absent from this extract — confirm against the original header.
48 : ElinkConcept()
49 , m_run_marker{ false }
51 {}
53
54 std::shared_ptr<err_sink_t>& get_error_sink() { return m_error_sink_queue; }
55
56 void init(const size_t block_queue_capacity)
57 {
58 m_block_addr_queue = std::make_unique<folly::ProducerConsumerQueue<uint64_t>>(block_queue_capacity); // NOLINT
59 }
60
// Configures the block parser once.
// block_size and is_32b_trailers are forwarded unchanged to m_parser->configure().
// Once m_configured is set, further calls only emit a debug message and change nothing.
61 void conf(size_t block_size, bool is_32b_trailers)
62 {
63 if (m_configured) {
64 TLOG_DEBUG(5) << "ElinkModel is already configured!";
65 } else {
// NOTE(review): listing line 66 is absent from this extract; check the original header
// for a statement that belongs here before relying on this block as shown.
67 // if (inconsistency)
68 // ers::fatal(ElinkConfigurationInconsistency(ERS_HERE, m_num_links));
69
70 m_parser->configure(block_size, is_32b_trailers); // unsigned bsize, bool trailer_is_32bit
71 m_configured = true;
72 }
73 }
74
// Starts the link if it is not already running.
// m_t0 is reset unconditionally; generate_opmon_data() uses it as the start of the
// interval over which processing rates are computed.
75 void start()
76 {
77 m_t0 = std::chrono::high_resolution_clock::now();
78 if (!m_run_marker.load()) {
79 set_running(true);
// NOTE(review): listing line 80 is absent from this extract — presumably the statement
// that hands the parser loop to the worker thread; confirm against the original header.
81 TLOG() << "Started ElinkModel of link " << inherited::m_link_id << "...";
82 } else {
83 TLOG_DEBUG(5) << "ElinkModel of link " << inherited::m_link_id << " is already running!";
84 }
85 }
86
// Stops the link if it is running; otherwise only logs that it is already stopped.
// Clearing the run marker is what makes the parser loop (while (m_run_marker.load())) exit.
87 void stop()
88 {
89 if (m_run_marker.load()) {
90 set_running(false);
// NOTE(review): listing line 91 is absent from this extract, which is why the braces below
// look unbalanced. The 10 ms sleep and the closing brace after it presumably form the body
// of a wait loop on the parser thread — confirm against the original header.
92 std::this_thread::sleep_for(std::chrono::milliseconds(10));
93 }
94 TLOG_DEBUG(5) << "Stopped ElinkModel of link " << m_link_id << "!";
95 } else {
96 TLOG_DEBUG(5) << "ElinkModel of link " << m_link_id << " is already stopped!";
97 }
98 }
99
100 void set_running(bool should_run)
101 {
102 bool was_running = m_run_marker.exchange(should_run);
103 TLOG_DEBUG(5) << "Active state was toggled from " << was_running << " to " << should_run;
104 }
105
106 bool queue_in_block_address(uint64_t block_addr) // NOLINT(build/unsigned)
107 {
108 if (m_block_addr_queue->write(block_addr)) { // ok write
109 return true;
110 } else { // failed write
111 return false;
112 }
113 }
114
// Obtains the sink callback for TargetPayloadType, looked up with inherited::m_sink_conf,
// and stores it in m_sink_callback.
// NOTE(review): listing lines 117, 121 and 123 are absent from this extract. They presumably
// hold the opening `if` of the already-acquired check, the declaration of `dmcbr`
// (presumably obtained from DataMoveCallbackRegistry::get()), and a statement after the
// lookup — confirm against the original header.
// NOTE(review): the debug message says "SourceModel" although this class is ElinkModel.
115 void acquire_callback() override
116 {
118 TLOG_DEBUG(5) << "SourceModel callback is already acquired!";
119 } else {
120 // Getting DataMoveCBRegistry
122 m_sink_callback = dmcbr->get_callback<TargetPayloadType>(inherited::m_sink_conf);
124 }
125 }
126
127 // Callbacks
// Shared handle to a callable that takes a TargetPayloadType by rvalue reference.
// NOTE(review): listing lines 128 and 130 around this alias are absent from this extract.
129 using sink_cb_t = std::shared_ptr<std::function<void(TargetPayloadType&&)>>;
131
132protected:
// Collects the parser statistics gathered since the previous call, logs them and publishes them.
// Each counter is read with exchange(0), so reading it also resets it for the next interval.
133 void generate_opmon_data() override {
134
// NOTE(review): listing line 135 is absent from this extract — presumably the declaration
// of the `info` message object filled in below; confirm against the original header.
136 auto now = std::chrono::high_resolution_clock::now();
137 auto& stats = m_parser_impl.get_stats();
138
// Time elapsed since m_t0, in seconds (microsecond count divided by 1e6).
139 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
140
141 info.set_num_short_chunks_processed( stats.short_ctr.exchange(0) );
142 info.set_num_chunks_processed( stats.chunk_ctr.exchange(0) );
143 info.set_num_subchunks_processed(stats.subchunk_ctr.exchange(0));
144 info.set_num_blocks_processed(stats.block_ctr.exchange(0));
145
// Rates are count / seconds / 1000, i.e. kHz (matching the "[kHz]" labels in the log line below).
// NOTE(review): there is no guard for seconds == 0, in which case these divisions do not
// yield a finite rate.
146 info.set_rate_blocks_processed(info.num_blocks_processed() / seconds / 1000. );
147 info.set_rate_chunks_processed(info.num_chunks_processed() / seconds / 1000. );
148
149 info.set_num_short_chunks_processed_with_error(stats.error_short_ctr.exchange(0));
150 info.set_num_chunks_processed_with_error(stats.error_chunk_ctr.exchange(0));
151 info.set_num_subchunks_processed_with_error(stats.error_subchunk_ctr.exchange(0));
152 info.set_num_blocks_processed_with_error(stats.error_block_ctr.exchange(0));
153 info.set_num_subchunk_crc_errors(stats.subchunk_crc_error_ctr.exchange(0));
154 info.set_num_subchunk_trunc_errors(stats.subchunk_trunc_error_ctr.exchange(0));
155 info.set_num_subchunk_errors(stats.subchunk_error_ctr.exchange(0));
156
157
158 TLOG_DEBUG(2) << inherited::m_elink_str // Move to TLVL_TAKE_NOTE from readout
159 << " Parser stats ->"
160 << " Blocks: " << info.num_blocks_processed() << " Block rate: " << info.rate_blocks_processed()
161 << " [kHz]"
162 << " Chunks: " << info.num_chunks_processed() << " Chunk rate: " << info.rate_chunks_processed()
163 << " [kHz]"
164 << " Shorts: " << info.num_short_chunks_processed() << " Subchunks:" << info.num_subchunks_processed()
165 << " Error Chunks: " << info.num_chunks_processed_with_error()
166 << " Error Shorts: " << info.num_short_chunks_processed_with_error()
167 << " Error Subchunks: " << info.num_subchunks_processed_with_error()
168 << " Error Block: " << info.num_blocks_processed_with_error();
169
// The next interval starts at the timestamp taken at the top of this call.
170 m_t0 = now;
171
// Publish the message together with string key/value pairs identifying this link
// (card, logical unit, link and tag).
172 publish( std::move(info),
173 { { "card", std::to_string(m_card_id) },
174 { "logical_unit", std::to_string(m_logical_unit) },
175 { "link", std::to_string(m_link_id) },
176 { "tag", std::to_string(m_link_tag) } } );
177
178 }
179
180
181private:
182 // Types
// Owning pointer to the folly queue of 64-bit block addresses created in init().
183 using UniqueBlockAddrQueue = std::unique_ptr<folly::ProducerConsumerQueue<uint64_t>>; // NOLINT(build/unsigned)
184
185 // Internals
// Run flag: written by set_running(); read by start(), stop() and the parser loop.
186 std::atomic<bool> m_run_marker;
// Set by conf() after the parser has been configured; blocks repeated configuration.
187 bool m_configured{ false };
188
189 // Sink
// NOTE(review): m_sink_is_set is not referenced anywhere in the visible part of this listing.
190 bool m_sink_is_set{ false };
// Handle returned by reference from get_error_sink().
191 std::shared_ptr<err_sink_t> m_error_sink_queue;
192
193 // blocks to process
// NOTE(review): listing line 194 is absent from this extract — presumably the declaration
// of m_block_addr_queue (type UniqueBlockAddrQueue); confirm against the original header.
195
196 // Processor
// NOTE(review): m_parser_thread_name is not referenced in the visible part of this listing.
197 inline static const std::string m_parser_thread_name = "elinkp";
// Parser loop body.
// NOTE(review): the function declarator (listing lines 198-199) is absent from this
// extract, so the name and signature cannot be confirmed here.
// Runs for as long as the run marker is set. Each iteration tries to pop one block address:
// on success the address is reinterpreted as a FELIX block and passed to the parser;
// on an empty queue the thread sleeps for 10 ms before retrying.
// The loop exits as soon as the marker is cleared; addresses still in the queue at that
// point are not processed by this loop.
200 {
201 while (m_run_marker.load()) {
202 uint64_t block_addr; // NOLINT
203 if (m_block_addr_queue->read(block_addr)) { // read success
// The integer taken from the queue is treated as the memory address of the block.
// The const_cast yields a non-const block pointer, which `const auto*` immediately
// binds as pointer-to-const again.
204 const auto* block = const_cast<felix::packetformat::block*>(
205 felix::packetformat::block_from_bytes(reinterpret_cast<const char*>(block_addr)) // NOLINT
206 );
207 m_parser->process(block);
208 } else { // couldn't read from queue
209 std::this_thread::sleep_for(std::chrono::milliseconds(10));
210 }
211 }
212 }
213};
214
215} // namespace dunedaq::flxlibs
216
217#endif // FLXLIBS_SRC_ELINKMODEL_HPP_
static std::shared_ptr< DataMoveCallbackRegistry > get()
std::unique_ptr< felix::packetformat::BlockParser< DefaultParserImpl > > m_parser
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
const appmodel::DataMoveCallbackConf * m_sink_conf
utilities::ReusableThread m_parser_thread
UniqueBlockAddrQueue m_block_addr_queue
static const std::string m_parser_thread_name
void conf(size_t block_size, bool is_32b_trailers)
void set_running(bool should_run)
bool queue_in_block_address(uint64_t block_addr)
void acquire_callback() override
ElinkModel()
ElinkModel Constructor.
std::atomic< bool > m_run_marker
std::shared_ptr< std::function< void(TargetPayloadType &&)> > sink_cb_t
std::shared_ptr< err_sink_t > m_error_sink_queue
void generate_opmon_data() override
std::unique_ptr< folly::ProducerConsumerQueue< uint64_t > > UniqueBlockAddrQueue
void init(const size_t block_queue_capacity)
std::shared_ptr< err_sink_t > & get_error_sink()
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
bool set_work(Function &&f, Args &&... args)
void set_name(const std::string &name, int tid)
static int64_t now()
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22