28namespace fdreadoutlibs {
// Configure this frame processor from the DataHandlerModule configuration.
// Visible steps: look for an output whose data type is "TriggerPrimitiveVector"
// and create m_tp_sink for it, register timestamp_check as a pre-processing
// task, read the default ADC-integral threshold, the geo ID fields and the
// channel map/mask, optionally register extract_tps, and finally delegate to
// inherited::conf().
// NOTE(review): this listing skips source lines (the embedded numbers are not
// contiguous), so braces and some branch headers are not shown here.
31DAPHNEFrameProcessor::conf(
const appmodel::DataHandlerModule* conf)
33 TLOG() <<
"Looking for TP sink...";
// Walk every configured output; each one is logged with its UID and data type.
35 for (
auto output : conf->get_outputs()) {
36 TLOG() <<
"On outputs... (" << output->UID() <<
"," << output->get_data_type() <<
")";
// Only an output typed "TriggerPrimitiveVector" is used as the TP sink.
38 if (output->get_data_type() ==
"TriggerPrimitiveVector") {
39 TLOG() <<
"Found TP sink.";
40 m_tp_sink = get_iom_sender<std::vector<trigger::TriggerPrimitiveTypeAdapter>>(output->UID());
41 TLOG() <<
" SINK INITIALIZED for TriggerPrimitives with UID : " << output->UID();
// Reports a ResourceQueueError through ers::error (reported, not thrown, on this line).
// NOTE(review): `excpt` is not declared on any visible line - presumably it is
// bound by a catch clause on an omitted line; confirm against the full file.
44 ers::error(datahandlinglibs::ResourceQueueError(
ERS_HERE,
"tp",
"DefaultRequestHandlerModel", excpt));
48 TLOG() <<
"Registering processing tasks...";
// timestamp_check is bound to `this` and registered as a pre-processing task.
49 inherited::add_preprocess_task(std::bind(&DAPHNEFrameProcessor::timestamp_check,
this, std::placeholders::_1));
51 auto dp = conf->get_module_configuration()->get_data_processor();
// NOTE(review): the condition guarding this message is on an omitted line.
53 TLOG()<<
" PDS Data processor does not exist.";
55 auto proc_conf = dp->cast<appmodel::PDSRawDataProcessor>();
56 if (proc_conf ==
nullptr) {
57 TLOG()<<
"PDS RawDataProcessor does not exist.";
// NOTE(review): proc_conf is dereferenced here; the visible null branch above
// only logs. Confirm that the omitted lines leave the function (or otherwise
// skip this) when proc_conf is null.
59 m_def_adc_intg_thresh = proc_conf-> get_default_adc_intg_thresh();
// Geo ID fields are copied only when a geo ID is configured.
61 auto geo_id = conf->get_geo_id();
62 if (geo_id !=
nullptr) {
63 m_det_id = geo_id->get_detector_id();
64 m_crate_id = geo_id->get_crate_id();
65 m_slot_id = geo_id->get_slot_id();
66 m_stream_id = geo_id->get_stream_id();
69 m_channel_map = dunedaq::detchannelmaps::make_pds_map(proc_conf->get_channel_map());
70 const std::vector<unsigned int> channel_mask_vec = proc_conf->get_channel_mask();
// For channel indices 0..47, translate to the offline channel and record it in
// m_channel_mask_set when it appears in the configured mask vector.
// NOTE(review): 48 is a literal here; std::find is a linear scan per channel.
72 for (
int chan = 0; chan < 48; chan++) {
73 trgdataformats::channel_t off_channel = m_channel_map->get_offline_channel_from_det_crate_slot_stream_chan(m_det_id, m_crate_id, m_slot_id, m_stream_id, chan);
74 if (std::find(channel_mask_vec.begin(), channel_mask_vec.end(), off_channel) != channel_mask_vec.end())
75 m_channel_mask_set.insert(off_channel);
// extract_tps is registered (also via add_preprocess_task) only when
// m_post_processing_enabled is set.
78 if (m_post_processing_enabled) {
80 inherited::add_preprocess_task(std::bind(&DAPHNEFrameProcessor::extract_tps,
this, std::placeholders::_1));
85 TLOG() <<
"Calling parent conf.";
86 inherited::conf(conf);
// Start handler: re-arms the first-timestamp-mismatch flag, records the current
// time in m_t0, then forwards `args` to inherited::start().
// NOTE(review): source lines between the visible statements are omitted from
// this listing, so other state may be reset here as well.
89void DAPHNEFrameProcessor::start(
const nlohmann::json& args)
94 m_first_ts_missmatch =
true;
// m_t0 is the reference time later read by generate_opmon_data().
98 m_t0 = std::chrono::high_resolution_clock::now();
103 inherited::start(args);
// Stop handler: forwards `args` to inherited::stop(). No other statement is
// visible in this listing.
105void DAPHNEFrameProcessor::stop(
const nlohmann::json& args)
107 inherited::stop(args);
// Pre-processing task (registered in conf()) that inspects the timestamps of
// the frames reachable from `fp` and updates the timestamp bookkeeping members.
// NOTE(review): the return type and the declaration of `df_ptr` are on lines
// omitted from this listing.
113DAPHNEFrameProcessor::timestamp_check(frameptr fp)
// Visit each of the types::kDAPHNENumFrames frames.
133 for (
size_t i=0; i<types::kDAPHNENumFrames; i++){
// A timestamp above 0xFFFFFFFFFFFF0000 or below 0xFFFF is singled out here.
136 if(df_ptr[i].get_timestamp() > 0xFFFFFFFFFFFF0000 || df_ptr[i].get_timestamp() < 0xFFFF){
// Both header timestamp words are zeroed - presumably only inside the range
// check above (the enclosing braces are not visible; confirm).
139 df_ptr[i].daq_header.timestamp_1 = df_ptr[i].daq_header.timestamp_2 = 0;
144 m_current_ts = fp->get_timestamp();
// Used only to convert ticks to seconds in the debug message below.
// NOTE(review): never modified in the visible code; could be constexpr.
145 uint64_t k_clock_frequency = 62500000;
146 TLOG_DEBUG(TLVL_FRAME_RECEIVED) <<
"Received DAPHNE frame timestamp value of " << m_current_ts <<
" ticks (..." << std::fixed << std::setprecision(8) << (static_cast<double>(m_current_ts % (k_clock_frequency*1000)) /
static_cast<double>(k_clock_frequency)) <<
" sec)";
// Once m_ts_error_ctr exceeds 1000 a message is emitted, guarded by
// m_problem_reported, which is set to true right after.
// NOTE(review): this writes to std::cout while the rest of the file reports
// through TLOG/ers - confirm that is intended.
149 if (m_ts_error_ctr > 1000) {
150 if (!m_problem_reported) {
151 std::cout <<
"*** Data Integrity ERROR *** Timestamp continuity is completely broken! "
152 <<
"Something is wrong with the FE source or with the configuration!\n";
153 m_problem_reported =
true;
// Remember the timestamp just handled.
157 m_previous_ts = m_current_ts;
158 m_last_processed_daq_ts = m_current_ts;
// Frame error check hook. The frame parameter is unnamed, so it is not used by
// name; the return type and body are on lines omitted from this listing.
165DAPHNEFrameProcessor::frame_error_check(frameptr )
// Builds TriggerPrimitiveTypeAdapter objects from the peak descriptors of the
// frames reachable from `fp` and sends them as one vector through m_tp_sink,
// updating the TP/hit counters read by generate_opmon_data().
// NOTE(review): several source lines (braces, the declaration of `df_ptr`, the
// push into `ttpp`) are omitted from this listing.
171void DAPHNEFrameProcessor::extract_tps(constframeptr fp)
// NOTE(review): `!fp` and `fp==nullptr` test the same thing; one is redundant.
175 if (!fp || fp==
nullptr){
// Constness is cast away from the incoming frame pointer.
180 auto nonconstframeptr =
const_cast<frameptr
>(fp);
182 std::vector<trigger::TriggerPrimitiveTypeAdapter> ttpp;
// Outer loop: frames; inner loop: peak slots of each frame.
184 for (
size_t i=0; i<types::kDAPHNENumFrames; i++)
186 for(
size_t j=0; j<fddetdataformats::DAPHNEFrame::PeakDescriptorData::max_peaks;j++)
188 if(df_ptr[i].peaks_data.is_found(j))
// Offline channel is looked up from the frame's own header fields.
190 int ch = m_channel_map->get_offline_channel_from_det_crate_slot_stream_chan(df_ptr[i].daq_header.det_id, df_ptr[i].daq_header.crate_id, df_ptr[i].daq_header.slot_id, df_ptr[i].daq_header.link_id, df_ptr[i].get_channel());
// Peaks on channels present in m_channel_mask_set are skipped.
// NOTE(review): the container type is not visible; if it is a std::set, its
// member lookup (find/count) avoids stepping non-random-access iterators.
191 if (std::binary_search(m_channel_mask_set.begin(), m_channel_mask_set.end(), ch))
continue;
// Peaks whose ADC integral is below the configured threshold are skipped.
192 if (df_ptr[i].peaks_data.get_adc_integral(j) < m_def_adc_intg_thresh)
continue;
195 trigger::TriggerPrimitiveTypeAdapter tpa;
196 tpa.tp = peak_to_tp(df_ptr[i],j);
// Same timestamp window as in timestamp_check(); the action taken is on
// omitted lines.
198 if(tpa.tp.time_start > 0xFFFFFFFFFFFF0000 || tpa.tp.time_start < 0xFFFF){
// NOTE(review): uses `df_ptr->` (the first element if df_ptr is a raw pointer)
// rather than `df_ptr[i]` as everywhere else in this loop - confirm intended.
203 tpa.tp.detid = df_ptr->daq_header.det_id;
// Count is captured before ttpp is moved from below.
// NOTE(review): size() is narrowed to int here.
209 int new_tps = ttpp.size();
// NOTE(review): front()/back() require a non-empty ttpp; no guard is visible
// in this listing - confirm one exists on the omitted lines.
212 const auto s_ts_begin = ttpp.front().tp.time_start;
213 const auto channel_begin = ttpp.front().tp.channel;
214 const auto s_ts_end = ttpp.back().tp.time_start;
// Send attempt with iomanager::Sender::s_no_block; a failed send adds the
// count to m_tps_send_failed.
217 if (!m_tp_sink->try_send(std::move(ttpp), iomanager::Sender::s_no_block)) {
219 m_tps_send_failed += new_tps;
// Both counters are advanced by the same amount. Whether this happens only
// after a successful send is not visible here.
221 m_new_tps += new_tps;
222 m_new_hits += new_tps;
// Publishes hit-finding monitoring data when post-processing is enabled, then
// delegates to inherited::generate_opmon_data().
// NOTE(review): the return type and some lines are omitted from this listing.
248DAPHNEFrameProcessor::generate_opmon_data() {
251 if (m_post_processing_enabled) {
252 auto now = std::chrono::high_resolution_clock::now();
// Counters are read and reset to zero in one step via exchange(0).
253 int new_hits = m_new_hits.exchange(0);
254 int new_tps = m_new_tps.exchange(0);
// Always reported as 0; nothing in the visible code updates it.
255 int new_tps_suppressed_too_long = 0;
256 int new_tps_send_failed = m_tps_send_failed.exchange(0);
// Elapsed seconds since m_t0, computed from a microsecond count.
// NOTE(review): m_t0 is only seen being set in start(); if it is not refreshed
// on an omitted line, the rate divides per-call counts by time since start.
257 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(
now - m_t0).count() / 1000000.;
258 TLOG_DEBUG(TLVL_BOOKKEEPING) <<
"Hit rate: " << std::to_string(new_hits / seconds / 1000.) <<
" [kHz]";
259 TLOG_DEBUG(TLVL_BOOKKEEPING) <<
"Total new hits: " << new_hits <<
" new TPs: " << new_tps;
// Fill and publish the HitFindingInfo record; the rate is the same value
// logged above as kHz.
261 datahandlinglibs::opmon::HitFindingInfo tp_info;
262 tp_info.set_rate_tp_hits(new_hits / seconds / 1000.);
264 tp_info.set_num_tps_sent(new_tps);
265 tp_info.set_num_tps_suppressed_too_long(new_tps_suppressed_too_long);
266 tp_info.set_num_tps_send_failed(new_tps_send_failed);
268 publish(std::move(tp_info));
274 inherited::generate_opmon_data();
#define DUNE_DAQ_TYPESTRING(Type, typestring)
Declare the datatype_to_string method for the given type.
Base class for any user-defined issue.
#define TLOG_DEBUG(lvl,...)
PDSUnphysicalFrameTimestamp
PDS Frame with unphysical timestamp detected with ch
void warning(const Issue &issue)
void error(const Issue &issue)