LCOV - code coverage report
Current view: top level - fdreadoutlibs/src/daphne - DAPHNEFrameProcessor.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 130 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 20 0

            Line data    Source code
       1              : /**
       2              :  * @file DAPHNEFrameProcessor.cpp DAPHNE specific Task based raw processor
       3              :  * implementation
       4              :  *
       5              :  * This is part of the DUNE DAQ , copyright 2020.
       6              :  * Licensing/copyright details are in the COPYING file that you should have
       7              :  * received with this code.
       8              :  */
       9              : 
      10              : #include "fddetdataformats/DAPHNEFrame.hpp"
      11              : #include "trgdataformats/TriggerPrimitive.hpp"
      12              : #include "fdreadoutlibs/daphne/DAPHNEFrameProcessor.hpp"
      13              : 
      14              : #include "confmodel/GeoId.hpp"
      15              : 
      16              : #include <atomic>
      17              : #include <functional>
      18              : #include <memory>
      19              : #include <string>
      20              : 
      21              : using dunedaq::datahandlinglibs::logging::TLVL_BOOKKEEPING;
      22              : using dunedaq::datahandlinglibs::logging::TLVL_FRAME_RECEIVED;
      23              : 
      24              : DUNE_DAQ_TYPESTRING(dunedaq::trigger::TriggerPrimitiveTypeAdapter, "TriggerPrimitive")
      25            0 : DUNE_DAQ_TYPESTRING(std::vector<dunedaq::trigger::TriggerPrimitiveTypeAdapter>, "TriggerPrimitiveVector")
      26              : 
      27              : namespace dunedaq {
      28              : namespace fdreadoutlibs {
      29              : 
      30              : void 
      31            0 : DAPHNEFrameProcessor::conf(const appmodel::DataHandlerModule* conf)
      32              : {
      33            0 :   TLOG() << "Looking for TP sink...";
      34              : 
      35            0 :   for (auto output : conf->get_outputs()) {
      36            0 :     TLOG() << "On outputs... (" << output->UID() << "," << output->get_data_type() << ")";
      37            0 :     try {
      38            0 :       if (output->get_data_type() == "TriggerPrimitiveVector") {
      39            0 :          TLOG() << "Found TP sink.";
      40            0 :          m_tp_sink = get_iom_sender<std::vector<trigger::TriggerPrimitiveTypeAdapter>>(output->UID());
      41            0 :          TLOG() << " SINK INITIALIZED for TriggerPrimitives with UID : " << output->UID();
      42              :       }
      43            0 :     } catch (const ers::Issue& excpt) {
      44            0 :       ers::error(datahandlinglibs::ResourceQueueError(ERS_HERE, "tp", "DefaultRequestHandlerModel", excpt));
      45            0 :     }
      46              :   }
      47              : 
      48            0 :   TLOG() << "Registering processing tasks...";
      49            0 :   inherited::add_preprocess_task(std::bind(&DAPHNEFrameProcessor::timestamp_check, this, std::placeholders::_1));
      50              : 
      51            0 :   auto dp = conf->get_module_configuration()->get_data_processor();
      52            0 :   if (dp == nullptr) {
      53            0 :     TLOG()<< " PDS Data processor does not exist.";
      54              :   } else {
      55            0 :     auto proc_conf = dp->cast<appmodel::PDSRawDataProcessor>();
      56            0 :     if (proc_conf == nullptr) {
      57            0 :       TLOG()<< "PDS RawDataProcessor does not exist.";
      58              :     } else { 
      59            0 :       m_def_adc_intg_thresh = proc_conf-> get_default_adc_intg_thresh();
      60              :       
      61            0 :       auto geo_id = conf->get_geo_id();
      62            0 :       if (geo_id != nullptr) {
      63            0 :         m_det_id = geo_id->get_detector_id();
      64            0 :         m_crate_id = geo_id->get_crate_id();
      65            0 :         m_slot_id = geo_id->get_slot_id();
      66            0 :         m_stream_id = geo_id->get_stream_id();
      67              :       }    
      68              :     
      69            0 :       m_channel_map = dunedaq::detchannelmaps::make_pds_map(proc_conf->get_channel_map());
      70            0 :       const std::vector<unsigned int> channel_mask_vec = proc_conf->get_channel_mask();
      71              :     
      72            0 :       for (int chan = 0; chan < 48; chan++) {// 40 physical PDS channel 8 not. 0->7 contain light info, 8,9, additional info. 10-17 light, 18,19 not etc...  
      73            0 :         trgdataformats::channel_t off_channel = m_channel_map->get_offline_channel_from_det_crate_slot_stream_chan(m_det_id, m_crate_id, m_slot_id, m_stream_id, chan);
      74            0 :         if (std::find(channel_mask_vec.begin(), channel_mask_vec.end(), off_channel) != channel_mask_vec.end())
      75            0 :           m_channel_mask_set.insert(off_channel);//m_channel_mask will be a vector fille with random chanel which need to be masked.
      76              :       }
      77              :       
      78            0 :       if (m_post_processing_enabled) { 
      79              :         // Extract TPs back as a pre-processing task, due to LatencyBuffer post-proc issues using SkipList.
      80            0 :         inherited::add_preprocess_task(std::bind(&DAPHNEFrameProcessor::extract_tps, this, std::placeholders::_1));
      81              :       }
      82            0 :     }
      83              :   }
      84              : 
      85            0 :   TLOG() << "Calling parent conf.";
      86            0 :   inherited::conf(conf);
      87            0 : }
      88              : 
      89            0 : void DAPHNEFrameProcessor::start(const appfwk::DAQModule::CommandData_t& args)
      90              : {
      91              :   // Reset timestamp check
      92            0 :   m_previous_ts = 0;
      93            0 :   m_current_ts = 0;
      94            0 :   m_first_ts_missmatch = true;
      95            0 :   m_ts_error_ctr = 0;
      96              : 
      97              :   // Reset stats
      98            0 :   m_t0 = std::chrono::high_resolution_clock::now();
      99            0 :   m_num_new_tps.exchange(0);
     100              : 
     101            0 :   inherited::start(args);
     102            0 : }
     103              : void
     104            0 : DAPHNEFrameProcessor::stop(const appfwk::DAQModule::CommandData_t& args)
     105              : {
     106            0 :   inherited::stop(args);
     107            0 : }
     108              : /**
     109              :  * Pipeline Stage 1.: Check proper timestamp increments in DAPHNE frame
     110              :  * */
     111              : void 
     112            0 : DAPHNEFrameProcessor::timestamp_check(frameptr fp)
     113              : {
     114              :   // Let Source Emulator deal with this
     115              : /*
     116              :   // If EMU data, emulate perfectly incrementing timestamp
     117              :   if (inherited::m_emulator_mode) { // emulate perfectly incrementing timestamp
     118              :     // RS warning : not fixed rate!
     119              :     if (m_first_ts_fake) {
     120              :       fp->fake_timestamps(m_previous_ts, 16);
     121              :       m_first_ts_fake = false;
     122              :     } else {
     123              :       fp->fake_timestamps(m_previous_ts + 192, 16);
     124              :     }
     125              :   }*/
     126              : 
     127              :   // FIXME: This is a temporary fix to avoid frames with unphysical timestamp set to the far future to interfere with 
     128              :   // the operations of the LB.
     129              :   // These frames are effectively "corrupted" or "invalid frames" and hould be handled as such.
     130              : 
     131              : 
     132            0 :   for (size_t i=0; i<types::kDAPHNENumFrames; i++){
     133            0 :     auto df_ptr = reinterpret_cast<dunedaq::fddetdataformats::DAPHNEFrame*>(fp);
     134              : 
     135            0 :     if(df_ptr[i].get_timestamp() > 0xFFFFFFFFFFFF0000 || df_ptr[i].get_timestamp() < 0xFFFF){
     136            0 :       ers::warning(PDSUnphysicalFrameTimestamp(ERS_HERE, df_ptr[i].get_timestamp(), df_ptr[i].get_channel(), i));
     137              :       // Force the TS to 0
     138            0 :       df_ptr[i].daq_header.timestamp_1 = df_ptr[i].daq_header.timestamp_2 = 0;
     139              :     }
     140              :   }
     141              : 
     142              :   // Acquire timestamp
     143            0 :   m_current_ts = fp->get_timestamp();
     144            0 :   uint64_t k_clock_frequency = 62500000; // NOLINT(build/unsigned)
     145            0 :   TLOG_DEBUG(TLVL_FRAME_RECEIVED) << "Received DAPHNE frame timestamp value of " << m_current_ts << " ticks (..." << std::fixed << std::setprecision(8) << (static_cast<double>(m_current_ts % (k_clock_frequency*1000)) / static_cast<double>(k_clock_frequency)) << " sec)";// NOLINT
     146              : 
     147              : 
     148            0 :   if (m_ts_error_ctr > 1000) {
     149            0 :     if (!m_problem_reported) {
     150            0 :       std::cout << "*** Data Integrity ERROR *** Timestamp continuity is completely broken! "
     151            0 :              << "Something is wrong with the FE source or with the configuration!\n";
     152            0 :       m_problem_reported = true;
     153              :     }
     154              :   }
     155              : 
     156            0 :   m_previous_ts = m_current_ts;
     157            0 :   m_last_processed_daq_ts = m_current_ts;
     158            0 : }
     159              : 
     160              : /**
     161              :  * Pipeline Stage 2.: Check DAPHNE headers for error flags
     162              :  * */
     163              : void 
     164            0 : DAPHNEFrameProcessor::frame_error_check(frameptr /*fp*/)
     165              : {
     166              :   // check error fields
     167            0 : }
     168              : 
     169              : 
     170            0 : void DAPHNEFrameProcessor::extract_tps(constframeptr fp)
     171              : {
     172              : 
     173            0 :   if (!fp || fp==nullptr){
     174              :     return;
     175              :   }
     176              :     
     177              : 
     178            0 :   auto nonconstframeptr = const_cast<frameptr>(fp);
     179            0 :   auto df_ptr = reinterpret_cast<dunedaq::fddetdataformats::DAPHNEFrame*>((uint8_t*)nonconstframeptr); // NOLINT
     180            0 :   std::vector<trigger::TriggerPrimitiveTypeAdapter> ttpp;
     181              : 
     182            0 :   for (size_t i=0; i<types::kDAPHNENumFrames; i++)
     183              :   {
     184            0 :     for(size_t j=0; j<fddetdataformats::DAPHNEFrame::PeakDescriptorData::max_peaks;j++)
     185              :     {
     186            0 :       if(df_ptr[i].peaks_data.is_found(j))
     187              :       { 
     188            0 :         int ch =  m_channel_map->get_offline_channel_from_det_crate_slot_stream_chan(df_ptr[i].daq_header.det_id, df_ptr[i].daq_header.crate_id, df_ptr[i].daq_header.slot_id, df_ptr[i].daq_header.link_id, df_ptr[i].get_channel());
     189            0 :         if (std::binary_search(m_channel_mask_set.begin(), m_channel_mask_set.end(), ch)) continue;
     190            0 :         if (df_ptr[i].peaks_data.get_adc_integral(j) < m_def_adc_intg_thresh) continue;
     191              : 
     192              : 
     193            0 :         trigger::TriggerPrimitiveTypeAdapter tpa;
     194            0 :         tpa.tp = peak_to_tp(df_ptr[i],j);// this is the trigger primitive
     195              :         //check for timestamps that are due to frame timestamps ~ ts=0, and ignore these peaks
     196            0 :         if(tpa.tp.time_start > 0xFFFFFFFFFFFF0000 || tpa.tp.time_start < 0xFFFF){
     197            0 :           ers::warning(PDSPeakIgnored(ERS_HERE, tpa.tp.time_start, tpa.tp.channel, i, j));
     198            0 :           continue;
     199              :         }
     200              :         
     201            0 :         tpa.tp.detid = df_ptr->daq_header.det_id;
     202            0 :         ttpp.push_back(tpa);
     203              :       }
     204              :     }
     205              :   }
     206              : 
     207            0 :   int num_new_tps = ttpp.size();
     208            0 :   if (num_new_tps > 0) {
     209              : 
     210            0 :     const auto s_ts_begin = ttpp.front().tp.time_start;
     211            0 :     const auto channel_begin = ttpp.front().tp.channel;
     212            0 :     const auto s_ts_end = ttpp.back().tp.time_start;
     213            0 :     const auto channel_end = ttpp.back().tp.channel;      
     214              :     
     215            0 :     if (!m_tp_sink->try_send(std::move(ttpp), iomanager::Sender::s_no_block)) {
     216            0 :       ers::warning(FailedToSendTPVector(ERS_HERE, s_ts_begin, channel_begin, s_ts_end, channel_end));
     217            0 :       m_tps_send_failed += num_new_tps;
     218              :     } else {
     219            0 :       m_num_new_tps += num_new_tps;
     220              :     }
     221              :   }
     222              : 
     223            0 :   return;
     224            0 : }
     225              : 
     226              : dunedaq::trgdataformats::TriggerPrimitive 
     227            0 : DAPHNEFrameProcessor::peak_to_tp(dunedaq::fddetdataformats::DAPHNEFrame &frame, int i)
     228              : {
     229            0 :   dunedaq::trgdataformats::TriggerPrimitive tp;
     230              :   // TODO: add check on peak presence
     231            0 :   tp.time_start = frame.get_timestamp()+frame.peaks_data.get_sample_start(i);
     232            0 :   tp.samples_to_peak = frame.peaks_data.get_sample_max(i);
     233            0 :   tp.samples_over_threshold = frame.peaks_data.get_samples_over_baseline(i);
     234              :   // FIXME : hard-coded channel map
     235              :   // WARNING: slot ids in DAPHNEs are all 0!
     236            0 :   tp.channel = m_channel_map->get_offline_channel_from_det_crate_slot_stream_chan(frame.daq_header.det_id, frame.daq_header.crate_id, frame.daq_header.slot_id, frame.daq_header.link_id, frame.get_channel());
     237            0 :   tp.adc_integral = frame.peaks_data.get_adc_integral(i);
     238            0 :   tp.adc_peak = frame.peaks_data.get_adc_max(i);
     239            0 :   tp.detid = dunedaq::trgdataformats::INVALID_DETID;
     240            0 :   return tp;
     241              : }
     242              : 
     243              : 
     244              : void
     245            0 : DAPHNEFrameProcessor::generate_opmon_data() {
     246              : 
     247              :   //right now, just fill some basic tp info...
     248            0 :   if (m_post_processing_enabled) {
     249            0 :     auto now = std::chrono::high_resolution_clock::now();
     250            0 :     int num_new_tps = m_num_new_tps.exchange(0);
     251            0 :     int new_tps_suppressed_too_long = 0; // not relevant for PDS TPs
     252            0 :     int new_tps_send_failed = m_tps_send_failed.exchange(0);
     253            0 :     double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
     254            0 :     TLOG_DEBUG(TLVL_BOOKKEEPING) << "TP rate: " << std::to_string(num_new_tps / seconds / 1000.) << " [kHz]";
     255            0 :     TLOG_DEBUG(TLVL_BOOKKEEPING) << "Total new TPs: " << num_new_tps;
     256              :     
     257            0 :     datahandlinglibs::opmon::HitFindingInfo tp_info;
     258            0 :     tp_info.set_rate_tp_hits(num_new_tps / seconds / 1000.);
     259              :     
     260            0 :     tp_info.set_num_tps_sent(num_new_tps);
     261            0 :     tp_info.set_num_tps_suppressed_too_long(new_tps_suppressed_too_long);
     262            0 :     tp_info.set_num_tps_send_failed(new_tps_send_failed);
     263              :     
     264            0 :     publish(std::move(tp_info));
     265              : 
     266            0 :     m_t0 = now;
     267              : 
     268            0 :   }
     269              : 
     270            0 :  inherited::generate_opmon_data();
     271              :   
     272            0 : }
     273              : 
     274              : } // namespace fdreadoutlibs
     275              : } // namespace dunedaq
        

Generated by: LCOV version 2.0-1