LCOV - code coverage report
Current view: top level - fdreadoutlibs/src/tpg - TPGInternalStateHarvester.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 95.0 % 140 133
Test Date: 2025-12-21 13:07:08 Functions: 100.0 % 16 16

            Line data    Source code
       1              : /**
       2              :  * @file TPGInternalStateHarvester.cpp TPG internal state harvester
       3              :  *
       4              :  * This is part of the DUNE DAQ , copyright 2023.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : #include "fdreadoutlibs/tpg/TPGInternalStateHarvester.hpp"
       9              : #include "datahandlinglibs/ReadoutLogging.hpp"
      10              : #include "logging/Logging.hpp"
      11              : #include <cassert>
      12              : #include <algorithm>
      13              : #include <iostream>
      14              : 
      15              : using dunedaq::datahandlinglibs::logging::TLVL_BOOKKEEPING;
      16              : 
      17              : namespace dunedaq {
      18              : namespace fdreadoutlibs {
      19              : 
      20           19 : TPGInternalStateHarvester::~TPGInternalStateHarvester() {
      21              :   // Ensure thread is properly stopped
      22           19 :   stop_collection_thread();
      23           19 : }
      24              : 
      25           15 : void TPGInternalStateHarvester::set_processor_references(std::vector<ProcRef> refs) {
      26           15 :   m_processor_references = std::move(refs);
      27           15 :   rebuild_prealloc_caches_(); // if num_pipelines is not set, this will gracefully get empty per-pipeline statistics, which will be filled later by update
      28           15 : }
      29              : 
      30              : const std::vector<TPGInternalStateHarvester::ProcRef>&
      31            1 : TPGInternalStateHarvester::get_processor_references() const {
      32            1 :   return m_processor_references;
      33              : }
      34              : 
      35           15 : void TPGInternalStateHarvester::update_channel_plane_numbers(
      36              :     const std::vector<std::pair<trgdataformats::channel_t,int16_t>>& channel_plane_numbers,
      37              :     uint8_t num_channels_per_pipeline,
      38              :     uint8_t num_pipelines)
      39              : {
      40           15 :   m_channel_plane_numbers_per_pipeline.clear();
      41           15 :   m_channel_plane_numbers_per_pipeline.resize(num_pipelines);
      42              : 
      43              :   // Cut in order: each pipeline has num_channels_per_pipeline channels
      44           37 :   for (uint8_t p = 0; p < num_pipelines; ++p) {
      45           22 :     auto begin = channel_plane_numbers.begin() + p * num_channels_per_pipeline;
      46           22 :     auto end   = begin + num_channels_per_pipeline;
      47           22 :     m_channel_plane_numbers_per_pipeline[p] =
      48           22 :       std::vector<std::pair<trgdataformats::channel_t,int16_t>>(begin, end);
      49              :   }
      50           15 :   m_num_channels_per_pipeline = num_channels_per_pipeline;
      51           15 :   m_num_pipelines = num_pipelines;
      52           15 :   m_expected_total_channels = static_cast<size_t>(m_num_channels_per_pipeline) * m_num_pipelines;
      53              : 
      54           15 :   rebuild_prealloc_caches_(); // now pipelines number is known, can accurately calculate the expected number of metric items per pipeline
      55           15 : }
      56              : 
      57           30 : void TPGInternalStateHarvester::rebuild_prealloc_caches_()
      58              : {
      59              :   // pre-clean
      60           30 :   m_metric_items_per_proc.clear();
      61           30 :   m_metric_items_per_proc.reserve(m_processor_references.size());
      62              : 
      63              :   // first clear/reset the expected number of metric items per pipeline
      64           30 :   m_expected_items_per_pipeline.clear();
      65           30 :   m_expected_items_per_pipeline.resize(m_num_pipelines, 0);
      66              : 
      67              :   // cache the metric names for each processor and accumulate to the corresponding pipeline
      68           68 :   for (const auto& [proc, pipeline_id] : m_processor_references) {
      69           38 :     if (proc) {
      70              :       // Use the processor's interface method which delegates to the registry
      71           38 :       auto names = proc->get_requested_internal_state_names();
      72           38 :       if (static_cast<size_t>(pipeline_id) < m_expected_items_per_pipeline.size()) {
      73           37 :         m_expected_items_per_pipeline[pipeline_id] += names.size();
      74              :       }
      75           38 :       m_metric_items_per_proc.emplace_back(std::move(names));
      76           38 :     } else {
      77            0 :       m_metric_items_per_proc.emplace_back(); // empty
      78              :     }
      79              :   }
      80           30 : }
      81              : 
      82              : 
      83              : std::unordered_map<trgdataformats::channel_t,
      84              :                    std::vector<std::pair<std::string,int16_t>>>
      85          136 : TPGInternalStateHarvester::harvest_once()
      86              : {
      87          136 :   std::unordered_map<trgdataformats::channel_t,
      88          136 :                      std::vector<std::pair<std::string,int16_t>>> out;
      89              : 
      90              :   // 1. pre-estimate the capacity of the buckets (usually 64)
      91          136 :   if (m_expected_total_channels > 0) {
      92          136 :     out.reserve(m_expected_total_channels);
      93              :   } else {
      94            0 :     out.reserve(static_cast<size_t>(m_num_channels_per_pipeline) * m_num_pipelines);
      95              :   }
      96              : 
      97              :   // defensive: if the cache is not complete for the current pipelines, rebuild it
      98          136 :   if (m_expected_items_per_pipeline.size() != m_num_pipelines) {
      99            0 :     rebuild_prealloc_caches_();
     100              :   }
     101              :   
     102          573 :   for (size_t i = 0; i < m_processor_references.size(); ++i) {
     103          437 :     const auto& [proc, pipeline_id] = m_processor_references[i];
     104          437 :     if (!proc) {
     105            1 :       continue;
     106              :     }
     107              : 
     108              :     // 1) get the cached metric names; if empty, fall back to pulling directly from processor
     109          437 :     std::vector<std::string> metric_names_cached;
     110          437 :     if (m_metric_items_per_proc.size() > i) {
     111          437 :       metric_names_cached = m_metric_items_per_proc[i];
     112              :     } else {
     113              :       // Fallback: use processor's interface method
     114            0 :       metric_names_cached = proc->get_requested_internal_state_names();
     115              :     }
     116              : 
     117              :     // 2) get the current snapshot
     118          437 :     const auto arr = proc->read_internal_states_as_integer_array();
     119              : 
     120              :     // basic consistency: the number of items should be the same as the number of snapshot items
     121          437 :     if (metric_names_cached.size() != arr.m_size) {
     122            1 :       TLOG_DEBUG(TLVL_BOOKKEEPING) << "Processor " << i << " size mismatch: metric_names=" 
     123            1 :                                    << metric_names_cached.size() << " vs array=" << arr.m_size;
     124            1 :       continue;
     125            1 :     }
     126              : 
     127              :     // the current pipeline's lane -> (channel, plane)
     128          436 :     assert(static_cast<size_t>(pipeline_id) < m_channel_plane_numbers_per_pipeline.size());
     129          436 :     const auto& chan_plane_vec = m_channel_plane_numbers_per_pipeline[pipeline_id];
     130              : 
     131              :     // 3) first reserve the capacity for all channels in the current pipeline in out
     132              :     //    so that subsequent emplace_back will not trigger allocation
     133          436 :     std::vector<decltype(out.begin())> iters_for_lanes;
     134          436 :     iters_for_lanes.resize(chan_plane_vec.size());
     135              : 
     136          436 :     const size_t expected_items_here =
     137          436 :       (pipeline_id < m_expected_items_per_pipeline.size())
     138          436 :       ? m_expected_items_per_pipeline[pipeline_id]
     139            0 :       : metric_names_cached.size(); // fallback, use the number of items of the current processor
     140              : 
     141         7412 :     for (size_t lane = 0; lane < chan_plane_vec.size(); ++lane) {
     142         6976 :       const auto ch = chan_plane_vec[lane].first; // offline channel id
     143         6976 :       auto [it, inserted] = out.try_emplace(ch, std::vector<std::pair<std::string,int16_t>>{});
     144         6976 :       if (inserted) {
     145              :         // only reserve the capacity for the first time the channel is encountered
     146         6976 :         it->second.reserve(expected_items_here);
     147              :       }
     148         6976 :       iters_for_lanes[lane] = it;
     149              :     }
     150              : 
     151              :     // 4) map the 16-lane array of each metric to the corresponding channel
     152         1678 :     for (size_t item = 0; item < arr.m_size; ++item) {
     153         1242 :       const auto& lanes = arr.m_data[item]; // std::array<int16_t, 16>
     154         1242 :       const auto& name  = metric_names_cached[item];
     155         1242 :       const size_t L    = lanes.size();     // e.g. 16
     156              : 
     157         1242 :       const size_t up_to = std::min(L, chan_plane_vec.size());
     158        21114 :       for (size_t lane = 0; lane < up_to; ++lane) {
     159              :         // directly use the iterator, avoid the repeated lookup of the map
     160        19872 :         iters_for_lanes[lane]->second.emplace_back(name, lanes[lane]);
     161              :       }
     162              :     }
     163          437 :   }
     164              : 
     165          136 :   return out;
     166            0 : }
     167              : 
     168              : // --- Multi-threaded implementation ---
     169              : 
     170            4 : void TPGInternalStateHarvester::start_collection_thread() {
     171            4 :   std::lock_guard<std::mutex> lock(m_config_mutex);
     172              :   
     173            4 :   if (m_thread_running.load()) {
     174            0 :     return; // Already running
     175              :   }
     176              :   
     177            4 :   TLOG_DEBUG(TLVL_BOOKKEEPING) << "Starting internal state collection thread";
     178              :   
     179              :   // Initialize result container
     180            4 :   m_latest_results.clear();
     181              :   
     182              :   // Reset thread control flags
     183            4 :   m_thread_should_stop.store(false);
     184            4 :   m_harvest_requested.store(false);
     185              :   
     186              :   // Start the collection thread
     187            4 :   m_collection_thread = std::thread(&TPGInternalStateHarvester::collection_thread_worker_, this);
     188            4 :   m_thread_running.store(true);
     189            4 : }
     190              : 
     191           23 : void TPGInternalStateHarvester::stop_collection_thread() {
     192           23 :   {
     193           23 :     std::lock_guard<std::mutex> config_lock(m_config_mutex);
     194              :     
     195           23 :     if (!m_thread_running.load()) {
     196           19 :       return; // Already stopped
     197              :     }
     198              :     
     199            4 :     TLOG_DEBUG(TLVL_BOOKKEEPING) << "Stopping internal state collection thread";
     200              :     
     201              :     // Signal thread to stop
     202            4 :     m_thread_should_stop.store(true);
     203           23 :   }
     204              :   
     205              :   // Notify the collection thread using the correct mutex
     206            4 :   {
     207            4 :     std::lock_guard<std::mutex> collection_lock(m_collection_mutex);
     208            4 :     m_collection_cv.notify_all();
     209            4 :   }
     210              :   
     211              :   // Wait for thread to finish
     212            4 :   if (m_collection_thread.joinable()) {
     213            4 :     m_collection_thread.join();
     214              :   }
     215              :   
     216            4 :   m_thread_running.store(false);
     217              :   
     218              :   // Clear results
     219            4 :   {
     220            4 :     std::lock_guard<std::mutex> lock(m_results_mutex);
     221            4 :     m_latest_results.clear();
     222            4 :   }
     223              : }
     224              : 
     225           17 : void TPGInternalStateHarvester::trigger_harvest() {
     226           17 :   if (!m_thread_running.load()) {
     227              :     return; // Thread not running
     228              :   }
     229              :   
     230           17 :   m_harvest_requested.store(true);
     231              :   
     232              :   // Notify the collection thread using the correct mutex
     233           17 :   {
     234           17 :     std::lock_guard<std::mutex> lock(m_collection_mutex);
     235           17 :     m_collection_cv.notify_all();
     236           17 :   }
     237              : }
     238              : 
     239              : std::unordered_map<trgdataformats::channel_t,
     240              :                    std::vector<std::pair<std::string,int16_t>>>
     241           18 : TPGInternalStateHarvester::get_latest_results() const {
     242           18 :   std::lock_guard<std::mutex> lock(m_results_mutex);
     243           36 :   return m_latest_results; // Return copy under lock (blocking read is acceptable)
     244           18 : }
     245              : 
     246            3 : bool TPGInternalStateHarvester::is_collection_thread_running() const {
     247            3 :   return m_thread_running.load();
     248              : }
     249              : 
     250            4 : void TPGInternalStateHarvester::collection_thread_worker_() {
     251           21 :   while (!m_thread_should_stop.load()) {
     252           21 :     std::unique_lock<std::mutex> lock(m_collection_mutex);
     253              :     
     254              :     // Wait for harvest request or stop signal
     255           21 :     m_collection_cv.wait(lock, [this] {
     256           42 :       return m_harvest_requested.load() || m_thread_should_stop.load();
     257              :     });
     258              :     
     259           21 :     if (m_thread_should_stop.load()) {
     260              :       break;
     261              :     }
     262              :     
     263              :     // Reset the harvest request flag
     264           17 :     m_harvest_requested.store(false);
     265              :     
     266              :     // Release lock during collection to allow concurrent reads
     267           17 :     lock.unlock();
     268              :     
     269              :     // Perform the actual harvest (expensive operation in background)
     270           17 :     auto new_results = harvest_once();
     271              :     
     272              :     // Update results with simple mutex protection
     273           17 :     {
     274           17 :       std::lock_guard<std::mutex> results_lock(m_results_mutex);
     275           17 :       m_latest_results = std::move(new_results);
     276           17 :     }
     277           21 :   }
     278            4 : }
     279              : 
     280              : } // namespace fdreadoutlibs
     281              : } // namespace dunedaq
        

Generated by: LCOV version 2.0-1