DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
DAPHNEFrameProcessor.cpp
Go to the documentation of this file.
1
13
14#include "confmodel/GeoId.hpp"
15
16#include <atomic>
17#include <functional>
18#include <memory>
19#include <string>
20
23
25DUNE_DAQ_TYPESTRING(std::vector<dunedaq::trigger::TriggerPrimitiveTypeAdapter>, "TriggerPrimitiveVector")
26
27namespace dunedaq {
28namespace fdreadoutlibs {
29
30void
31DAPHNEFrameProcessor::conf(const appmodel::DataHandlerModule* conf)
32{
33 TLOG() << "Looking for TP sink...";
34
35 for (auto output : conf->get_outputs()) {
36 TLOG() << "On outputs... (" << output->UID() << "," << output->get_data_type() << ")";
37 try {
38 if (output->get_data_type() == "TriggerPrimitiveVector") {
39 TLOG() << "Found TP sink.";
40 m_tp_sink = get_iom_sender<std::vector<trigger::TriggerPrimitiveTypeAdapter>>(output->UID());
41 TLOG() << " SINK INITIALIZED for TriggerPrimitives with UID : " << output->UID();
42 }
43 } catch (const ers::Issue& excpt) {
44 ers::error(datahandlinglibs::ResourceQueueError(ERS_HERE, "tp", "DefaultRequestHandlerModel", excpt));
45 }
46 }
47
48 TLOG() << "Registering processing tasks...";
49 inherited::add_preprocess_task(std::bind(&DAPHNEFrameProcessor::timestamp_check, this, std::placeholders::_1));
50
51 auto dp = conf->get_module_configuration()->get_data_processor();
52 if (dp == nullptr) {
53 TLOG()<< " PDS Data processor does not exist.";
54 } else {
55 auto proc_conf = dp->cast<appmodel::PDSRawDataProcessor>();
56 if (proc_conf == nullptr) {
57 TLOG()<< "PDS RawDataProcessor does not exist.";
58 } else {
59 m_def_adc_intg_thresh = proc_conf-> get_default_adc_intg_thresh();
60
61 auto geo_id = conf->get_geo_id();
62 if (geo_id != nullptr) {
63 m_det_id = geo_id->get_detector_id();
64 m_crate_id = geo_id->get_crate_id();
65 m_slot_id = geo_id->get_slot_id();
66 m_stream_id = geo_id->get_stream_id();
67 }
68
69 m_channel_map = dunedaq::detchannelmaps::make_pds_map(proc_conf->get_channel_map());
70 const std::vector<unsigned int> channel_mask_vec = proc_conf->get_channel_mask();
71
72 for (int chan = 0; chan < 48; chan++) {// 40 physical PDS channel 8 not. 0->7 contain light info, 8,9, additional info. 10-17 light, 18,19 not etc...
73 trgdataformats::channel_t off_channel = m_channel_map->get_offline_channel_from_det_crate_slot_stream_chan(m_det_id, m_crate_id, m_slot_id, m_stream_id, chan);
74 if (std::find(channel_mask_vec.begin(), channel_mask_vec.end(), off_channel) != channel_mask_vec.end())
75 m_channel_mask_set.insert(off_channel);//m_channel_mask will be a vector fille with random chanel which need to be masked.
76 }
77
78 if (m_post_processing_enabled) {
79 // Extract TPs back as a pre-processing task, due to LatencyBuffer post-proc issues using SkipList.
80 inherited::add_preprocess_task(std::bind(&DAPHNEFrameProcessor::extract_tps, this, std::placeholders::_1));
81 }
82 }
83 }
84
85 TLOG() << "Calling parent conf.";
86 inherited::conf(conf);
87}
88
89void DAPHNEFrameProcessor::start(const nlohmann::json& args)
90{
91 // Reset timestamp check
92 m_previous_ts = 0;
93 m_current_ts = 0;
94 m_first_ts_missmatch = true;
95 m_ts_error_ctr = 0;
96
97 // Reset stats
98 m_t0 = std::chrono::high_resolution_clock::now();
99 m_new_hits = 0;
100 m_new_tps = 0;
101 //m_tpg_hits_count.exchange(0);
102
103 inherited::start(args);
104}
105void DAPHNEFrameProcessor::stop(const nlohmann::json& args)
106{
107 inherited::stop(args);
108}
112void
113DAPHNEFrameProcessor::timestamp_check(frameptr fp)
114{
115 // Let Source Emulator deal with this
116/*
117 // If EMU data, emulate perfectly incrementing timestamp
118 if (inherited::m_emulator_mode) { // emulate perfectly incrementing timestamp
119 // RS warning : not fixed rate!
120 if (m_first_ts_fake) {
121 fp->fake_timestamps(m_previous_ts, 16);
122 m_first_ts_fake = false;
123 } else {
124 fp->fake_timestamps(m_previous_ts + 192, 16);
125 }
126 }*/
127
128 // FIXME: This is a temporary fix to avoid frames with unphysical timestamp set to the far future to interfere with
129 // the operations of the LB.
130 // These frames are effectively "corrupted" or "invalid frames" and hould be handled as such.
131
132
133 for (size_t i=0; i<types::kDAPHNENumFrames; i++){
134 auto df_ptr = reinterpret_cast<dunedaq::fddetdataformats::DAPHNEFrame*>(fp);
135
136 if(df_ptr[i].get_timestamp() > 0xFFFFFFFFFFFF0000 || df_ptr[i].get_timestamp() < 0xFFFF){
137 ers::warning(PDSUnphysicalFrameTimestamp(ERS_HERE, df_ptr[i].get_timestamp(), df_ptr[i].get_channel(), i));
138 // Force the TS to 0
139 df_ptr[i].daq_header.timestamp_1 = df_ptr[i].daq_header.timestamp_2 = 0;
140 }
141 }
142
143 // Acquire timestamp
144 m_current_ts = fp->get_timestamp();
145 uint64_t k_clock_frequency = 62500000; // NOLINT(build/unsigned)
146 TLOG_DEBUG(TLVL_FRAME_RECEIVED) << "Received DAPHNE frame timestamp value of " << m_current_ts << " ticks (..." << std::fixed << std::setprecision(8) << (static_cast<double>(m_current_ts % (k_clock_frequency*1000)) / static_cast<double>(k_clock_frequency)) << " sec)";// NOLINT
147
148
149 if (m_ts_error_ctr > 1000) {
150 if (!m_problem_reported) {
151 std::cout << "*** Data Integrity ERROR *** Timestamp continuity is completely broken! "
152 << "Something is wrong with the FE source or with the configuration!\n";
153 m_problem_reported = true;
154 }
155 }
156
157 m_previous_ts = m_current_ts;
158 m_last_processed_daq_ts = m_current_ts;
159}
160
164void
165DAPHNEFrameProcessor::frame_error_check(frameptr /*fp*/)
166{
167 // check error fields
168}
169
170
171void DAPHNEFrameProcessor::extract_tps(constframeptr fp)
172{
173
174 // size_t nhits = 0;
175 if (!fp || fp==nullptr){
176 return;
177 }
178
179
180 auto nonconstframeptr = const_cast<frameptr>(fp);
181 auto df_ptr = reinterpret_cast<dunedaq::fddetdataformats::DAPHNEFrame*>((uint8_t*)nonconstframeptr); // NOLINT
182 std::vector<trigger::TriggerPrimitiveTypeAdapter> ttpp;
183
184 for (size_t i=0; i<types::kDAPHNENumFrames; i++)
185 {
186 for(size_t j=0; j<fddetdataformats::DAPHNEFrame::PeakDescriptorData::max_peaks;j++)
187 {
188 if(df_ptr[i].peaks_data.is_found(j))
189 {
190 int ch = m_channel_map->get_offline_channel_from_det_crate_slot_stream_chan(df_ptr[i].daq_header.det_id, df_ptr[i].daq_header.crate_id, df_ptr[i].daq_header.slot_id, df_ptr[i].daq_header.link_id, df_ptr[i].get_channel());
191 if (std::binary_search(m_channel_mask_set.begin(), m_channel_mask_set.end(), ch)) continue;
192 if (df_ptr[i].peaks_data.get_adc_integral(j) < m_def_adc_intg_thresh) continue;
193
194
195 trigger::TriggerPrimitiveTypeAdapter tpa;
196 tpa.tp = peak_to_tp(df_ptr[i],j);// this is the trigger primitive
197 //check for timestamps that are due to frame timestamps ~ ts=0, and ignore these peaks
198 if(tpa.tp.time_start > 0xFFFFFFFFFFFF0000 || tpa.tp.time_start < 0xFFFF){
199 ers::warning(PDSPeakIgnored(ERS_HERE, tpa.tp.time_start, tpa.tp.channel, i, j));
200 continue;
201 }
202
203 tpa.tp.detid = df_ptr->daq_header.det_id;
204 ttpp.push_back(tpa);
205 }
206 }
207 }
208
209 int new_tps = ttpp.size();
210 if (new_tps > 0) {
211
212 const auto s_ts_begin = ttpp.front().tp.time_start;
213 const auto channel_begin = ttpp.front().tp.channel;
214 const auto s_ts_end = ttpp.back().tp.time_start;
215 const auto channel_end = ttpp.back().tp.channel;
216
217 if (!m_tp_sink->try_send(std::move(ttpp), iomanager::Sender::s_no_block)) {
218 ers::warning(FailedToSendTPVector(ERS_HERE, s_ts_begin, channel_begin, s_ts_end, channel_end));
219 m_tps_send_failed += new_tps;
220 } else {
221 m_new_tps += new_tps;
222 m_new_hits += new_tps;
223 }
224 }
225
226 return;
227}
228
230DAPHNEFrameProcessor::peak_to_tp(dunedaq::fddetdataformats::DAPHNEFrame &frame, int i)
231{
233 // TODO: add check on peak presence
237 // FIXME : hard-coded channel map
238 // WARNING: slot ids in DAPHNEs are all 0!
239 tp.channel = m_channel_map->get_offline_channel_from_det_crate_slot_stream_chan(frame.daq_header.det_id, frame.daq_header.crate_id, frame.daq_header.slot_id, frame.daq_header.link_id, frame.get_channel());
241 tp.adc_peak = frame.peaks_data.get_adc_max(i);
243 return tp;
244}
245
246
247void
248DAPHNEFrameProcessor::generate_opmon_data() {
249
250 //right now, just fill some basic tp info...
251 if (m_post_processing_enabled) {
252 auto now = std::chrono::high_resolution_clock::now();
253 int new_hits = m_new_hits.exchange(0);
254 int new_tps = m_new_tps.exchange(0);
255 int new_tps_suppressed_too_long = 0; // not relevant for PDS TPs
256 int new_tps_send_failed = m_tps_send_failed.exchange(0);
257 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
258 TLOG_DEBUG(TLVL_BOOKKEEPING) << "Hit rate: " << std::to_string(new_hits / seconds / 1000.) << " [kHz]";
259 TLOG_DEBUG(TLVL_BOOKKEEPING) << "Total new hits: " << new_hits << " new TPs: " << new_tps;
260
261 datahandlinglibs::opmon::HitFindingInfo tp_info;
262 tp_info.set_rate_tp_hits(new_hits / seconds / 1000.);
263
264 tp_info.set_num_tps_sent(new_tps);
265 tp_info.set_num_tps_suppressed_too_long(new_tps_suppressed_too_long);
266 tp_info.set_num_tps_send_failed(new_tps_send_failed);
267
268 publish(std::move(tp_info));
269
270 m_t0 = now;
271
272 }
273
274 inherited::generate_opmon_data();
275
276}
277
278} // namespace fdreadoutlibs
279} // namespace dunedaq
#define ERS_HERE
#define DUNE_DAQ_TYPESTRING(Type, typestring)
Declare the datatype_to_string method for the given type.
detdataformats::DAQHeader daq_header
uint64_t get_timestamp() const
Get the 64-bit timestamp of the frame.
Base class for any user define issue.
Definition Issue.hpp:69
static int64_t now()
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
constexpr detid_t INVALID_DETID
Definition Types.hpp:38
The DUNE-DAQ namespace.
Definition DataStore.hpp:57
PDS Frame with unphysical timestamp detected with ch
void warning(const Issue &issue)
Definition ers.hpp:115
void error(const Issue &issue)
Definition ers.hpp:81
uint16_t get_adc_max(int idx) const
Get the ADC Max value for a specific peak. (Word 2*idx+1, bits [31:18])
uint16_t get_sample_start(int idx) const
Get the Time_Start value for a given index (0-4).
uint16_t get_sample_max(int idx) const
Get the Time_Peak value for a specific peak. (Word 2*idx+1, bits [17:9])
uint32_t get_adc_integral(int idx) const
Get the ADC_Integral value for a specific peak. (Word 2*idx, bits [30:8])
uint16_t get_samples_over_baseline(int idx) const
Get the Time_Over_Baseline value for a specific peak. (Word 2*idx+1, bits [8:0])
A single energy deposition on a TPC or PDS channel.