25namespace fdreadoutlibs {
28DAPHNEFrameProcessor::conf(
const appmodel::DataHandlerModule* conf)
30 TLOG() <<
"Looking for TP sink...";
32 for (
auto output : conf->get_outputs()) {
33 TLOG() <<
"On outputs... (" << output->UID() <<
"," << output->get_data_type() <<
")";
35 if (output->get_data_type() ==
"TriggerPrimitiveVector") {
36 TLOG() <<
"Found TP sink.";
37 m_tp_sink = get_iom_sender<std::vector<trigger::TriggerPrimitiveTypeAdapter>>(output->UID());
38 TLOG() <<
" SINK INITIALIZED for TriggerPrimitives with UID : " << output->UID();
41 ers::error(datahandlinglibs::ResourceQueueError(
ERS_HERE,
"tp",
"DefaultRequestHandlerModel", excpt));
45 TLOG() <<
"Registering processing tasks...";
46 inherited::add_preprocess_task(std::bind(&DAPHNEFrameProcessor::timestamp_check,
this, std::placeholders::_1));
48 if (m_post_processing_enabled) {
50 inherited::add_preprocess_task(std::bind(&DAPHNEFrameProcessor::extract_tps,
this, std::placeholders::_1));
53 TLOG() <<
"Calling parent conf.";
54 inherited::conf(conf);
57void DAPHNEFrameProcessor::start(
const nlohmann::json& args)
62 m_first_ts_missmatch =
true;
66 m_t0 = std::chrono::high_resolution_clock::now();
71 inherited::start(args);
73void DAPHNEFrameProcessor::stop(
const nlohmann::json& args)
75 inherited::stop(args);
81DAPHNEFrameProcessor::timestamp_check(frameptr fp)
101 for (
size_t i=0; i<types::kDAPHNENumFrames; i++){
104 if(df_ptr[i].get_timestamp() > 0xFFFFFFFFFFFF0000 || df_ptr[i].get_timestamp() < 0xFFFF){
107 df_ptr[i].daq_header.timestamp_1 = df_ptr[i].daq_header.timestamp_2 = 0;
112 m_current_ts = fp->get_timestamp();
113 uint64_t k_clock_frequency = 62500000;
114 TLOG_DEBUG(TLVL_FRAME_RECEIVED) <<
"Received DAPHNE frame timestamp value of " << m_current_ts <<
" ticks (..." << std::fixed << std::setprecision(8) << (static_cast<double>(m_current_ts % (k_clock_frequency*1000)) /
static_cast<double>(k_clock_frequency)) <<
" sec)";
124 if (m_ts_error_ctr > 1000) {
125 if (!m_problem_reported) {
126 std::cout <<
"*** Data Integrity ERROR *** Timestamp continuity is completely broken! "
127 <<
"Something is wrong with the FE source or with the configuration!\n";
128 m_problem_reported =
true;
132 m_previous_ts = m_current_ts;
133 m_last_processed_daq_ts = m_current_ts;
140DAPHNEFrameProcessor::frame_error_check(frameptr )
146void DAPHNEFrameProcessor::extract_tps(constframeptr fp)
150 if (!fp || fp==
nullptr)
153 auto nonconstframeptr =
const_cast<frameptr
>(fp);
156 std::vector<trigger::TriggerPrimitiveTypeAdapter> ttpp;
157 for (
size_t i=0; i<types::kDAPHNENumFrames; i++)
159 for(
size_t j=0; j<fddetdataformats::DAPHNEFrame::PeakDescriptorData::max_peaks;j++)
161 if(df_ptr[i].peaks_data.is_found(j))
163 trigger::TriggerPrimitiveTypeAdapter tpa;
164 tpa.tp = peak_to_tp(df_ptr[i],j);
167 if(tpa.tp.time_start > 0xFFFFFFFFFFFF0000 || tpa.tp.time_start < 0xFFFF){
172 tpa.tp.detid = df_ptr->daq_header.det_id;
178 int new_tps = ttpp.size();
181 const auto s_ts_begin = ttpp.front().tp.time_start;
182 const auto channel_begin = ttpp.front().tp.channel;
183 const auto s_ts_end = ttpp.back().tp.time_start;
186 if (!m_tp_sink->try_send(std::move(ttpp), iomanager::Sender::s_no_block)) {
188 m_tps_send_failed += new_tps;
190 m_new_tps += new_tps;
191 m_new_hits += new_tps;
217DAPHNEFrameProcessor::generate_opmon_data() {
220 if (m_post_processing_enabled) {
221 auto now = std::chrono::high_resolution_clock::now();
222 int new_hits = m_new_hits.exchange(0);
223 int new_tps = m_new_tps.exchange(0);
224 int new_tps_suppressed_too_long = 0;
225 int new_tps_send_failed = m_tps_send_failed.exchange(0);
226 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(
now - m_t0).count() / 1000000.;
227 TLOG_DEBUG(TLVL_BOOKKEEPING) <<
"Hit rate: " << std::to_string(new_hits / seconds / 1000.) <<
" [kHz]";
228 TLOG_DEBUG(TLVL_BOOKKEEPING) <<
"Total new hits: " << new_hits <<
" new TPs: " << new_tps;
230 datahandlinglibs::opmon::HitFindingInfo tp_info;
231 tp_info.set_rate_tp_hits(new_hits / seconds / 1000.);
233 tp_info.set_num_tps_sent(new_tps);
234 tp_info.set_num_tps_suppressed_too_long(new_tps_suppressed_too_long);
235 tp_info.set_num_tps_send_failed(new_tps_send_failed);
237 publish(std::move(tp_info));
243 inherited::generate_opmon_data();
#define DUNE_DAQ_TYPESTRING(Type, typestring)
Base class for any user define issue.
#define TLOG_DEBUG(lvl,...)
PDSUnphysicalFrameTimestamp
void warning(const Issue &issue)
void error(const Issue &issue)