Line data Source code
1 : /**
2 : * @file DAPHNEFrameProcessor.hpp DAPHNE specific Task based raw processor
3 : * implementation
4 : *
5 : * This is part of the DUNE DAQ , copyright 2020.
6 : * Licensing/copyright details are in the COPYING file that you should have
7 : * received with this code.
8 : */
9 :
10 : #include "fddetdataformats/DAPHNEFrame.hpp"
11 : #include "trgdataformats/TriggerPrimitive.hpp"
12 : #include "fdreadoutlibs/daphne/DAPHNEFrameProcessor.hpp"
13 :
14 : #include "confmodel/GeoId.hpp"
15 :
16 : #include <atomic>
17 : #include <functional>
18 : #include <memory>
19 : #include <string>
20 :
21 : using dunedaq::datahandlinglibs::logging::TLVL_BOOKKEEPING;
22 : using dunedaq::datahandlinglibs::logging::TLVL_FRAME_RECEIVED;
23 :
24 : DUNE_DAQ_TYPESTRING(dunedaq::trigger::TriggerPrimitiveTypeAdapter, "TriggerPrimitive")
25 0 : DUNE_DAQ_TYPESTRING(std::vector<dunedaq::trigger::TriggerPrimitiveTypeAdapter>, "TriggerPrimitiveVector")
26 :
27 : namespace dunedaq {
28 : namespace fdreadoutlibs {
29 :
30 : void
31 0 : DAPHNEFrameProcessor::conf(const appmodel::DataHandlerModule* conf)
32 : {
33 0 : TLOG() << "Looking for TP sink...";
34 :
35 0 : for (auto output : conf->get_outputs()) {
36 0 : TLOG() << "On outputs... (" << output->UID() << "," << output->get_data_type() << ")";
37 0 : try {
38 0 : if (output->get_data_type() == "TriggerPrimitiveVector") {
39 0 : TLOG() << "Found TP sink.";
40 0 : m_tp_sink = get_iom_sender<std::vector<trigger::TriggerPrimitiveTypeAdapter>>(output->UID());
41 0 : TLOG() << " SINK INITIALIZED for TriggerPrimitives with UID : " << output->UID();
42 : }
43 0 : } catch (const ers::Issue& excpt) {
44 0 : ers::error(datahandlinglibs::ResourceQueueError(ERS_HERE, "tp", "DefaultRequestHandlerModel", excpt));
45 0 : }
46 : }
47 :
48 0 : TLOG() << "Registering processing tasks...";
49 0 : inherited::add_preprocess_task(std::bind(&DAPHNEFrameProcessor::timestamp_check, this, std::placeholders::_1));
50 :
51 0 : auto dp = conf->get_module_configuration()->get_data_processor();
52 0 : if (dp == nullptr) {
53 0 : TLOG()<< " PDS Data processor does not exist.";
54 : } else {
55 0 : auto proc_conf = dp->cast<appmodel::PDSRawDataProcessor>();
56 0 : if (proc_conf == nullptr) {
57 0 : TLOG()<< "PDS RawDataProcessor does not exist.";
58 : } else {
59 0 : m_def_adc_intg_thresh = proc_conf-> get_default_adc_intg_thresh();
60 :
61 0 : auto geo_id = conf->get_geo_id();
62 0 : if (geo_id != nullptr) {
63 0 : m_det_id = geo_id->get_detector_id();
64 0 : m_crate_id = geo_id->get_crate_id();
65 0 : m_slot_id = geo_id->get_slot_id();
66 0 : m_stream_id = geo_id->get_stream_id();
67 : }
68 :
69 0 : m_channel_map = dunedaq::detchannelmaps::make_pds_map(proc_conf->get_channel_map());
70 0 : const std::vector<unsigned int> channel_mask_vec = proc_conf->get_channel_mask();
71 :
72 0 : for (int chan = 0; chan < 48; chan++) {// 40 physical PDS channel 8 not. 0->7 contain light info, 8,9, additional info. 10-17 light, 18,19 not etc...
73 0 : trgdataformats::channel_t off_channel = m_channel_map->get_offline_channel_from_det_crate_slot_stream_chan(m_det_id, m_crate_id, m_slot_id, m_stream_id, chan);
74 0 : if (std::find(channel_mask_vec.begin(), channel_mask_vec.end(), off_channel) != channel_mask_vec.end())
75 0 : m_channel_mask_set.insert(off_channel);//m_channel_mask will be a vector fille with random chanel which need to be masked.
76 : }
77 :
78 0 : if (m_post_processing_enabled) {
79 : // Extract TPs back as a pre-processing task, due to LatencyBuffer post-proc issues using SkipList.
80 0 : inherited::add_preprocess_task(std::bind(&DAPHNEFrameProcessor::extract_tps, this, std::placeholders::_1));
81 : }
82 0 : }
83 : }
84 :
85 0 : TLOG() << "Calling parent conf.";
86 0 : inherited::conf(conf);
87 0 : }
88 :
89 0 : void DAPHNEFrameProcessor::start(const appfwk::DAQModule::CommandData_t& args)
90 : {
91 : // Reset timestamp check
92 0 : m_previous_ts = 0;
93 0 : m_current_ts = 0;
94 0 : m_first_ts_missmatch = true;
95 0 : m_ts_error_ctr = 0;
96 :
97 : // Reset stats
98 0 : m_t0 = std::chrono::high_resolution_clock::now();
99 0 : m_num_new_tps.exchange(0);
100 :
101 0 : inherited::start(args);
102 0 : }
103 : void
104 0 : DAPHNEFrameProcessor::stop(const appfwk::DAQModule::CommandData_t& args)
105 : {
106 0 : inherited::stop(args);
107 0 : }
108 : /**
109 : * Pipeline Stage 1.: Check proper timestamp increments in DAPHNE frame
110 : * */
111 : void
112 0 : DAPHNEFrameProcessor::timestamp_check(frameptr fp)
113 : {
114 : // Let Source Emulator deal with this
115 : /*
116 : // If EMU data, emulate perfectly incrementing timestamp
117 : if (inherited::m_emulator_mode) { // emulate perfectly incrementing timestamp
118 : // RS warning : not fixed rate!
119 : if (m_first_ts_fake) {
120 : fp->fake_timestamps(m_previous_ts, 16);
121 : m_first_ts_fake = false;
122 : } else {
123 : fp->fake_timestamps(m_previous_ts + 192, 16);
124 : }
125 : }*/
126 :
127 : // FIXME: This is a temporary fix to avoid frames with unphysical timestamp set to the far future to interfere with
128 : // the operations of the LB.
129 : // These frames are effectively "corrupted" or "invalid frames" and hould be handled as such.
130 :
131 :
132 0 : for (size_t i=0; i<types::kDAPHNENumFrames; i++){
133 0 : auto df_ptr = reinterpret_cast<dunedaq::fddetdataformats::DAPHNEFrame*>(fp);
134 :
135 0 : if(df_ptr[i].get_timestamp() > 0xFFFFFFFFFFFF0000 || df_ptr[i].get_timestamp() < 0xFFFF){
136 0 : ers::warning(PDSUnphysicalFrameTimestamp(ERS_HERE, df_ptr[i].get_timestamp(), df_ptr[i].get_channel(), i));
137 : // Force the TS to 0
138 0 : df_ptr[i].daq_header.timestamp_1 = df_ptr[i].daq_header.timestamp_2 = 0;
139 : }
140 : }
141 :
142 : // Acquire timestamp
143 0 : m_current_ts = fp->get_timestamp();
144 0 : uint64_t k_clock_frequency = 62500000; // NOLINT(build/unsigned)
145 0 : TLOG_DEBUG(TLVL_FRAME_RECEIVED) << "Received DAPHNE frame timestamp value of " << m_current_ts << " ticks (..." << std::fixed << std::setprecision(8) << (static_cast<double>(m_current_ts % (k_clock_frequency*1000)) / static_cast<double>(k_clock_frequency)) << " sec)";// NOLINT
146 :
147 :
148 0 : if (m_ts_error_ctr > 1000) {
149 0 : if (!m_problem_reported) {
150 0 : std::cout << "*** Data Integrity ERROR *** Timestamp continuity is completely broken! "
151 0 : << "Something is wrong with the FE source or with the configuration!\n";
152 0 : m_problem_reported = true;
153 : }
154 : }
155 :
156 0 : m_previous_ts = m_current_ts;
157 0 : m_last_processed_daq_ts = m_current_ts;
158 0 : }
159 :
160 : /**
161 : * Pipeline Stage 2.: Check DAPHNE headers for error flags
162 : * */
163 : void
164 0 : DAPHNEFrameProcessor::frame_error_check(frameptr /*fp*/)
165 : {
166 : // check error fields
167 0 : }
168 :
169 :
170 0 : void DAPHNEFrameProcessor::extract_tps(constframeptr fp)
171 : {
172 :
173 0 : if (!fp || fp==nullptr){
174 : return;
175 : }
176 :
177 :
178 0 : auto nonconstframeptr = const_cast<frameptr>(fp);
179 0 : auto df_ptr = reinterpret_cast<dunedaq::fddetdataformats::DAPHNEFrame*>((uint8_t*)nonconstframeptr); // NOLINT
180 0 : std::vector<trigger::TriggerPrimitiveTypeAdapter> ttpp;
181 :
182 0 : for (size_t i=0; i<types::kDAPHNENumFrames; i++)
183 : {
184 0 : for(size_t j=0; j<fddetdataformats::DAPHNEFrame::PeakDescriptorData::max_peaks;j++)
185 : {
186 0 : if(df_ptr[i].peaks_data.is_found(j))
187 : {
188 0 : int ch = m_channel_map->get_offline_channel_from_det_crate_slot_stream_chan(df_ptr[i].daq_header.det_id, df_ptr[i].daq_header.crate_id, df_ptr[i].daq_header.slot_id, df_ptr[i].daq_header.link_id, df_ptr[i].get_channel());
189 0 : if (std::binary_search(m_channel_mask_set.begin(), m_channel_mask_set.end(), ch)) continue;
190 0 : if (df_ptr[i].peaks_data.get_adc_integral(j) < m_def_adc_intg_thresh) continue;
191 :
192 :
193 0 : trigger::TriggerPrimitiveTypeAdapter tpa;
194 0 : tpa.tp = peak_to_tp(df_ptr[i],j);// this is the trigger primitive
195 : //check for timestamps that are due to frame timestamps ~ ts=0, and ignore these peaks
196 0 : if(tpa.tp.time_start > 0xFFFFFFFFFFFF0000 || tpa.tp.time_start < 0xFFFF){
197 0 : ers::warning(PDSPeakIgnored(ERS_HERE, tpa.tp.time_start, tpa.tp.channel, i, j));
198 0 : continue;
199 : }
200 :
201 0 : tpa.tp.detid = df_ptr->daq_header.det_id;
202 0 : ttpp.push_back(tpa);
203 : }
204 : }
205 : }
206 :
207 0 : int num_new_tps = ttpp.size();
208 0 : if (num_new_tps > 0) {
209 :
210 0 : const auto s_ts_begin = ttpp.front().tp.time_start;
211 0 : const auto channel_begin = ttpp.front().tp.channel;
212 0 : const auto s_ts_end = ttpp.back().tp.time_start;
213 0 : const auto channel_end = ttpp.back().tp.channel;
214 :
215 0 : if (!m_tp_sink->try_send(std::move(ttpp), iomanager::Sender::s_no_block)) {
216 0 : ers::warning(FailedToSendTPVector(ERS_HERE, s_ts_begin, channel_begin, s_ts_end, channel_end));
217 0 : m_tps_send_failed += num_new_tps;
218 : } else {
219 0 : m_num_new_tps += num_new_tps;
220 : }
221 : }
222 :
223 0 : return;
224 0 : }
225 :
226 : dunedaq::trgdataformats::TriggerPrimitive
227 0 : DAPHNEFrameProcessor::peak_to_tp(dunedaq::fddetdataformats::DAPHNEFrame &frame, int i)
228 : {
229 0 : dunedaq::trgdataformats::TriggerPrimitive tp;
230 : // TODO: add check on peak presence
231 0 : tp.time_start = frame.get_timestamp()+frame.peaks_data.get_sample_start(i);
232 0 : tp.samples_to_peak = frame.peaks_data.get_sample_max(i);
233 0 : tp.samples_over_threshold = frame.peaks_data.get_samples_over_baseline(i);
234 : // FIXME : hard-coded channel map
235 : // WARNING: slot ids in DAPHNEs are all 0!
236 0 : tp.channel = m_channel_map->get_offline_channel_from_det_crate_slot_stream_chan(frame.daq_header.det_id, frame.daq_header.crate_id, frame.daq_header.slot_id, frame.daq_header.link_id, frame.get_channel());
237 0 : tp.adc_integral = frame.peaks_data.get_adc_integral(i);
238 0 : tp.adc_peak = frame.peaks_data.get_adc_max(i);
239 0 : tp.detid = dunedaq::trgdataformats::INVALID_DETID;
240 0 : return tp;
241 : }
242 :
243 :
244 : void
245 0 : DAPHNEFrameProcessor::generate_opmon_data() {
246 :
247 : //right now, just fill some basic tp info...
248 0 : if (m_post_processing_enabled) {
249 0 : auto now = std::chrono::high_resolution_clock::now();
250 0 : int num_new_tps = m_num_new_tps.exchange(0);
251 0 : int new_tps_suppressed_too_long = 0; // not relevant for PDS TPs
252 0 : int new_tps_send_failed = m_tps_send_failed.exchange(0);
253 0 : double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
254 0 : TLOG_DEBUG(TLVL_BOOKKEEPING) << "TP rate: " << std::to_string(num_new_tps / seconds / 1000.) << " [kHz]";
255 0 : TLOG_DEBUG(TLVL_BOOKKEEPING) << "Total new TPs: " << num_new_tps;
256 :
257 0 : datahandlinglibs::opmon::HitFindingInfo tp_info;
258 0 : tp_info.set_rate_tp_hits(num_new_tps / seconds / 1000.);
259 :
260 0 : tp_info.set_num_tps_sent(num_new_tps);
261 0 : tp_info.set_num_tps_suppressed_too_long(new_tps_suppressed_too_long);
262 0 : tp_info.set_num_tps_send_failed(new_tps_send_failed);
263 :
264 0 : publish(std::move(tp_info));
265 :
266 0 : m_t0 = now;
267 :
268 0 : }
269 :
270 0 : inherited::generate_opmon_data();
271 :
272 0 : }
273 :
274 : } // namespace fdreadoutlibs
275 : } // namespace dunedaq
|