Line data Source code
1 : /**
2 : * @file CRTGrenobleReaderModule.cpp
3 : *
4 : * Reads data from the HW then puts it in a queue
5 : *
6 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
7 : * Licensing/copyright details are in the COPYING file that you should have
8 : * received with this code.
9 : */
10 :
11 : #include "CRTGrenobleReaderModule.hpp"
12 :
13 : #include "CreateSource.hpp"
14 :
15 : #include "crtmodules/opmon/CRTGrenobleReaderModule.pb.h"
16 :
17 : #include "datahandlinglibs/utils/RateLimiter.hpp"
18 :
19 : #include "appmodel/DataReaderModule.hpp"
20 :
21 : #include "confmodel/QueueWithSourceId.hpp"
22 :
23 : #include "fddetdataformats/CRTGrenobleFrame.hpp"
24 :
25 : #include "detdataformats/DetID.hpp"
26 :
27 : namespace dunedaq {
28 : namespace crtmodules{
29 :
30 : /**
31 : * @brief Maximum packet sequence ID before reset
32 : */
33 : constexpr uint64_t max_seq_id = 4095;
34 :
35 : /**
36 : * @brief Fake packet detector ID
37 : */
38 : constexpr uint8_t fake_det_id = (uint8_t)detdataformats::DetID::Subdetector::kVD_GrenobleCRT;
39 :
40 : /**
41 : * @brief Fake packet stream ID
42 : */
43 : constexpr uint64_t fake_stream_id = 0;
44 :
45 : /**
46 : * @brief Fake packet block length
47 : */
48 : constexpr uint64_t fake_block_length = 0x382;
49 :
50 : /**
51 : * @brief Calculate the next fake sequence ID for a packet
52 : * @param seq_id Fake packet sequence ID
53 : */
54 : void
55 0 : fake_sequence_id(uint64_t& seq_id)
56 : {
57 0 : seq_id = (seq_id == max_seq_id ? 0 : seq_id+1);
58 0 : }
59 :
60 : /**
61 : * @brief Calculate the next fake timestamp for a packet
62 : * @param timestamp Fake packet timestamp
63 : */
64 : void
65 0 : fake_timestamp(uint64_t& timestamp)
66 : {
67 0 : auto time_now = std::chrono::steady_clock::now().time_since_epoch();
68 0 : uint64_t current_time = // NOLINT (build/unsigned)
69 0 : std::chrono::duration_cast<std::chrono::nanoseconds>(time_now).count();
70 0 : timestamp = current_time / 16; // 625/10000 (same as 625*us/10)
71 0 : }
72 :
73 : /**
74 : * @brief Fake ADC of the given packet
75 : * @param frame Fake packet
76 : */
77 : void
78 0 : fake_adc(fddetdataformats::CRTGrenobleFrame& frame)
79 : {
80 0 : for (int channel = 0; channel < fddetdataformats::CRTGrenobleFrame::s_num_channels; ++channel) {
81 0 : frame.set_adc(channel, 0);
82 : }
83 0 : }
84 :
85 : /**
86 : * @brief Create a fake packet
87 : * @param frame Fake packet
88 : * @param seq_id Fake packet sequence ID
89 : * @param timestamp Fake packet timestamp
90 : */
91 : void
92 0 : fake_data(fddetdataformats::CRTGrenobleFrame& frame, uint64_t& seq_id, uint64_t& timestamp)
93 : {
94 0 : frame.daq_header.det_id = fake_det_id & 0x3f; //6 bits for det id
95 0 : frame.daq_header.crate_id = 1;
96 0 : frame.daq_header.slot_id = 1;
97 0 : frame.daq_header.stream_id = fake_stream_id;
98 0 : fake_sequence_id(seq_id);
99 0 : frame.daq_header.seq_id = seq_id;
100 0 : frame.daq_header.block_length = fake_block_length;
101 0 : fake_timestamp(timestamp);
102 0 : frame.daq_header.timestamp = timestamp;
103 0 : fake_adc(frame);
104 0 : }
105 :
106 0 : CRTGrenobleReaderModule::CRTGrenobleReaderModule(const std::string& name)
107 0 : : DAQModule(name)
108 : {
109 0 : register_command("conf", &CRTGrenobleReaderModule::do_conf);
110 0 : register_command("start", &CRTGrenobleReaderModule::do_start);
111 0 : register_command("stop_trigger_sources", &CRTGrenobleReaderModule::do_stop);
112 0 : register_command("scrap", &CRTGrenobleReaderModule::do_scrap);
113 0 : }
114 :
115 : inline void
116 0 : tokenize(std::string const& str, const char delim, std::vector<std::string>& out)
117 : {
118 0 : std::size_t start;
119 0 : std::size_t end = 0;
120 0 : while ((start = str.find_first_not_of(delim, end)) != std::string::npos) {
121 0 : end = str.find(delim, start);
122 0 : out.push_back(str.substr(start, end - start));
123 : }
124 0 : }
125 :
126 : void
127 0 : CRTGrenobleReaderModule::init(const std::shared_ptr<appfwk::ConfigurationManager> mfcg)
128 : {
129 0 : auto* mdal = mfcg->get_dal<appmodel::DataReaderModule>(get_name());
130 :
131 0 : if (mdal->get_outputs().empty()) {
132 0 : auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE,
133 0 : "No outputs defined for CRT Grenoble reader in configuration.");
134 0 : ers::fatal(err);
135 0 : throw err;
136 0 : }
137 :
138 0 : for (auto* con : mdal->get_outputs()) {
139 0 : auto* queue = con->cast<confmodel::QueueWithSourceId>();
140 0 : if (queue == nullptr) {
141 0 : auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE, "Outputs are not of type QueueWithGeoId.");
142 0 : ers::fatal(err);
143 0 : throw err;
144 0 : }
145 :
146 : // Check for CB prefix indicating Callback use
147 0 : const char delim = '_';
148 0 : const std::string target = queue->UID();
149 0 : std::vector<std::string> words;
150 0 : tokenize(target, delim, words);
151 :
152 0 : bool callback_mode = false; // TODO (DTE) : Make callback mode work?
153 0 : if (words.front() == "cb") {
154 : callback_mode = true;
155 : }
156 :
157 0 : m_source_id = queue->get_source_id();
158 0 : auto ptr = m_sources[queue->get_source_id()] = createSourceModel(queue->UID(), callback_mode);
159 0 : register_node(queue->UID(), ptr);
160 0 : }
161 0 : }
162 :
163 : void
164 0 : CRTGrenobleReaderModule::do_conf(const CommandData_t& /*obj*/)
165 : {
166 : // Configure HW interface?
167 0 : if (!m_run_marker.load()) {
168 0 : set_running(true);
169 : } else {
170 0 : TLOG_DEBUG(5) << "Already running!";
171 : }
172 0 : }
173 :
174 : void
175 0 : CRTGrenobleReaderModule::do_scrap(const CommandData_t& /*obj*/)
176 : {
177 0 : if (m_run_marker.load()) {
178 0 : TLOG() << "Raising stop through variables!";
179 0 : set_running(false);
180 : // if (!m_callback_mode) {
181 0 : while (!m_producer_thread.get_readiness()) {
182 0 : std::this_thread::sleep_for(std::chrono::milliseconds(10));
183 : }
184 : // }
185 : } else {
186 0 : TLOG_DEBUG(5) << "Already stopped!";
187 : }
188 0 : }
189 :
190 : void
191 0 : CRTGrenobleReaderModule::do_start(const CommandData_t& /*startobj*/)
192 : {
193 : // Setup callbacks on all sourcemodels
194 0 : for (auto& [sourceid, source] : m_sources) {
195 0 : source->acquire_callback();
196 : }
197 :
198 0 : m_packet_count = 0;
199 :
200 0 : m_t0 = std::chrono::high_resolution_clock::now();
201 :
202 0 : enable_flow();
203 :
204 : //if (!m_callback_mode) {
205 0 : m_producer_thread.set_work(&CRTGrenobleReaderModule::run_produce, this);
206 : //}
207 0 : }
208 :
209 : void
210 0 : CRTGrenobleReaderModule::do_stop(const CommandData_t& /*stopobj*/)
211 : {
212 0 : disable_flow();
213 0 : }
214 :
215 : void
216 0 : CRTGrenobleReaderModule::generate_opmon_data()
217 : {
218 0 : opmon::CRTGrenobleReaderInfo i;
219 :
220 0 : auto now = std::chrono::high_resolution_clock::now();
221 0 : int new_packets = m_packet_count.exchange(0);
222 0 : double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
223 0 : m_t0 = now;
224 :
225 0 : i.set_packet_rate_khz(new_packets / seconds / 1000.);
226 :
227 0 : publish(std::move(i));
228 0 : }
229 :
230 : void
231 0 : CRTGrenobleReaderModule::run_produce()
232 : {
233 0 : TLOG() << "Producer thread started..."; // TODO (DTE): Debug log instead
234 :
235 0 : fddetdataformats::CRTGrenobleFrame frame;
236 0 : uint64_t seq_id = 0;
237 0 : uint64_t timestamp = 0;
238 :
239 0 : datahandlinglibs::RateLimiter rate_limiter(m_configured_packet_rate_khz);
240 :
241 0 : while (m_run_marker.load()) {
242 0 : fake_data(frame, seq_id, timestamp); // TODO: To be filled by the CRT experts
243 :
244 0 : if (m_enable_flow.load()) [[likely]] {
245 0 : handle_eth_payload(reinterpret_cast<char*>(&frame), sizeof(frame));
246 0 : ++m_packet_count;
247 : }
248 :
249 0 : rate_limiter.limit();
250 : }
251 :
252 0 : TLOG() << "Producer thread joins... "; // TODO (DTE): Debug log instead
253 0 : }
254 :
255 : void
256 0 : CRTGrenobleReaderModule::handle_eth_payload(char* payload, std::size_t size)
257 : {
258 : // Get DAQ Header and its StreamID
259 : //auto* daq_header = reinterpret_cast<dunedaq::detdataformats::DAQEthHeader*>(payload);
260 : //auto src_id = m_stream_id_to_source_id[src_rx_q][(unsigned)daq_header->stream_id];
261 :
262 0 : if ( auto src_it = m_sources.find(m_source_id); src_it != m_sources.end()) {
263 0 : src_it->second->handle_payload(payload, size);
264 : } else {
265 : // Really bad -> unexpeced StreamID in UDP Payload.
266 : // This check is needed in order to avoid dynamically add thousands
267 : // of Sources on the fly, in case the data corruption is extremely severe.
268 : //if (m_num_unexid_frames.count(0) == 0) {
269 : // m_num_unexid_frames[0] = 0;
270 : //}
271 : //m_num_unexid_frames[0]++;
272 : }
273 0 : }
274 :
275 : void
276 0 : CRTGrenobleReaderModule::set_running(bool should_run)
277 : {
278 0 : bool was_running = m_run_marker.exchange(should_run);
279 0 : TLOG_DEBUG(5) << "Active state was toggled from " << was_running << " to " << should_run;
280 0 : }
281 :
282 : }
283 : }
284 :
285 0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::crtmodules::CRTGrenobleReaderModule)
|