Line data Source code
1 : /**
2 : * @file CRTBernReaderModule.cpp
3 :
4 : * Reads data from the HW then puts it in a queue
5 : *
6 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
7 : * Licensing/copyright details are in the COPYING file that you should have
8 : * received with this code.
9 : */
10 :
11 : #include "CRTBernReaderModule.hpp"
12 :
13 : #include "CreateSource.hpp"
14 :
15 : #include "crtmodules/opmon/CRTBernReaderModule.pb.h"
16 :
17 : #include "datahandlinglibs/utils/RateLimiter.hpp"
18 :
19 : #include "appmodel/DataReaderModule.hpp"
20 :
21 : #include "confmodel/QueueWithSourceId.hpp"
22 :
23 : #include "fddetdataformats/CRTBernFrame.hpp"
24 :
25 : #include "detdataformats/DetID.hpp"
26 :
27 : namespace dunedaq {
28 : namespace crtmodules{
29 :
/**
 * @brief Largest value a fake packet sequence ID may reach before it wraps back to 0
 */
constexpr uint64_t max_seq_id{ 4095 };
34 :
35 : /**
36 : * @brief Fake packet detector ID
37 : */
38 : constexpr uint8_t fake_det_id = (uint8_t)detdataformats::DetID::Subdetector::kVD_BernCRT;
39 :
/**
 * @brief Stream ID placed in the DAQ header of every fake packet
 */
constexpr uint64_t fake_stream_id{ 0 };

/**
 * @brief Block length placed in the DAQ header of every fake packet
 */
constexpr uint64_t fake_block_length{ 0x382 };
49 :
50 : /**
51 : * @brief Calculate the next fake sequence ID for a packet
52 : * @param seq_id Fake packet sequence ID
53 : */
54 : void
55 0 : fake_sequence_id(uint64_t& seq_id)
56 : {
57 0 : seq_id = (seq_id == max_seq_id ? 0 : seq_id+1);
58 0 : }
59 :
/**
 * @brief Produce a fake packet timestamp from the steady clock
 *
 * The previous content of @p timestamp is ignored; it is overwritten with the
 * steady clock's time since epoch, expressed in nanoseconds and divided by 16.
 *
 * @param timestamp Receives the freshly computed timestamp
 */
void
fake_timestamp(uint64_t& timestamp)
{
  using std::chrono::duration_cast;
  using std::chrono::nanoseconds;
  using std::chrono::steady_clock;

  const auto since_epoch = steady_clock::now().time_since_epoch();
  const uint64_t elapsed_ns = duration_cast<nanoseconds>(since_epoch).count(); // NOLINT (build/unsigned)

  // One count per 16 ns: 625/10000 (same as 625*us/10)
  timestamp = elapsed_ns / 16;
}
72 :
73 : /**
74 : * @brief Fake ADC of the given packet
75 : * @param frame Fake packet
76 : */
77 : void
78 0 : fake_adc(fddetdataformats::CRTBernFrame& frame)
79 : {
80 0 : for (int channel = 0; channel < fddetdataformats::CRTBernFrame::s_num_channels; ++channel) {
81 0 : frame.set_adc(channel, 0);
82 : }
83 0 : }
84 :
85 : /**
86 : * @brief Create a fake packet
87 : * @param frame Fake packet
88 : * @param seq_id Fake packet sequence ID
89 : * @param timestamp Fake packet timestamp
90 : */
91 : void
92 0 : fake_data(fddetdataformats::CRTBernFrame& frame, uint64_t& seq_id, uint64_t& timestamp)
93 : {
94 0 : frame.daq_header.det_id = fake_det_id & 0x3f; //6 bits for det id
95 0 : frame.daq_header.crate_id = 1;
96 0 : frame.daq_header.slot_id = 1;
97 0 : frame.daq_header.stream_id = fake_stream_id;
98 0 : fake_sequence_id(seq_id);
99 0 : frame.daq_header.seq_id = seq_id;
100 0 : frame.daq_header.block_length = fake_block_length;
101 0 : fake_timestamp(timestamp);
102 0 : frame.daq_header.timestamp = timestamp;
103 0 : fake_adc(frame);
104 0 : }
105 :
106 0 : CRTBernReaderModule::CRTBernReaderModule(const std::string& name)
107 0 : : DAQModule(name)
108 : {
109 0 : register_command("conf", &CRTBernReaderModule::do_conf);
110 0 : register_command("start", &CRTBernReaderModule::do_start);
111 0 : register_command("stop_trigger_sources", &CRTBernReaderModule::do_stop);
112 0 : register_command("scrap", &CRTBernReaderModule::do_scrap);
113 0 : }
114 :
/**
 * @brief Split a string on a single-character delimiter
 *
 * Tokens are appended to @p out in the order they occur. Runs of consecutive
 * delimiters, as well as leading and trailing delimiters, yield no empty
 * tokens. Existing elements of @p out are left untouched.
 *
 * @param str   String to split
 * @param delim Delimiter character
 * @param out   Vector receiving the tokens
 */
inline void
tokenize(std::string const& str, const char delim, std::vector<std::string>& out)
{
  std::size_t token_begin = str.find_first_not_of(delim);

  while (token_begin != std::string::npos) {
    const std::size_t token_end = str.find(delim, token_begin);

    if (token_end == std::string::npos) {
      // Last token: runs up to the end of the string
      out.push_back(str.substr(token_begin));
      return;
    }

    out.push_back(str.substr(token_begin, token_end - token_begin));
    token_begin = str.find_first_not_of(delim, token_end);
  }
}
125 :
126 : void
127 0 : CRTBernReaderModule::init(const std::shared_ptr<appfwk::ConfigurationManager> mfcg)
128 : {
129 0 : auto* mdal = mfcg->get_dal<appmodel::DataReaderModule>(get_name());
130 :
131 0 : if (mdal->get_raw_data_callbacks().empty()) {
132 0 : auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE,
133 0 : "No outputs defined for CRT Bern reader in configuration.");
134 0 : ers::fatal(err);
135 0 : throw err;
136 0 : }
137 :
138 0 : for (auto* con : mdal->get_raw_data_callbacks()) {
139 0 : m_source_id = con->get_source_id();
140 0 : auto ptr = m_sources[con->get_source_id()] = createSourceModel(con);
141 0 : register_node(con->UID(), ptr);
142 0 : }
143 0 : }
144 :
145 : void
146 0 : CRTBernReaderModule::do_conf(const CommandData_t& /*obj*/)
147 : {
148 : // Configure HW interface?
149 0 : if (!m_run_marker.load()) {
150 0 : set_running(true);
151 : } else {
152 0 : TLOG_DEBUG(5) << "Already running!";
153 : }
154 0 : }
155 :
156 : void
157 0 : CRTBernReaderModule::do_scrap(const CommandData_t& /*obj*/)
158 : {
159 0 : if (m_run_marker.load()) {
160 0 : TLOG() << "Raising stop through variables!";
161 0 : set_running(false);
162 : // if (!m_callback_mode) {
163 0 : while (!m_producer_thread.get_readiness()) {
164 0 : std::this_thread::sleep_for(std::chrono::milliseconds(10));
165 : }
166 : // }
167 : } else {
168 0 : TLOG_DEBUG(5) << "Already stopped!";
169 : }
170 0 : }
171 :
172 : void
173 0 : CRTBernReaderModule::do_start(const CommandData_t& /*startobj*/)
174 : {
175 : // Setup callbacks on all sourcemodels
176 0 : for (auto& [sourceid, source] : m_sources) {
177 0 : source->acquire_callback();
178 : }
179 :
180 0 : enable_flow();
181 :
182 0 : m_packet_count = 0;
183 :
184 0 : m_t0 = std::chrono::high_resolution_clock::now();
185 :
186 : //if (!m_callback_mode) {
187 0 : m_producer_thread.set_work(&CRTBernReaderModule::run_produce, this);
188 : //}
189 0 : }
190 :
191 : void
192 0 : CRTBernReaderModule::do_stop(const CommandData_t& /*stopobj*/)
193 : {
194 0 : disable_flow();
195 0 : }
196 :
197 : void
198 0 : CRTBernReaderModule::generate_opmon_data()
199 : {
200 0 : opmon::CRTBernReaderInfo i;
201 :
202 0 : auto now = std::chrono::high_resolution_clock::now();
203 0 : int new_packets = m_packet_count.exchange(0);
204 0 : double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
205 0 : m_t0 = now;
206 :
207 0 : i.set_packet_rate_khz(new_packets / seconds / 1000.);
208 :
209 0 : publish(std::move(i));
210 0 : }
211 :
212 : void
213 0 : CRTBernReaderModule::run_produce()
214 : {
215 0 : TLOG() << "Producer thread started..."; // TODO (DTE): Debug log instead
216 :
217 0 : fddetdataformats::CRTBernFrame frame;
218 0 : uint64_t seq_id = 0;
219 0 : uint64_t timestamp = 0;
220 :
221 0 : datahandlinglibs::RateLimiter rate_limiter(m_configured_packet_rate_khz);
222 :
223 0 : while (m_run_marker.load()) {
224 0 : fake_data(frame, seq_id, timestamp); // TODO: To be filled by the CRT experts
225 :
226 0 : if (m_enable_flow.load()) [[likely]] {
227 0 : handle_eth_payload(reinterpret_cast<char*>(&frame), sizeof(frame));
228 0 : ++m_packet_count;
229 : }
230 :
231 0 : rate_limiter.limit();
232 : }
233 :
234 0 : TLOG() << "Producer thread joins... "; // TODO (DTE): Debug log instead
235 0 : }
236 :
237 : void
238 0 : CRTBernReaderModule::handle_eth_payload(char* payload, std::size_t size)
239 : {
240 : // Get DAQ Header and its StreamID
241 : //auto* daq_header = reinterpret_cast<dunedaq::detdataformats::DAQEthHeader*>(payload);
242 : //auto src_id = m_stream_id_to_source_id[src_rx_q][(unsigned)daq_header->stream_id];
243 :
244 0 : if ( auto src_it = m_sources.find(m_source_id); src_it != m_sources.end()) {
245 0 : src_it->second->handle_payload(payload, size);
246 : } else {
247 : // Really bad -> unexpeced StreamID in UDP Payload.
248 : // This check is needed in order to avoid dynamically add thousands
249 : // of Sources on the fly, in case the data corruption is extremely severe.
250 : //if (m_num_unexid_frames.count(0) == 0) {
251 : // m_num_unexid_frames[0] = 0;
252 : //}
253 : //m_num_unexid_frames[0]++;
254 : }
255 0 : }
256 :
257 : void
258 0 : CRTBernReaderModule::set_running(bool should_run)
259 : {
260 0 : bool was_running = m_run_marker.exchange(should_run);
261 0 : TLOG_DEBUG(5) << "Active state was toggled from " << was_running << " to " << should_run;
262 0 : }
263 :
264 : }
265 : }
266 :
// NOTE(review): presumably exposes CRTBernReaderModule to the DAQ application
// framework's module loader; the macro definition is not visible here — confirm.
267 0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::crtmodules::CRTBernReaderModule)
|