Line data Source code
1 : /**
2 : * @file ElinkModel.hpp FELIX CR's ELink concept wrapper
3 : *
4 : * This is part of the DUNE DAQ , copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 : #ifndef FLXLIBS_SRC_ELINKMODEL_HPP_
9 : #define FLXLIBS_SRC_ELINKMODEL_HPP_
10 :
11 : #include "ElinkConcept.hpp"
12 :
13 : #include "flxlibs/opmon/ElinkModel.pb.h"
14 :
15 : #include "packetformat/block_format.hpp"
16 :
17 : //#include "appfwk/DAQModuleHelper.hpp"
18 : #include "iomanager/IOManager.hpp"
19 : #include "iomanager/Sender.hpp"
20 : #include "logging/Logging.hpp"
21 : #include "utilities/ReusableThread.hpp"
22 :
23 : #include "datahandlinglibs/DataMoveCallbackRegistry.hpp"
24 :
25 : #include <folly/ProducerConsumerQueue.h>
26 : #include <nlohmann/json.hpp>
27 :
28 : #include <atomic>
29 : #include <memory>
30 : #include <mutex>
31 : #include <string>
32 :
33 : namespace dunedaq::flxlibs {
34 :
35 : template<class TargetPayloadType>
36 : class ElinkModel : public ElinkConcept
37 : {
38 : public:
39 : using err_sink_t = iomanager::SenderConcept<felix::packetformat::chunk>;
40 : using inherited = ElinkConcept;
41 : using data_t = nlohmann::json;
42 :
43 : /**
44 : * @brief ElinkModel Constructor
45 : * @param name Instance name for this ElinkModel instance
46 : */
47 0 : ElinkModel()
48 : : ElinkConcept()
49 0 : , m_run_marker{ false }
50 0 : , m_parser_thread(0)
51 0 : {}
52 0 : ~ElinkModel() {}
53 :
54 : std::shared_ptr<err_sink_t>& get_error_sink() { return m_error_sink_queue; }
55 :
56 0 : void init(const size_t block_queue_capacity)
57 : {
58 0 : m_block_addr_queue = std::make_unique<folly::ProducerConsumerQueue<uint64_t>>(block_queue_capacity); // NOLINT
59 0 : }
60 :
61 0 : void conf(size_t block_size, bool is_32b_trailers)
62 : {
63 0 : if (m_configured) {
64 0 : TLOG_DEBUG(5) << "ElinkModel is already configured!";
65 : } else {
66 0 : m_parser_thread.set_name(inherited::m_elink_source_tid, inherited::m_link_tag);
67 : // if (inconsistency)
68 : // ers::fatal(ElinkConfigurationInconsistency(ERS_HERE, m_num_links));
69 :
70 0 : m_parser->configure(block_size, is_32b_trailers); // unsigned bsize, bool trailer_is_32bit
71 0 : m_configured = true;
72 : }
73 0 : }
74 :
75 0 : void start()
76 : {
77 0 : m_t0 = std::chrono::high_resolution_clock::now();
78 0 : if (!m_run_marker.load()) {
79 0 : set_running(true);
80 0 : m_parser_thread.set_work(&ElinkModel::process_elink, this);
81 0 : TLOG() << "Started ElinkModel of link " << inherited::m_link_id << "...";
82 : } else {
83 0 : TLOG_DEBUG(5) << "ElinkModel of link " << inherited::m_link_id << " is already running!";
84 : }
85 0 : }
86 :
87 0 : void stop()
88 : {
89 0 : if (m_run_marker.load()) {
90 0 : set_running(false);
91 0 : while (!m_parser_thread.get_readiness()) {
92 0 : std::this_thread::sleep_for(std::chrono::milliseconds(10));
93 : }
94 0 : TLOG_DEBUG(5) << "Stopped ElinkModel of link " << m_link_id << "!";
95 : } else {
96 0 : TLOG_DEBUG(5) << "ElinkModel of link " << m_link_id << " is already stopped!";
97 : }
98 0 : }
99 :
100 0 : void set_running(bool should_run)
101 : {
102 0 : bool was_running = m_run_marker.exchange(should_run);
103 0 : TLOG_DEBUG(5) << "Active state was toggled from " << was_running << " to " << should_run;
104 0 : }
105 :
106 0 : bool queue_in_block_address(uint64_t block_addr) // NOLINT(build/unsigned)
107 : {
108 0 : if (m_block_addr_queue->write(block_addr)) { // ok write
109 : return true;
110 : } else { // failed write
111 : return false;
112 : }
113 : }
114 :
115 0 : void acquire_callback() override
116 : {
117 0 : if (m_callback_is_acquired) {
118 0 : TLOG_DEBUG(5) << "SourceModel callback is already acquired!";
119 : } else {
120 : // Getting DataMoveCBRegistry
121 0 : auto dmcbr = datahandlinglibs::DataMoveCallbackRegistry::get();
122 0 : m_sink_callback = dmcbr->get_callback<TargetPayloadType>(inherited::m_sink_conf);
123 0 : m_callback_is_acquired = true;
124 0 : }
125 0 : }
126 :
127 : // Callbacks
128 : bool m_callback_is_acquired{ false };
129 : using sink_cb_t = std::shared_ptr<std::function<void(TargetPayloadType&&)>>;
130 : sink_cb_t m_sink_callback;
131 :
132 : protected:
133 0 : void generate_opmon_data() override {
134 :
135 0 : opmon::CardReaderInfo info;
136 0 : auto now = std::chrono::high_resolution_clock::now();
137 0 : auto& stats = m_parser_impl.get_stats();
138 :
139 0 : double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
140 :
141 0 : info.set_num_short_chunks_processed( stats.short_ctr.exchange(0) );
142 0 : info.set_num_chunks_processed( stats.chunk_ctr.exchange(0) );
143 0 : info.set_num_subchunks_processed(stats.subchunk_ctr.exchange(0));
144 0 : info.set_num_blocks_processed(stats.block_ctr.exchange(0));
145 :
146 0 : info.set_rate_blocks_processed(info.num_blocks_processed() / seconds / 1000. );
147 0 : info.set_rate_chunks_processed(info.num_chunks_processed() / seconds / 1000. );
148 :
149 0 : info.set_num_short_chunks_processed_with_error(stats.error_short_ctr.exchange(0));
150 0 : info.set_num_chunks_processed_with_error(stats.error_chunk_ctr.exchange(0));
151 0 : info.set_num_subchunks_processed_with_error(stats.error_subchunk_ctr.exchange(0));
152 0 : info.set_num_blocks_processed_with_error(stats.error_block_ctr.exchange(0));
153 0 : info.set_num_subchunk_crc_errors(stats.subchunk_crc_error_ctr.exchange(0));
154 0 : info.set_num_subchunk_trunc_errors(stats.subchunk_trunc_error_ctr.exchange(0));
155 0 : info.set_num_subchunk_errors(stats.subchunk_error_ctr.exchange(0));
156 :
157 :
158 0 : TLOG_DEBUG(2) << inherited::m_elink_str // Move to TLVL_TAKE_NOTE from readout
159 0 : << " Parser stats ->"
160 0 : << " Blocks: " << info.num_blocks_processed() << " Block rate: " << info.rate_blocks_processed()
161 0 : << " [kHz]"
162 0 : << " Chunks: " << info.num_chunks_processed() << " Chunk rate: " << info.rate_chunks_processed()
163 0 : << " [kHz]"
164 0 : << " Shorts: " << info.num_short_chunks_processed() << " Subchunks:" << info.num_subchunks_processed()
165 0 : << " Error Chunks: " << info.num_chunks_processed_with_error()
166 0 : << " Error Shorts: " << info.num_short_chunks_processed_with_error()
167 0 : << " Error Subchunks: " << info.num_subchunks_processed_with_error()
168 0 : << " Error Block: " << info.num_blocks_processed_with_error();
169 :
170 0 : m_t0 = now;
171 :
172 0 : publish( std::move(info),
173 0 : { { "card", std::to_string(m_card_id) },
174 0 : { "logical_unit", std::to_string(m_logical_unit) },
175 0 : { "link", std::to_string(m_link_id) },
176 0 : { "tag", std::to_string(m_link_tag) } } );
177 :
178 0 : }
179 :
180 :
181 : private:
182 : // Types
183 : using UniqueBlockAddrQueue = std::unique_ptr<folly::ProducerConsumerQueue<uint64_t>>; // NOLINT(build/unsigned)
184 :
185 : // Internals
186 : std::atomic<bool> m_run_marker;
187 : bool m_configured{ false };
188 :
189 : // Sink
190 : bool m_sink_is_set{ false };
191 : std::shared_ptr<err_sink_t> m_error_sink_queue;
192 :
193 : // blocks to process
194 : UniqueBlockAddrQueue m_block_addr_queue;
195 :
196 : // Processor
197 : inline static const std::string m_parser_thread_name = "elinkp";
198 : utilities::ReusableThread m_parser_thread;
199 0 : void process_elink()
200 : {
201 0 : while (m_run_marker.load()) {
202 : uint64_t block_addr; // NOLINT
203 0 : if (m_block_addr_queue->read(block_addr)) { // read success
204 : const auto* block = const_cast<felix::packetformat::block*>(
205 0 : felix::packetformat::block_from_bytes(reinterpret_cast<const char*>(block_addr)) // NOLINT
206 : );
207 0 : m_parser->process(block);
208 : } else { // couldn't read from queue
209 0 : std::this_thread::sleep_for(std::chrono::milliseconds(10));
210 : }
211 : }
212 0 : }
213 : };
214 :
215 : } // namespace dunedaq::flxlibs
216 :
217 : #endif // FLXLIBS_SRC_ELINKMODEL_HPP_
|