DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
DAPHNEFrameProcessor.cpp
Go to the documentation of this file.
1
12
13#include <atomic>
14#include <functional>
15#include <memory>
16#include <string>
17
20
22DUNE_DAQ_TYPESTRING(std::vector<dunedaq::trigger::TriggerPrimitiveTypeAdapter>, "TriggerPrimitiveVector")
23
24namespace dunedaq {
25namespace fdreadoutlibs {
26
27void
28DAPHNEFrameProcessor::conf(const appmodel::DataHandlerModule* conf)
29{
30 TLOG() << "Looking for TP sink...";
31
32 for (auto output : conf->get_outputs()) {
33 TLOG() << "On outputs... (" << output->UID() << "," << output->get_data_type() << ")";
34 try {
35 if (output->get_data_type() == "TriggerPrimitiveVector") {
36 TLOG() << "Found TP sink.";
37 m_tp_sink = get_iom_sender<std::vector<trigger::TriggerPrimitiveTypeAdapter>>(output->UID());
38 TLOG() << " SINK INITIALIZED for TriggerPrimitives with UID : " << output->UID();
39 }
40 } catch (const ers::Issue& excpt) {
41 ers::error(datahandlinglibs::ResourceQueueError(ERS_HERE, "tp", "DefaultRequestHandlerModel", excpt));
42 }
43 }
44
45 TLOG() << "Registering processing tasks...";
46 inherited::add_preprocess_task(std::bind(&DAPHNEFrameProcessor::timestamp_check, this, std::placeholders::_1));
47
48 if (m_post_processing_enabled) {
49 // Extract TPs back as a pre-processing task, due to LatencyBuffer post-proc issues using SkipList.
50 inherited::add_preprocess_task(std::bind(&DAPHNEFrameProcessor::extract_tps, this, std::placeholders::_1));
51 }
52
53 TLOG() << "Calling parent conf.";
54 inherited::conf(conf);
55}
56
57void DAPHNEFrameProcessor::start(const nlohmann::json& args)
58{
59 // Reset timestamp check
60 m_previous_ts = 0;
61 m_current_ts = 0;
62 m_first_ts_missmatch = true;
63 m_ts_error_ctr = 0;
64
65 // Reset stats
66 m_t0 = std::chrono::high_resolution_clock::now();
67 m_new_hits = 0;
68 m_new_tps = 0;
69 //m_tpg_hits_count.exchange(0);
70
71 inherited::start(args);
72}
73void DAPHNEFrameProcessor::stop(const nlohmann::json& args)
74{
75 inherited::stop(args);
76}
80void
81DAPHNEFrameProcessor::timestamp_check(frameptr fp)
82{
83 // Let Source Emulator deal with this
84/*
85 // If EMU data, emulate perfectly incrementing timestamp
86 if (inherited::m_emulator_mode) { // emulate perfectly incrementing timestamp
87 // RS warning : not fixed rate!
88 if (m_first_ts_fake) {
89 fp->fake_timestamps(m_previous_ts, 16);
90 m_first_ts_fake = false;
91 } else {
92 fp->fake_timestamps(m_previous_ts + 192, 16);
93 }
94 }*/
95
96 // FIXME: This is a temporary fix to avoid frames with unphysical timestamp set to the far future to interfere with
97 // the operations of the LB.
98 // These frames are effectively "corrupted" or "invalid frames" and hould be handled as such.
99
100
101 for (size_t i=0; i<types::kDAPHNENumFrames; i++){
102 auto df_ptr = reinterpret_cast<dunedaq::fddetdataformats::DAPHNEFrame*>(fp);
103
104 if(df_ptr[i].get_timestamp() > 0xFFFFFFFFFFFF0000 || df_ptr[i].get_timestamp() < 0xFFFF){
105 ers::warning(PDSUnphysicalFrameTimestamp(ERS_HERE, df_ptr[i].get_timestamp(), df_ptr[i].get_channel(), i));
106 // Force the TS to 0
107 df_ptr[i].daq_header.timestamp_1 = df_ptr[i].daq_header.timestamp_2 = 0;
108 }
109 }
110
111 // Acquire timestamp
112 m_current_ts = fp->get_timestamp();
113 uint64_t k_clock_frequency = 62500000; // NOLINT(build/unsigned)
114 TLOG_DEBUG(TLVL_FRAME_RECEIVED) << "Received DAPHNE frame timestamp value of " << m_current_ts << " ticks (..." << std::fixed << std::setprecision(8) << (static_cast<double>(m_current_ts % (k_clock_frequency*1000)) / static_cast<double>(k_clock_frequency)) << " sec)";// NOLINT
115
116 // Check timestamp
117 // RS warning : not fixed rate!
118 // if (m_current_ts - m_previous_ts != ???) {
119 // ++m_ts_error_ctr;
120 //}
121
122
123
124 if (m_ts_error_ctr > 1000) {
125 if (!m_problem_reported) {
126 std::cout << "*** Data Integrity ERROR *** Timestamp continuity is completely broken! "
127 << "Something is wrong with the FE source or with the configuration!\n";
128 m_problem_reported = true;
129 }
130 }
131
132 m_previous_ts = m_current_ts;
133 m_last_processed_daq_ts = m_current_ts;
134}
135
139void
140DAPHNEFrameProcessor::frame_error_check(frameptr /*fp*/)
141{
142 // check error fields
143}
144
145
146void DAPHNEFrameProcessor::extract_tps(constframeptr fp)
147{
148
149// size_t nhits = 0;
150 if (!fp || fp==nullptr)
151 return;
152
153 auto nonconstframeptr = const_cast<frameptr>(fp);
154 auto df_ptr = reinterpret_cast<dunedaq::fddetdataformats::DAPHNEFrame*>((uint8_t*)nonconstframeptr); // NOLINT
155
156 std::vector<trigger::TriggerPrimitiveTypeAdapter> ttpp;
157 for (size_t i=0; i<types::kDAPHNENumFrames; i++)
158 {
159 for(size_t j=0; j<fddetdataformats::DAPHNEFrame::PeakDescriptorData::max_peaks;j++)
160 {
161 if(df_ptr[i].peaks_data.is_found(j))
162 {
163 trigger::TriggerPrimitiveTypeAdapter tpa;
164 tpa.tp = peak_to_tp(df_ptr[i],j);
165
166 //check for timestamps that are due to frame timestamps ~ ts=0, and ignore these peaks
167 if(tpa.tp.time_start > 0xFFFFFFFFFFFF0000 || tpa.tp.time_start < 0xFFFF){
168 ers::warning(PDSPeakIgnored(ERS_HERE, tpa.tp.time_start, tpa.tp.channel, i, j));
169 continue;
170 }
171
172 tpa.tp.detid = df_ptr->daq_header.det_id;
173 ttpp.push_back(tpa);
174 }
175 }
176 }
177
178 int new_tps = ttpp.size();
179 if (new_tps > 0) {
180
181 const auto s_ts_begin = ttpp.front().tp.time_start;
182 const auto channel_begin = ttpp.front().tp.channel;
183 const auto s_ts_end = ttpp.back().tp.time_start;
184 const auto channel_end = ttpp.back().tp.channel;
185
186 if (!m_tp_sink->try_send(std::move(ttpp), iomanager::Sender::s_no_block)) {
187 ers::warning(FailedToSendTPVector(ERS_HERE, s_ts_begin, channel_begin, s_ts_end, channel_end));
188 m_tps_send_failed += new_tps;
189 } else {
190 m_new_tps += new_tps;
191 m_new_hits += new_tps;
192 }
193 }
194
195 return;
196}
197
199DAPHNEFrameProcessor::peak_to_tp(dunedaq::fddetdataformats::DAPHNEFrame &frame, int i)
200{
202 // TODO: add check on peak presence
206 // FIXME : hard-coded channel map
207 // WARNING: slot ids in DAPHNEs are all 0!
208 tp.channel = frame.daq_header.slot_id*100+frame.get_channel();
210 tp.adc_peak = frame.peaks_data.get_adc_max(i);
212 return tp;
213}
214
215
216void
217DAPHNEFrameProcessor::generate_opmon_data() {
218
219 //right now, just fill some basic tp info...
220 if (m_post_processing_enabled) {
221 auto now = std::chrono::high_resolution_clock::now();
222 int new_hits = m_new_hits.exchange(0);
223 int new_tps = m_new_tps.exchange(0);
224 int new_tps_suppressed_too_long = 0; // not relevant for PDS TPs
225 int new_tps_send_failed = m_tps_send_failed.exchange(0);
226 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
227 TLOG_DEBUG(TLVL_BOOKKEEPING) << "Hit rate: " << std::to_string(new_hits / seconds / 1000.) << " [kHz]";
228 TLOG_DEBUG(TLVL_BOOKKEEPING) << "Total new hits: " << new_hits << " new TPs: " << new_tps;
229
230 datahandlinglibs::opmon::HitFindingInfo tp_info;
231 tp_info.set_rate_tp_hits(new_hits / seconds / 1000.);
232
233 tp_info.set_num_tps_sent(new_tps);
234 tp_info.set_num_tps_suppressed_too_long(new_tps_suppressed_too_long);
235 tp_info.set_num_tps_send_failed(new_tps_send_failed);
236
237 publish(std::move(tp_info));
238
239 m_t0 = now;
240
241 }
242
243 inherited::generate_opmon_data();
244
245}
246
247} // namespace fdreadoutlibs
248} // namespace dunedaq
#define ERS_HERE
#define DUNE_DAQ_TYPESTRING(Type, typestring)
detdataformats::DAQHeader daq_header
uint64_t get_timestamp() const
Get the 64-bit timestamp of the frame.
Base class for any user define issue.
Definition Issue.hpp:69
static int64_t now()
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
constexpr detid_t INVALID_DETID
Definition Types.hpp:38
Including Qt Headers.
void warning(const Issue &issue)
Definition ers.hpp:115
void error(const Issue &issue)
Definition ers.hpp:81
uint16_t get_adc_max(int idx) const
Get the ADC Max value for a specific peak. (Word 2*idx+1, bits [31:18])
uint16_t get_sample_start(int idx) const
Get the Time_Start value for a given index (0-4).
uint16_t get_sample_max(int idx) const
Get the Time_Peak value for a specific peak. (Word 2*idx+1, bits [17:9])
uint32_t get_adc_integral(int idx) const
Get the ADC_Integral value for a specific peak. (Word 2*idx, bits [30:8])
uint16_t get_samples_over_baseline(int idx) const
Get the Time_Over_Baseline value for a specific peak. (Word 2*idx+1, bits [8:0])
A single energy deposition on a TPC or PDS channel.