LCOV - code coverage report
Current view: top level - fdreadoutlibs/src/daphneeth - DAPHNEEthFrameProcessor.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 87 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 19 0

            Line data    Source code
       1              : /**
       2              :  * @file DAPHNEEthFrameProcessor.cpp DAPHNE specific Task based raw processor
       3              :  * implementation
       4              :  *
       5              :  * This is part of the DUNE DAQ , copyright 2020.
       6              :  * Licensing/copyright details are in the COPYING file that you should have
       7              :  * received with this code.
       8              :  */
       9              : 
      10              : #include "fddetdataformats/DAPHNEEthFrame.hpp"
      11              : #include "trgdataformats/TriggerPrimitive.hpp"
      12              : #include "fdreadoutlibs/daphneeth/DAPHNEEthFrameProcessor.hpp"
      13              : 
      14              : #include "confmodel/GeoId.hpp"
      15              : 
      16              : #include <atomic>
      17              : #include <functional>
      18              : #include <memory>
      19              : #include <string>
      20              : 
      21              : using dunedaq::datahandlinglibs::logging::TLVL_BOOKKEEPING;
      22              : using dunedaq::datahandlinglibs::logging::TLVL_FRAME_RECEIVED;
      23              : 
      24              : DUNE_DAQ_TYPESTRING(dunedaq::trigger::TriggerPrimitiveTypeAdapter, "TriggerPrimitive")
      25            0 : DUNE_DAQ_TYPESTRING(std::vector<dunedaq::trigger::TriggerPrimitiveTypeAdapter>, "TriggerPrimitiveVector")
      26              : 
      27              : namespace dunedaq {
      28              : namespace fdreadoutlibs {
      29              : 
      30              : void 
      31            0 : DAPHNEEthFrameProcessor::conf(const appmodel::DataHandlerModule* conf)
      32              : {
      33            0 :   TLOG() << "Looking for TP sink...";
      34              : 
      35            0 :   for (auto output : conf->get_outputs()) {
      36            0 :     TLOG() << "On outputs... (" << output->UID() << "," << output->get_data_type() << ")";
      37            0 :     try {
      38            0 :       if (output->get_data_type() == "TriggerPrimitiveVector") {
      39            0 :          TLOG() << "Found TP sink.";
      40            0 :          m_tp_sink = get_iom_sender<std::vector<trigger::TriggerPrimitiveTypeAdapter>>(output->UID());
      41            0 :          TLOG() << " SINK INITIALIZED for TriggerPrimitives with UID : " << output->UID();
      42              :       }
      43            0 :     } catch (const ers::Issue& excpt) {
      44            0 :       ers::error(datahandlinglibs::ResourceQueueError(ERS_HERE, "tp", "DefaultRequestHandlerModel", excpt));
      45            0 :     }
      46              :   }
      47              : 
      48            0 :   TLOG() << "Registering processing tasks...";
      49            0 :   inherited::add_preprocess_task(std::bind(&DAPHNEEthFrameProcessor::timestamp_check, this, std::placeholders::_1));
      50              : 
      51            0 :   auto dp = conf->get_module_configuration()->get_data_processor();
      52            0 :   if (dp == nullptr) {
      53            0 :     TLOG()<< " PDS Data processor does not exist.";
      54              :   } else {
      55            0 :     auto proc_conf = dp->cast<appmodel::PDSRawDataProcessor>();
      56            0 :     if (proc_conf == nullptr) {
      57            0 :       TLOG()<< "PDS RawDataProcessor does not exist.";
      58              :     } else { 
      59            0 :       m_def_adc_intg_thresh = proc_conf-> get_default_adc_intg_thresh();
      60              :       
      61            0 :       auto geo_id = conf->get_geo_id();
      62            0 :       if (geo_id != nullptr) {
      63            0 :         m_det_id = geo_id->get_detector_id();
      64            0 :         m_crate_id = geo_id->get_crate_id();
      65            0 :         m_slot_id = geo_id->get_slot_id();
      66            0 :         m_stream_id = geo_id->get_stream_id();
      67              :       }    
      68              :     
      69            0 :       m_channel_map = dunedaq::detchannelmaps::make_pds_map(proc_conf->get_channel_map());
      70            0 :       const std::vector<unsigned int> channel_mask_vec = proc_conf->get_channel_mask();
      71              :     
      72            0 :       for (int chan = 0; chan < 48; chan++) {// 48 channel slots: 40 physical PDS channels and 8 that are not. 0-7 contain light info; 8,9 additional info; 10-17 light; 18,19 not light; etc.
      73            0 :         trgdataformats::channel_t off_channel = m_channel_map->get_offline_channel_from_det_crate_slot_stream_chan(m_det_id, m_crate_id, m_slot_id, m_stream_id, chan);
      74            0 :         if (std::find(channel_mask_vec.begin(), channel_mask_vec.end(), off_channel) != channel_mask_vec.end())
      75            0 :           m_channel_mask_set.insert(off_channel);// m_channel_mask_set is filled with the arbitrary channels which need to be masked.
      76              :       }
      77              :       
      78            0 :       if (m_post_processing_enabled) { 
      79              :         // Extract TPs back as a pre-processing task, due to LatencyBuffer post-proc issues using SkipList.
      80            0 :         inherited::add_preprocess_task(std::bind(&DAPHNEEthFrameProcessor::extract_tps, this, std::placeholders::_1));
      81              :       }
      82            0 :     }
      83              :   }
      84              : 
      85            0 :   TLOG() << "Calling parent conf.";
      86            0 :   inherited::conf(conf);
      87            0 : }
      88              : 
      89            0 : void DAPHNEEthFrameProcessor::start(const appfwk::DAQModule::CommandData_t& args)
      90              : {
      91              :   // Reset timestamp check
      92            0 :   m_previous_ts = 0;
      93            0 :   m_current_ts = 0;
      94            0 :   m_first_ts_missmatch = true;
      95            0 :   m_ts_error_ctr = 0;
      96              : 
      97              :   // Reset stats
      98            0 :   m_t0 = std::chrono::high_resolution_clock::now();
      99            0 :   m_num_new_tps.exchange(0);
     100              : 
     101            0 :   inherited::start(args);
     102            0 : }
     103              : void
     104            0 : DAPHNEEthFrameProcessor::stop(const appfwk::DAQModule::CommandData_t& args)
     105              : {
     106            0 :   inherited::stop(args);
     107            0 : }
     108              : /**
     109              :  * Pipeline Stage 1.: Check proper timestamp increments in DAPHNE frame
     110              :  * */
     111              : void 
     112            0 : DAPHNEEthFrameProcessor::timestamp_check(frameptr fp)
     113              : {
     114              : 
     115              : /*
     116              :   for (size_t i=0; i<types::kDAPHNENumFrames; i++){
     117              :     auto df_ptr = reinterpret_cast<dunedaq::fddetdataformats::DAPHNEEthFrame*>(fp);
     118              : 
     119              :     if(df_ptr[i].get_timestamp() > 0xFFFFFFFFFFFF0000 || df_ptr[i].get_timestamp() < 0xFFFF){
     120              :       ers::warning(PDSUnphysicalFrameTimestamp(ERS_HERE, df_ptr[i].get_timestamp(), df_ptr[i].get_channel(), i));
     121              :       // Force the TS to 0
     122              :       df_ptr[i].daq_header.timestamp_1 = df_ptr[i].daq_header.timestamp_2 = 0;
     123              :     }
     124              :   }
     125              : */
     126              : 
     127              :   // Acquire timestamp
     128            0 :   m_current_ts = fp->get_timestamp();
     129            0 :   uint64_t k_clock_frequency = 62500000; // NOLINT(build/unsigned)
     130            0 :   TLOG_DEBUG(TLVL_FRAME_RECEIVED) << "Received DAPHNE frame timestamp value of " << m_current_ts << " ticks (..." << std::fixed << std::setprecision(8) << (static_cast<double>(m_current_ts % (k_clock_frequency*1000)) / static_cast<double>(k_clock_frequency)) << " sec)";// NOLINT
     131              : 
     132              : 
     133            0 :   if (m_ts_error_ctr > 1000) {
     134            0 :     if (!m_problem_reported) {
     135            0 :       std::cout << "*** Data Integrity ERROR *** Timestamp continuity is completely broken! "
     136            0 :              << "Something is wrong with the FE source or with the configuration!\n";
     137            0 :       m_problem_reported = true;
     138              :     }
     139              :   }
     140              : 
     141            0 :   m_previous_ts = m_current_ts;
     142            0 :   m_last_processed_daq_ts = m_current_ts;
     143            0 : }
     144              : 
     145              : /**
     146              :  * Pipeline Stage 2.: Check DAPHNE headers for error flags
     147              :  * */
     148              : void 
     149            0 : DAPHNEEthFrameProcessor::frame_error_check(frameptr /*fp*/)
     150              : {
     151              :   // check error fields
     152            0 : }
     153              : 
     154              : 
     155            0 : void DAPHNEEthFrameProcessor::extract_tps(constframeptr fp)
     156              : {
     157              : 
     158            0 :   if (!fp || fp==nullptr){
     159              :     return;
     160              :   }
     161              :     
     162              : /*
     163              :   auto nonconstframeptr = const_cast<frameptr>(fp);
     164              :   auto df_ptr = reinterpret_cast<dunedaq::fddetdataformats::DAPHNEEthFrame*>((uint8_t*)nonconstframeptr); // NOLINT
     165              :   std::vector<trigger::TriggerPrimitiveTypeAdapter> ttpp;
     166              : 
     167              :   for (size_t i=0; i<types::kDAPHNENumFrames; i++)
     168              :   {
     169              :     for(size_t j=0; j<fddetdataformats::DAPHNEEthFrame::PeakDescriptorData::max_peaks;j++)
     170              :     {
     171              :       if(df_ptr[i].peaks_data.is_found(j))
     172              :       { 
     173              :         int ch =  m_channel_map->get_offline_channel_from_det_crate_slot_stream_chan(df_ptr[i].daq_header.det_id, df_ptr[i].daq_header.crate_id, df_ptr[i].daq_header.slot_id, df_ptr[i].daq_header.link_id, df_ptr[i].get_channel());
     174              :         if (std::binary_search(m_channel_mask_set.begin(), m_channel_mask_set.end(), ch)) continue;
     175              :         if (df_ptr[i].peaks_data.get_adc_integral(j) < m_def_adc_intg_thresh) continue;
     176              : 
     177              : 
     178              :         trigger::TriggerPrimitiveTypeAdapter tpa;
     179              :         tpa.tp = peak_to_tp(df_ptr[i],j);// this is the trigger primitive
     180              :         //check for timestamps that are due to frame timestamps ~ ts=0, and ignore these peaks
     181              :         if(tpa.tp.time_start > 0xFFFFFFFFFFFF0000 || tpa.tp.time_start < 0xFFFF){
     182              :           ers::warning(PDSPeakIgnored(ERS_HERE, tpa.tp.time_start, tpa.tp.channel, i, j));
     183              :           continue;
     184              :         }
     185              :         
     186              :         tpa.tp.detid = df_ptr->daq_header.det_id;
     187              :         ttpp.push_back(tpa);
     188              :       }
     189              :     }
     190              :   }
     191              : 
     192              :   int num_new_tps = ttpp.size();
     193              :   if (num_new_tps > 0) {
     194              : 
     195              :     const auto s_ts_begin = ttpp.front().tp.time_start;
     196              :     const auto channel_begin = ttpp.front().tp.channel;
     197              :     const auto s_ts_end = ttpp.back().tp.time_start;
     198              :     const auto channel_end = ttpp.back().tp.channel;      
     199              :     
     200              :     if (!m_tp_sink->try_send(std::move(ttpp), iomanager::Sender::s_no_block)) {
     201              :       ers::warning(FailedToSendTPVector(ERS_HERE, s_ts_begin, channel_begin, s_ts_end, channel_end));
     202              :       m_tps_send_failed += num_new_tps;
     203              :     } else {
     204              :       m_num_new_tps += num_new_tps;
     205              :     }
     206              :   }
     207              : */
     208              :   return;
     209              : }
     210              : 
     211              : void
     212            0 : DAPHNEEthFrameProcessor::generate_opmon_data() {
     213              : 
     214              :   //right now, just fill some basic tp info...
     215            0 :   if (m_post_processing_enabled) {
     216            0 :     auto now = std::chrono::high_resolution_clock::now();
     217            0 :     int num_new_tps = m_num_new_tps.exchange(0);
     218            0 :     int num_new_tps_suppressed_too_long = 0; // not relevant for PDS TPs
     219            0 :     int num_new_tps_send_failed = m_tps_send_failed.exchange(0);
     220            0 :     double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
     221            0 :     TLOG_DEBUG(TLVL_BOOKKEEPING) << "TP rate: " << std::to_string(num_new_tps / seconds / 1000.) << " [kHz]";
     222            0 :     TLOG_DEBUG(TLVL_BOOKKEEPING) << "Total new TPs: " << num_new_tps;
     223              :     
     224            0 :     datahandlinglibs::opmon::HitFindingInfo tp_info;
     225            0 :     tp_info.set_rate_tp_hits(num_new_tps / seconds / 1000.);
     226              :     
     227            0 :     tp_info.set_num_tps_sent(num_new_tps);
     228            0 :     tp_info.set_num_tps_suppressed_too_long(num_new_tps_suppressed_too_long);
     229            0 :     tp_info.set_num_tps_send_failed(num_new_tps_send_failed);
     230              :     
     231            0 :     publish(std::move(tp_info));
     232              : 
     233            0 :     m_t0 = now;
     234              : 
     235            0 :   }
     236              : 
     237            0 :  inherited::generate_opmon_data();
     238              :   
     239            0 : }
     240              : 
     241              : } // namespace fdreadoutlibs
     242              : } // namespace dunedaq
        

Generated by: LCOV version 2.0-1