Line data Source code
1 : /**
2 : * @file ElinkModel.hpp FELIX CR's ELink concept wrapper
3 : *
4 : * This is part of the DUNE DAQ , copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 : #ifndef FLXLIBS_SRC_ELINKMODEL_HPP_
9 : #define FLXLIBS_SRC_ELINKMODEL_HPP_
10 :
11 : #include "ElinkConcept.hpp"
12 :
13 : #include "flxlibs/opmon/ElinkModel.pb.h"
14 :
15 : #include "packetformat/block_format.hpp"
16 :
17 : //#include "appfwk/DAQModuleHelper.hpp"
18 : #include "iomanager/IOManager.hpp"
19 : #include "iomanager/Sender.hpp"
20 : #include "logging/Logging.hpp"
21 : #include "utilities/ReusableThread.hpp"
22 :
23 : #include <folly/ProducerConsumerQueue.h>
24 : #include <nlohmann/json.hpp>
25 :
26 : #include <atomic>
27 : #include <memory>
28 : #include <mutex>
29 : #include <string>
30 :
31 : namespace dunedaq::flxlibs {
32 :
33 : template<class TargetPayloadType>
34 : class ElinkModel : public ElinkConcept
35 : {
36 : public:
37 : using sink_t = iomanager::SenderConcept<TargetPayloadType>;
38 : using err_sink_t = iomanager::SenderConcept<felix::packetformat::chunk>;
39 : using inherited = ElinkConcept;
40 : using data_t = nlohmann::json;
41 :
42 : /**
43 : * @brief ElinkModel Constructor
44 : * @param name Instance name for this ElinkModel instance
45 : */
46 0 : ElinkModel()
47 : : ElinkConcept()
48 0 : , m_run_marker{ false }
49 0 : , m_parser_thread(0)
50 0 : {}
51 0 : ~ElinkModel() {}
52 :
53 0 : void set_sink(const std::string& sink_name) override
54 : {
55 0 : if (m_sink_is_set) {
56 0 : TLOG_DEBUG(5) << "ElinkModel sink is already set in initialized!";
57 : } else {
58 0 : m_sink_queue = get_iom_sender<TargetPayloadType>(sink_name);
59 0 : m_sink_is_set = true;
60 : }
61 0 : }
62 :
63 0 : std::shared_ptr<sink_t>& get_sink() { return m_sink_queue; }
64 :
65 : std::shared_ptr<err_sink_t>& get_error_sink() { return m_error_sink_queue; }
66 :
67 0 : void init(const size_t block_queue_capacity)
68 : {
69 0 : m_block_addr_queue = std::make_unique<folly::ProducerConsumerQueue<uint64_t>>(block_queue_capacity); // NOLINT
70 0 : }
71 :
72 0 : void conf(size_t block_size, bool is_32b_trailers)
73 : {
74 0 : if (m_configured) {
75 0 : TLOG_DEBUG(5) << "ElinkModel is already configured!";
76 : } else {
77 0 : m_parser_thread.set_name(inherited::m_elink_source_tid, inherited::m_link_tag);
78 : // if (inconsistency)
79 : // ers::fatal(ElinkConfigurationInconsistency(ERS_HERE, m_num_links));
80 :
81 0 : m_parser->configure(block_size, is_32b_trailers); // unsigned bsize, bool trailer_is_32bit
82 0 : m_configured = true;
83 : }
84 0 : }
85 :
86 0 : void start()
87 : {
88 0 : m_t0 = std::chrono::high_resolution_clock::now();
89 0 : if (!m_run_marker.load()) {
90 0 : set_running(true);
91 0 : m_parser_thread.set_work(&ElinkModel::process_elink, this);
92 0 : TLOG() << "Started ElinkModel of link " << inherited::m_link_id << "...";
93 : } else {
94 0 : TLOG_DEBUG(5) << "ElinkModel of link " << inherited::m_link_id << " is already running!";
95 : }
96 0 : }
97 :
98 0 : void stop()
99 : {
100 0 : if (m_run_marker.load()) {
101 0 : set_running(false);
102 0 : while (!m_parser_thread.get_readiness()) {
103 0 : std::this_thread::sleep_for(std::chrono::milliseconds(10));
104 : }
105 0 : TLOG_DEBUG(5) << "Stopped ElinkModel of link " << m_link_id << "!";
106 : } else {
107 0 : TLOG_DEBUG(5) << "ElinkModel of link " << m_link_id << " is already stopped!";
108 : }
109 0 : }
110 :
111 0 : void set_running(bool should_run)
112 : {
113 0 : bool was_running = m_run_marker.exchange(should_run);
114 0 : TLOG_DEBUG(5) << "Active state was toggled from " << was_running << " to " << should_run;
115 0 : }
116 :
117 0 : bool queue_in_block_address(uint64_t block_addr) // NOLINT(build/unsigned)
118 : {
119 0 : if (m_block_addr_queue->write(block_addr)) { // ok write
120 : return true;
121 : } else { // failed write
122 0 : return false;
123 : }
124 : }
125 :
126 :
127 : protected:
128 0 : void generate_opmon_data() override {
129 :
130 0 : opmon::CardReaderInfo info;
131 0 : auto now = std::chrono::high_resolution_clock::now();
132 0 : auto& stats = m_parser_impl.get_stats();
133 :
134 0 : double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
135 :
136 0 : info.set_num_short_chunks_processed( stats.short_ctr.exchange(0) );
137 0 : info.set_num_chunks_processed( stats.chunk_ctr.exchange(0) );
138 0 : info.set_num_subchunks_processed(stats.subchunk_ctr.exchange(0));
139 0 : info.set_num_blocks_processed(stats.block_ctr.exchange(0));
140 :
141 0 : info.set_rate_blocks_processed(info.num_blocks_processed() / seconds / 1000. );
142 0 : info.set_rate_chunks_processed(info.num_chunks_processed() / seconds / 1000. );
143 :
144 0 : info.set_num_short_chunks_processed_with_error(stats.error_short_ctr.exchange(0));
145 0 : info.set_num_chunks_processed_with_error(stats.error_chunk_ctr.exchange(0));
146 0 : info.set_num_subchunks_processed_with_error(stats.error_subchunk_ctr.exchange(0));
147 0 : info.set_num_blocks_processed_with_error(stats.error_block_ctr.exchange(0));
148 0 : info.set_num_subchunk_crc_errors(stats.subchunk_crc_error_ctr.exchange(0));
149 0 : info.set_num_subchunk_trunc_errors(stats.subchunk_trunc_error_ctr.exchange(0));
150 0 : info.set_num_subchunk_errors(stats.subchunk_error_ctr.exchange(0));
151 :
152 :
153 0 : TLOG_DEBUG(2) << inherited::m_elink_str // Move to TLVL_TAKE_NOTE from readout
154 0 : << " Parser stats ->"
155 0 : << " Blocks: " << info.num_blocks_processed() << " Block rate: " << info.rate_blocks_processed()
156 0 : << " [kHz]"
157 0 : << " Chunks: " << info.num_chunks_processed() << " Chunk rate: " << info.rate_chunks_processed()
158 0 : << " [kHz]"
159 0 : << " Shorts: " << info.num_short_chunks_processed() << " Subchunks:" << info.num_subchunks_processed()
160 0 : << " Error Chunks: " << info.num_chunks_processed_with_error()
161 0 : << " Error Shorts: " << info.num_short_chunks_processed_with_error()
162 0 : << " Error Subchunks: " << info.num_subchunks_processed_with_error()
163 0 : << " Error Block: " << info.num_blocks_processed_with_error();
164 :
165 0 : m_t0 = now;
166 :
167 0 : publish( std::move(info),
168 : { { "card", std::to_string(m_card_id) },
169 : { "logical_unit", std::to_string(m_logical_unit) },
170 : { "link", std::to_string(m_link_id) },
171 : { "tag", std::to_string(m_link_tag) } } );
172 :
173 0 : }
174 :
175 : private:
176 : // Types
177 : using UniqueBlockAddrQueue = std::unique_ptr<folly::ProducerConsumerQueue<uint64_t>>; // NOLINT(build/unsigned)
178 :
179 : // Internals
180 : std::atomic<bool> m_run_marker;
181 : bool m_configured{ false };
182 :
183 : // Sink
184 : bool m_sink_is_set{ false };
185 : std::shared_ptr<sink_t> m_sink_queue;
186 : std::shared_ptr<err_sink_t> m_error_sink_queue;
187 :
188 : // blocks to process
189 : UniqueBlockAddrQueue m_block_addr_queue;
190 :
191 : // Processor
192 : inline static const std::string m_parser_thread_name = "elinkp";
193 : utilities::ReusableThread m_parser_thread;
194 0 : void process_elink()
195 : {
196 0 : while (m_run_marker.load()) {
197 : uint64_t block_addr; // NOLINT
198 0 : if (m_block_addr_queue->read(block_addr)) { // read success
199 : const auto* block = const_cast<felix::packetformat::block*>(
200 0 : felix::packetformat::block_from_bytes(reinterpret_cast<const char*>(block_addr)) // NOLINT
201 : );
202 0 : m_parser->process(block);
203 : } else { // couldn't read from queue
204 0 : std::this_thread::sleep_for(std::chrono::milliseconds(10));
205 : }
206 : }
207 0 : }
208 : };
209 :
210 : } // namespace dunedaq::flxlibs
211 :
212 : #endif // FLXLIBS_SRC_ELINKMODEL_HPP_
|