DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
ElinkModel.hpp
Go to the documentation of this file.
1
8#ifndef FLXLIBS_SRC_ELINKMODEL_HPP_
9#define FLXLIBS_SRC_ELINKMODEL_HPP_
10
11#include "ElinkConcept.hpp"
12
14
15#include "packetformat/block_format.hpp"
16
17//#include "appfwk/DAQModuleHelper.hpp"
19#include "iomanager/Sender.hpp"
20#include "logging/Logging.hpp"
22
23#include <folly/ProducerConsumerQueue.h>
24#include <nlohmann/json.hpp>
25
26#include <atomic>
27#include <memory>
28#include <mutex>
29#include <string>
30
31namespace dunedaq::flxlibs {
32
33template<class TargetPayloadType>
35{
36public:
40 using data_t = nlohmann::json;
41
47 : ElinkConcept()
48 , m_run_marker{ false }
50 {}
52
// Marks the sink of this elink as set. Only the first call takes the else-branch;
// once m_sink_is_set is true, later calls just emit a debug message and change nothing.
// NOTE(review): the embedded listing numbers jump from 57 to 59, so one statement of the
// else-branch is not visible here -- presumably the one that obtains the sender for
// sink_name and stores it in m_sink_queue. Confirm against the original header.
53 void set_sink(const std::string& sink_name) override
54 {
55 if (m_sink_is_set) {
56 TLOG_DEBUG(5) << "ElinkModel sink is already set in initialized!";
57 } else {
// Remember that the sink has been handled so repeated calls become no-ops.
59 m_sink_is_set = true;
60 }
61 }
62
63 std::shared_ptr<sink_t>& get_sink() { return m_sink_queue; }
64
65 std::shared_ptr<err_sink_t>& get_error_sink() { return m_error_sink_queue; }
66
67 void init(const size_t block_queue_capacity)
68 {
69 m_block_addr_queue = std::make_unique<folly::ProducerConsumerQueue<uint64_t>>(block_queue_capacity); // NOLINT
70 }
71
// Configures the block parser of this elink.
//   block_size      - forwarded to m_parser->configure() as the block size.
//   is_32b_trailers - forwarded to m_parser->configure() as the trailer-width flag.
// Only the first call has an effect: once m_configured is true, later calls just
// emit a debug message.
// NOTE(review): the embedded listing numbers jump from 76 to 78, so one statement at
// the top of the else-branch is not visible here. Confirm against the original header.
72 void conf(size_t block_size, bool is_32b_trailers)
73 {
74 if (m_configured) {
75 TLOG_DEBUG(5) << "ElinkModel is already configured!";
76 } else {
// The consistency check below is disabled; no validation of the arguments happens here.
78 // if (inconsistency)
79 // ers::fatal(ElinkConfigurationInconsistency(ERS_HERE, m_num_links));
80
81 m_parser->configure(block_size, is_32b_trailers); // unsigned bsize, bool trailer_is_32bit
// Latch the flag so a second conf() call does not reconfigure the parser.
82 m_configured = true;
83 }
84 }
85
// Starts this elink.
// m_t0 is reset to the current time unconditionally (it is the reference point that
// generate_opmon_data() uses to compute elapsed seconds). If the run marker is not yet
// set it is raised through set_running(true); otherwise only a debug message is logged.
// NOTE(review): the embedded listing numbers jump from 90 to 92, so one statement after
// set_running(true) is not visible here -- presumably the one that hands the processing
// function to m_parser_thread. Confirm against the original header.
86 void start()
87 {
88 m_t0 = std::chrono::high_resolution_clock::now();
89 if (!m_run_marker.load()) {
90 set_running(true);
92 TLOG() << "Started ElinkModel of link " << inherited::m_link_id << "...";
93 } else {
94 TLOG_DEBUG(5) << "ElinkModel of link " << inherited::m_link_id << " is already running!";
95 }
96 }
97
98 void stop()
99 {
100 if (m_run_marker.load()) {
101 set_running(false);
102 while (!m_parser_thread.get_readiness()) {
103 std::this_thread::sleep_for(std::chrono::milliseconds(10));
104 }
105 TLOG_DEBUG(5) << "Stopped ElinkModel of link " << m_link_id << "!";
106 } else {
107 TLOG_DEBUG(5) << "ElinkModel of link " << m_link_id << " is already stopped!";
108 }
109 }
110
111 void set_running(bool should_run)
112 {
113 bool was_running = m_run_marker.exchange(should_run);
114 TLOG_DEBUG(5) << "Active state was toggled from " << was_running << " to " << should_run;
115 }
116
117 bool queue_in_block_address(uint64_t block_addr) // NOLINT(build/unsigned)
118 {
119 if (m_block_addr_queue->write(block_addr)) { // ok write
120 return true;
121 } else { // failed write
122 return false;
123 }
124 }
125
126
127protected:
// Gathers the parser statistics accumulated since the previous call, derives block and
// chunk rates, logs a summary at debug level 2 and publishes the result.
// NOTE(review): the embedded listing numbers jump from 129 to 131, so the declaration of
// the local `info` object used below is not visible here. Confirm its type against the
// original header.
128 void generate_opmon_data() override {
129
131 auto now = std::chrono::high_resolution_clock::now();
132 auto& stats = m_parser_impl.get_stats();
133
// Time elapsed since m_t0, measured in microseconds and converted to seconds.
// NOTE(review): if no full microsecond has elapsed, `seconds` is 0 and the rate
// divisions below yield inf/NaN -- confirm this cannot happen in practice.
134 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
135
// Each counter is read and reset to 0 in one exchange(0) call, so every invocation
// reports only what was counted since the previous one.
136 info.set_num_short_chunks_processed( stats.short_ctr.exchange(0) );
137 info.set_num_chunks_processed( stats.chunk_ctr.exchange(0) );
138 info.set_num_subchunks_processed(stats.subchunk_ctr.exchange(0));
139 info.set_num_blocks_processed(stats.block_ctr.exchange(0));
140
// count / seconds / 1000 -> rate in kHz (matches the "[kHz]" label in the log line below).
141 info.set_rate_blocks_processed(info.num_blocks_processed() / seconds / 1000. );
142 info.set_rate_chunks_processed(info.num_chunks_processed() / seconds / 1000. );
143
// Error counters, read and reset the same way as the counters above.
144 info.set_num_short_chunks_processed_with_error(stats.error_short_ctr.exchange(0));
145 info.set_num_chunks_processed_with_error(stats.error_chunk_ctr.exchange(0));
146 info.set_num_subchunks_processed_with_error(stats.error_subchunk_ctr.exchange(0));
147 info.set_num_blocks_processed_with_error(stats.error_block_ctr.exchange(0));
148 info.set_num_subchunk_crc_errors(stats.subchunk_crc_error_ctr.exchange(0));
149 info.set_num_subchunk_trunc_errors(stats.subchunk_trunc_error_ctr.exchange(0));
150 info.set_num_subchunk_errors(stats.subchunk_error_ctr.exchange(0));
151
152
153 TLOG_DEBUG(2) << inherited::m_elink_str // Move to TLVL_TAKE_NOTE from readout
154 << " Parser stats ->"
155 << " Blocks: " << info.num_blocks_processed() << " Block rate: " << info.rate_blocks_processed()
156 << " [kHz]"
157 << " Chunks: " << info.num_chunks_processed() << " Chunk rate: " << info.rate_chunks_processed()
158 << " [kHz]"
159 << " Shorts: " << info.num_short_chunks_processed() << " Subchunks:" << info.num_subchunks_processed()
160 << " Error Chunks: " << info.num_chunks_processed_with_error()
161 << " Error Shorts: " << info.num_short_chunks_processed_with_error()
162 << " Error Subchunks: " << info.num_subchunks_processed_with_error()
163 << " Error Block: " << info.num_blocks_processed_with_error();
164
// Restart the measurement interval so the next call computes rates over fresh time.
165 m_t0 = now;
166
// Publish the collected values together with card / logical unit / link / tag labels.
167 publish( std::move(info),
168 { { "card", std::to_string(m_card_id) },
169 { "logical_unit", std::to_string(m_logical_unit) },
170 { "link", std::to_string(m_link_id) },
171 { "tag", std::to_string(m_link_tag) } } );
172
173 }
174
175private:
176 // Types
// Owning pointer to the folly queue of 64-bit block addresses (allocated in init()).
177 using UniqueBlockAddrQueue = std::unique_ptr<folly::ProducerConsumerQueue<uint64_t>>; // NOLINT(build/unsigned)
178
179 // Internals
// Run flag: changed through set_running(), read by start(), stop() and the parser loop.
180 std::atomic<bool> m_run_marker;
// Set to true by the first conf() call; later conf() calls are ignored.
181 bool m_configured{ false };
182
183 // Sink
// Set to true by the first set_sink() call; later set_sink() calls are ignored.
184 bool m_sink_is_set{ false };
// Handles returned by reference from get_sink() / get_error_sink().
185 std::shared_ptr<sink_t> m_sink_queue;
186 std::shared_ptr<err_sink_t> m_error_sink_queue;
187
188 // blocks to process
// NOTE(review): the embedded listing numbers jump from 188 to 190, so the declaration that
// belongs under this heading (presumably m_block_addr_queue) is not visible here.
190
191 // Processor
// NOTE(review): not referenced in the visible part of this listing; presumably used to
// name the parser thread -- confirm against the original header.
192 inline static const std::string m_parser_thread_name = "elinkp";
195 {
196 while (m_run_marker.load()) {
197 uint64_t block_addr; // NOLINT
198 if (m_block_addr_queue->read(block_addr)) { // read success
199 const auto* block = const_cast<felix::packetformat::block*>(
200 felix::packetformat::block_from_bytes(reinterpret_cast<const char*>(block_addr)) // NOLINT
201 );
202 m_parser->process(block);
203 } else { // couldn't read from queue
204 std::this_thread::sleep_for(std::chrono::milliseconds(10));
205 }
206 }
207 }
208};
209
210} // namespace dunedaq::flxlibs
211
212#endif // FLXLIBS_SRC_ELINKMODEL_HPP_
std::unique_ptr< felix::packetformat::BlockParser< DefaultParserImpl > > m_parser
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
utilities::ReusableThread m_parser_thread
std::shared_ptr< sink_t > m_sink_queue
UniqueBlockAddrQueue m_block_addr_queue
std::shared_ptr< sink_t > & get_sink()
static const std::string m_parser_thread_name
void conf(size_t block_size, bool is_32b_trailers)
void set_running(bool should_run)
bool queue_in_block_address(uint64_t block_addr)
ElinkModel()
ElinkModel Constructor.
void set_sink(const std::string &sink_name) override
std::atomic< bool > m_run_marker
std::shared_ptr< err_sink_t > m_error_sink_queue
void generate_opmon_data() override
std::unique_ptr< folly::ProducerConsumerQueue< uint64_t > > UniqueBlockAddrQueue
void init(const size_t block_queue_capacity)
std::shared_ptr< err_sink_t > & get_error_sink()
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
bool set_work(Function &&f, Args &&... args)
void set_name(const std::string &name, int tid)
static int64_t now()
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)