DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
TDEEthFrameProcessor.cpp
Go to the documentation of this file.
1
8#include "fdreadoutlibs/tde/TDEEthFrameProcessor.hpp" // NOLINT(build/include)
9#include "confmodel/GeoId.hpp"
13
18
20
23
24// THIS SHOULDN'T BE HERE!!!!! But it is necessary.....
// Pairs the vector-of-TriggerPrimitiveTypeAdapter type with the string "TriggerPrimitiveVector".
// The same string is compared against the configured output data types in conf() below.
// NOTE(review): what the macro expands to is not visible in this listing - confirm in its header.
26DUNE_DAQ_TYPESTRING(std::vector<dunedaq::trigger::TriggerPrimitiveTypeAdapter>, "TriggerPrimitiveVector")
27
28namespace dunedaq {
29namespace fdreadoutlibs {
30
// Constructor: forwards the error registry reference and the processing-enabled flag to the
// TaskRawDataProcessorModel<types::TDEEthTypeAdapter> base class. The constructor body is empty.
31TDEEthFrameProcessor::TDEEthFrameProcessor(std::unique_ptr<datahandlinglibs::FrameErrorRegistry>& error_registry, bool processing_enabled)
32 : TaskRawDataProcessorModel<types::TDEEthTypeAdapter>(error_registry, processing_enabled)
33{
34}
35
// Start operation: resets the timestamp-check state and the statistics counters, then
// delegates to inherited::start(args).
// NOTE: several lines of this function are absent from this listing (the embedded listing
// numbers jump, e.g. 39 -> 43 and 52 -> 57); only the visible statements are described here.
36void
37TDEEthFrameProcessor::start(const nlohmann::json& args)
38{
39 // Reset software TPG resources
 // (the statements performing this reset, and the opening of the scope closed below, are not visible here)
43 }
44
45 // Reset timestamp check
46 m_previous_ts = 0;
47 m_current_ts = 0;
50 m_ts_error_state = false;
52
57
58
59 // Reset stats
 // m_t0 marks the start of the interval over which the hit rate is later computed.
60 m_t0 = std::chrono::high_resolution_clock::now();
61 m_new_hits = 0;
62 m_new_tps = 0;
63 m_tpg_hits_count.exchange(0);
 // The base-class start is invoked last, after all local state has been reset.
64 inherited::start(args);
65}
66
// Stop operation: delegates to inherited::stop(args) first.
// NOTE: the statements following the comment below (listing lines 71 and 73) are absent
// from this listing, so the pipeline clearing/reset it mentions cannot be seen here.
67void
68TDEEthFrameProcessor::stop(const nlohmann::json& args)
69{
70 inherited::stop(args);
72 // Clears the pipelines and resets with the given configs.
74 }
75}
76
// Configures the processor from the configuration object `conf`.
// NOTE: the signature line and a few body lines are absent from this listing (the embedded
// listing numbers skip 78, 84, 92, 142 and 147), so only the visible statements are described.
// Visible steps:
//  - scan the configured outputs for the "TriggerPrimitiveVector" data type;
//  - record the source id, the geo id fields (when a geo id is configured) and the emulation mode;
//  - register the pre-processing tasks (sequence_check is skipped in emulator mode);
//  - when a RawDataProcessor configuration exists and post-processing is enabled, set up the
//    TP generator, channel map and channel mask, and register find_hits as post-processing task.
77void
79{
 // NOTE(review): idx is not used in any visible line; it may be used on the missing line 84 - confirm.
80 size_t idx = 0;
81 for (auto output : conf->get_outputs()) {
82 try {
83 if (output->get_data_type() == "TriggerPrimitiveVector") {
85 }
86 } catch (const ers::Issue& excpt) {
 // An ers::Issue raised here is reported through ers::error and is not rethrown.
87 ers::error(datahandlinglibs::ResourceQueueError(ERS_HERE, "tp", "DefaultRequestHandlerModel", excpt));
88 }
89 }
90
91 m_sourceid.id = conf->get_source_id();
 // The detector/crate/slot/stream ids are only assigned when a geo id is configured.
93 auto geo_id = conf->get_geo_id();
94 if (geo_id != nullptr) {
95 m_det_id = geo_id->get_detector_id();
96 m_crate_id = geo_id->get_crate_id();
97 m_slot_id = geo_id->get_slot_id();
98 m_stream_id = geo_id->get_stream_id();
99 }
100 m_emulator_mode = conf->get_emulation_mode();
101
102 // Setup pre-processing pipeline
 // The sequence check is registered only when not in emulator mode; the timestamp check always is.
103 if (!m_emulator_mode)
104 inherited::add_preprocess_task(std::bind(&TDEEthFrameProcessor::sequence_check, this, std::placeholders::_1));
105
106 inherited::add_preprocess_task(std::bind(&TDEEthFrameProcessor::timestamp_check, this, std::placeholders::_1));
107
108 // Check if post-processing is active
109 auto dp = conf->get_module_configuration()->get_data_processor();
110 if (dp != nullptr) {
111 auto proc_conf = dp->cast<appmodel::RawDataProcessor>();
112 if (proc_conf != nullptr && m_post_processing_enabled) {
113 m_tp_generator = std::make_unique<tpglibs::TPGenerator>();
114
115 // Set the minimum TP samples over threshold.
 // One minimum per plane (plane0, plane1, plane2), passed to the generator as a vector.
116 auto conf_sot_minima = proc_conf->get_sot_minima();
117 std::vector<uint16_t> sot_minima{conf_sot_minima->get_sot_minimum_plane0(),
118 conf_sot_minima->get_sot_minimum_plane1(),
119 conf_sot_minima->get_sot_minimum_plane2()};
120 m_tp_generator->set_sot_minima(sot_minima);
121
122 const std::vector<unsigned int> channel_mask_vec = proc_conf->get_channel_mask();
123
 // Each processing step is stored as (class name, last element of its JSON representation).
124 std::vector<const appmodel::ProcessingStep*> processing_steps = proc_conf->get_processing_steps();
125 for (auto step : processing_steps) {
126 m_tpg_configs.push_back(std::make_pair(step->class_name(), step->to_json(false).back()));
127 }
128
129 // Setup post-processing pipeline
130 m_channel_map = dunedaq::detchannelmaps::make_tpc_map(proc_conf->get_channel_map());
 // The loop bound of 64 channels is hard-coded here.
131 for (int chan = 0; chan < 64; chan++) {
132 trgdataformats::channel_t off_channel = m_channel_map->get_offline_channel_from_det_crate_slot_stream_chan(m_det_id, m_crate_id, m_slot_id, m_stream_id, chan);
133 int16_t plane = m_channel_map->get_plane_from_offline_channel(off_channel);
134 m_channel_plane_numbers.push_back(std::make_pair(off_channel, plane));
135
136 // This processor only needs to handle some (maybe 0) of the masked channels.
137 // Only get those relevant channels for the later check.
138 if (std::find(channel_mask_vec.begin(), channel_mask_vec.end(), off_channel) != channel_mask_vec.end())
139 m_channel_mask_set.insert(off_channel);
140 }
141
143
144 inherited::add_postprocess_task(std::bind(&TDEEthFrameProcessor::find_hits, this, std::placeholders::_1));
145 }
146 }
148}
149
// Publishes operational-monitoring data: sequence-id and timestamp error counters, then
// TP statistics (hit rate, sent / suppressed / failed TP counts) and the per-channel TP
// counts of the busiest channels (at most 10).
// NOTE: the signature line and the declarations of info, tp_info and tpc_info are absent
// from this listing (the embedded listing numbers skip 151, 153, 165, 175, 197 and 210), so
// only the visible statements are described. The closing braces at the end belong to scopes
// whose openings are partly not visible.
150void
152{
154
 // The error counters are read with load() and keep their value; the min/max jumps are
 // read with exchange(0), which also resets them.
155 info.set_num_seq_id_errors(m_seq_id_error_ctr.load());
156 info.set_min_seq_id_jump(m_seq_id_min_jump.exchange(0));
157 info.set_max_seq_id_jump(m_seq_id_max_jump.exchange(0));
158
159 info.set_num_ts_errors(m_ts_error_ctr.load());
160
161 publish(std::move(info));
162
163 m_error_registry->log_registered_errors();
164
 // Counters are fetched and reset to 0 in one step, so each report covers the time since m_t0.
166 auto now = std::chrono::high_resolution_clock::now();
167 int new_hits = m_tpg_hits_count.exchange(0);
168 int new_tps = m_new_tps.exchange(0);
169 int new_tps_suppressed_too_long = m_tps_suppressed_too_long.exchange(0);
170 int new_tps_send_failed = m_tps_send_failed.exchange(0);
 // Elapsed time since m_t0 in seconds (microsecond count divided by 1e6).
171 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
 // hits / seconds / 1000 gives the rate in kHz.
172 TLOG_DEBUG(TLVL_BOOKKEEPING) << "Hit rate: " << std::to_string(new_hits / seconds / 1000.) << " [kHz]";
173 TLOG_DEBUG(TLVL_BOOKKEEPING) << "Total new hits: " << new_hits << " new TPs: " << new_tps;
174
176 tp_info.set_rate_tp_hits(new_hits / seconds / 1000.);
177
178 tp_info.set_num_tps_sent(new_tps);
179 tp_info.set_num_tps_suppressed_too_long(new_tps_suppressed_too_long);
180 tp_info.set_num_tps_send_failed(new_tps_send_failed);
181
182 publish(std::move(tp_info));
183 // Find the channels with the top TP rates
184 // Create a vector of pairs to store the map elements
185 std::vector<std::pair<uint, int>> channel_tp_rate_vec(m_tp_channel_rate_map.begin(), m_tp_channel_rate_map.end());
186 // Sort the vector in descending order of the value of the pairs
187 sort(channel_tp_rate_vec.begin(), channel_tp_rate_vec.end(), [](std::pair<uint, int>& a, std::pair<uint, int>& b) { return a.second > b.second; });
188 // Add the metrics to opmon
189 // For convenience we are selecting only the top 10 elements
190 if (channel_tp_rate_vec.size() != 0) {
 // Publish min(10, number of channels) entries.
191 int top_highest_values = 10;
192 if (channel_tp_rate_vec.size() < 10) {
193 top_highest_values = channel_tp_rate_vec.size();
194 }
195 //datahandlinglibs::opmon::TPChannelsInfo channels_info;
196 for (int i = 0; i < top_highest_values; i++) {
 // Each entry is published with a "channel" element set to the channel number as a string.
198 tpc_info.set_number_of_tps(channel_tp_rate_vec[i].second);
199 tpc_info.set_channel_id(channel_tp_rate_vec[i].first);
200 publish(std::move(tpc_info), {{"channel", std::to_string(channel_tp_rate_vec[i].first)}});
201 }
202 }
203
204 // Reset the counter in the channel rate map
205 for (auto& el : m_tp_channel_rate_map) {
206 el.second = 0;
207 }
 // The next reporting interval starts at the time sampled above.
208 m_t0 = now;
209 }
211 }
212
213
// Pre-processing task that checks the continuity of the 12-bit sequence id across frames;
// presumably TDEEthFrameProcessor::sequence_check, which conf() registers as a pre-processing task.
// NOTE: the signature line and several body lines are absent from this listing (the embedded
// listing numbers skip 218, 238, 254, 260, 262, 264, 270, 273 and 277). The assignment of
// m_current_seq_id and the counter updates are therefore not visible, and some closing
// braces below belong to scopes whose openings are missing.
217void
219{
220 // FIXME: Make source emulator deal with this! Hard to do since source emu is templated...
221 /* If EMU data, emulate perfectly incrementing timestamp
222 if (m_emulator_mode) {
223 // uint64_t ts_next = m_previous_seq_id + 1; // NOLINT(build/unsigned)
224 auto wf = reinterpret_cast<tdeframeptr>(((uint8_t*)fp)); // NOLINT
225 for (unsigned int i = 0; i < fp->get_num_frames(); ++i) { // NOLINT(build/unsigned)
226 //auto wfh = const_cast<dunedaq::fddetdataformats::TDEEthFrame*>(wf->header());
227 wf->daq_header.crate_id = m_crate_id;
228 wf->daq_header.slot_id = m_slot_id;
229 wf->daq_header.stream_id = m_stream_id;
230 wf->daq_header.seq_id = (m_previous_seq_id+i) & 0xfff;
231 wf++;
232 }
233 }
234 */
235
236 // Acquire timestamp
237 auto wfptr = reinterpret_cast<dunedaq::fddetdataformats::TDEEthFrame*>(fp); // NOLINT
239
240 // Check sequence id
241 // Calculate the next sequence id (12 bits)
 // Expected id = previous id advanced by the number of frames in fp, masked to 12 bits.
242 uint16_t expected_seq_id = (m_previous_seq_id + fp->get_num_frames()) & 0xfff;
243 int16_t delta_seq_id = m_current_seq_id-expected_seq_id;
 // Fold the difference back by 0x1000 so that a wrap-around of the 12-bit counter
 // shows up as a small signed jump.
244 if ( delta_seq_id > 0x800) {
245 delta_seq_id -= 0x1000;
246 } else if ( delta_seq_id < -0x7ff) {
247 delta_seq_id += 0x1000;
248 }
249
250 if (delta_seq_id == 0) {
251 m_seq_id_error_state = false;
252 } else {
253 // uint16_t delta_seq_id = (m_current_seq_id-expected_seq_id);
 // Track the largest and smallest jump seen so far.
255 m_seq_id_max_jump = std::max(delta_seq_id, m_seq_id_max_jump.load());
256 m_seq_id_min_jump = std::min(delta_seq_id, m_seq_id_min_jump.load());
257
258 if (m_first_seq_id_mismatch) { // log once
259 TLOG_DEBUG(TLVL_BOOKKEEPING) << "First sequence id MISMATCH! -> | previous: " << std::to_string(m_previous_seq_id) << " current: " + std::to_string(m_current_seq_id);
261 } else {
 // Later mismatches are recorded in the error registry as an interval from the expected to the current id.
263 m_error_registry->add_error("Sequence ID jump", datahandlinglibs::FrameErrorRegistry::ErrorInterval(expected_seq_id, m_current_seq_id));
265 }
266 }
267 }
268
 // More than 1000 accumulated sequence-id errors triggers the log message below.
269 if (m_seq_id_error_ctr > 1000) {
271 TLOG() << "*** Data Integrity ERROR *** Sequence ID continuity is completely broken! "
272 << "Something is wrong with the FE source or with the configuration!";
274 }
275 }
276
278
279}
280
// Pre-processing task that checks timestamp continuity between consecutive frame blocks;
// presumably TDEEthFrameProcessor::timestamp_check, which conf() registers as a pre-processing task.
// NOTE: the signature line and several body lines are absent from this listing (the embedded
// listing numbers skip 285, 313, 328, 331, 335 and 336). The counter updates and the update
// of m_previous_ts are therefore not visible, and some closing braces below belong to scopes
// whose openings are missing.
284void
286{
287
 // Expected timestamp advance for the whole block = per-frame tick difference * number of frames.
 // NOTE(review): the member listing shows expected_tick_difference declared as uint64_t, while
 // both values are stored in uint16_t here - confirm the products always fit in 16 bits.
288 uint16_t tdeeth_tick_difference = types::TDEEthTypeAdapter::expected_tick_difference;
289 uint16_t tdeeth_frame_tick_difference = tdeeth_tick_difference * fp->get_num_frames();
290
291 // FIXME: let source emulator deal with this!
292 /* If EMU data, emulate perfectly incrementing timestamp
293 if (inherited::m_emulator_mode) { // emulate perfectly incrementing timestamp
294 uint64_t ts_next = m_previous_ts + tdeeth_frame_tick_difference; // NOLINT(build/unsigned)
295 auto tf = reinterpret_cast<tdeframeptr>(((uint8_t*)fp)); // NOLINT
296 for (unsigned int i = 0; i < fp->get_num_frames(); ++i) { // NOLINT(build/unsigned)
297 //auto wfh = const_cast<dunedaq::fddetdataformats::TDEEthFrame*>(tf->header());
298 tf->daq_header.crate_id = m_crate_id;
299 tf->daq_header.slot_id = m_slot_id;
300 tf->daq_header.stream_id = m_stream_id;
301 tf->set_timestamp(ts_next);
302 ts_next += tdeeth_tick_difference;
303 tf++;
304 }
305 }*/
306
307 auto wfptr = reinterpret_cast<dunedaq::fddetdataformats::TDEEthFrame*>(fp); // NOLINT
308 m_current_ts = wfptr->get_timestamp();
309
310 // Check timestamp
 // A previous timestamp of 0 means there is nothing to compare against yet, so no check is made.
311 if (m_previous_ts > 0 &&
312 m_current_ts - m_previous_ts != tdeeth_frame_tick_difference) [[unlikely]] {
314 if (m_first_ts_missmatch) { // log once
315 TLOG_DEBUG(TLVL_BOOKKEEPING) << "First timestamp MISMATCH! -> | previous: " << std::to_string(m_previous_ts) << " current: " + std::to_string(m_current_ts);
316 m_first_ts_missmatch = false;
317 } else {
 // Register the jump only on entering the error state, not for every consecutive bad block.
318 if (!m_ts_error_state) {
319 m_error_registry->add_error("Timestamp jump", datahandlinglibs::FrameErrorRegistry::ErrorInterval(m_previous_ts + tdeeth_frame_tick_difference, m_current_ts));
320 m_ts_error_state = true;
321 }
322 }
323 } else {
 // A consistent timestamp (or the very first block) clears the error state.
324 m_ts_error_state = false;
325 }
326
 // More than 1000 accumulated timestamp errors triggers the log message below.
327 if (m_ts_error_ctr > 1000) {
329 TLOG() << "*** Data Integrity ERROR *** Timestamp continuity is completely broken! "
330 << "Something is wrong with the FE source or with the configuration!";
332 }
333 }
334
337}
338
// Post-processing task that runs the TP generator on a frame, drops TPs on masked channels,
// accumulates the rest per plane and, once m_frame_counter reaches 100, sends the per-plane
// vectors to their sinks; presumably TDEEthFrameProcessor::find_hits, which conf() registers
// as a post-processing task.
// NOTE: the signature line and some body lines are absent from this listing (the embedded
// listing numbers skip 343, 360, 367 and 387). The declaration of tpa, any update of
// m_frame_counter and the handling after a failed send are therefore not fully visible.
342void
344{
345 size_t nhits = 0;
 // A null frame pointer is ignored.
346 if (!fp)
347 return;
348 auto wfptr = reinterpret_cast<dunedaq::fddetdataformats::TDEEthFrame*>((uint8_t*)fp); // NOLINT
349
350 // Check that the system is properly configured from the first hit.
 // Only the first processed frame is compared against the configured crate/slot/stream ids;
 // a mismatch is reported through ers::error and processing continues.
351 if (m_first_hit) {
352 if (wfptr->daq_header.crate_id != m_crate_id || wfptr->daq_header.slot_id != m_slot_id || wfptr->daq_header.stream_id != m_stream_id) {
353 ers::error(LinkMisconfiguration(ERS_HERE, wfptr->daq_header.crate_id, wfptr->daq_header.slot_id, wfptr->daq_header.stream_id, m_crate_id, m_slot_id, m_stream_id));
354 }
355
356 m_first_hit = false;
357 }
358
359 std::vector<trgdataformats::TriggerPrimitive> tps = (*m_tp_generator)(wfptr);
361
362 for (const auto& tp : tps) {
363 // If this TP is on a masked channel, skip it.
 // NOTE(review): the type of m_channel_mask_set is not visible here; if it is a std::set,
 // its member find()/count() would be the more direct lookup - confirm.
364 if (std::binary_search(m_channel_mask_set.begin(), m_channel_mask_set.end(), tp.channel))
365 continue;
366 // Need to move into a type adapter.
368 tpa.tp = tp;
369
370 tpa.tp.detid = m_det_id; // Last missing piece.
 // TPs are bucketed by the plane of their offline channel; the per-channel counter feeds the opmon report.
371 m_tpa_vectors[m_channel_map->get_plane_from_offline_channel(tp.channel)].push_back(tpa);
372 m_tp_channel_rate_map[tp.channel]++;
373 }
374
375 if (m_frame_counter >= 100) { // FIXME: Hard-coding 100 for now. This should be defined elsewhere or configurable.
 // One vector and one sink per plane index (0..2); empty vectors are not sent.
376 for (int i = 0; i < 3; i++) {
377 int new_tps = m_tpa_vectors[i].size();
378 if (new_tps == 0) {
379 continue;
380 }
 // First/last start time and channel are captured before the vector is moved into try_send,
 // so they are still available for the warning if the send fails.
381 const auto s_ts_begin = m_tpa_vectors[i].front().tp.time_start;
382 const auto channel_begin = m_tpa_vectors[i].front().tp.channel;
383 const auto s_ts_end = m_tpa_vectors[i].back().tp.time_start;
384 const auto channel_end = m_tpa_vectors[i].back().tp.channel;
385 if (!m_tp_sink[i]->try_send(std::move(m_tpa_vectors[i]), iomanager::Sender::s_no_block)) {
386 ers::warning(FailedToSendTPVector(ERS_HERE, s_ts_begin, channel_begin, s_ts_end, channel_end));
388 } else {
 // Only successfully sent TPs are added to the sent-TP and hit counters.
389 m_new_tps += new_tps;
390 nhits += new_tps;
391 }
392 }
393 m_frame_counter = 0;
394 }
395
396 m_tpg_hits_count += nhits;
397 return;
398}
399
400} // namespace fdreadoutlibs
401} // namespace dunedaq
#define ERS_HERE
#define DUNE_DAQ_TYPESTRING(Type, typestring)
Class for accessing raw WIB eth frames, as used in ProtoDUNE-II.
detdataformats::DAQEthHeader daq_header
uint64_t get_timestamp() const
Get the starting 64-bit timestamp of the frame.
std::vector< std::pair< std::string, nlohmann::json > > m_tpg_configs
void conf(const appmodel::DataHandlerModule *conf) override
Set the emulator mode; if active, timestamps of processed packets are overwritten with new ones.
std::unique_ptr< tpglibs::TPGenerator > m_tp_generator
std::vector< std::pair< trgdataformats::channel_t, int16_t > > m_channel_plane_numbers
std::shared_ptr< detchannelmaps::TPCChannelMap > m_channel_map
dunedaq::daqdataformats::timestamp_t m_current_ts
void stop(const nlohmann::json &args) override
Stop operation.
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
dunedaq::daqdataformats::timestamp_t m_previous_ts
std::map< uint, std::atomic< int > > m_tp_channel_rate_map
void start(const nlohmann::json &args) override
Start operation.
std::vector< trigger::TriggerPrimitiveTypeAdapter > m_tpa_vectors[3]
std::shared_ptr< iomanager::SenderConcept< std::vector< trigger::TriggerPrimitiveTypeAdapter > > > m_tp_sink[3]
static constexpr timeout_t s_no_block
Definition Sender.hpp:26
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
Base class for any user-defined issue.
Definition Issue.hpp:69
static int64_t now()
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
Including Qt Headers.
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
void warning(const Issue &issue)
Definition ers.hpp:115
void error(const Issue &issue)
Definition ers.hpp:81
Subsystem subsystem
The general subsystem of the source of the data.
Definition SourceID.hpp:69
ID_t id
Unique identifier of the source of the data.
Definition SourceID.hpp:74
static const constexpr uint64_t samples_tick_difference
static const constexpr daqdataformats::SourceID::Subsystem subsystem
static const constexpr uint64_t expected_tick_difference