18namespace fdreadoutlibs {
30const std::vector<TPGInternalStateHarvester::ProcRef>&
36 const std::vector<std::pair<trgdataformats::channel_t,int16_t>>& channel_plane_numbers,
37 uint8_t num_channels_per_pipeline,
38 uint8_t num_pipelines)
44 for (uint8_t p = 0; p < num_pipelines; ++p) {
45 auto begin = channel_plane_numbers.begin() + p * num_channels_per_pipeline;
46 auto end = begin + num_channels_per_pipeline;
48 std::vector<std::pair<trgdataformats::channel_t,int16_t>>(begin, end);
71 auto names = proc->get_requested_internal_state_names();
84 std::vector<std::pair<std::string,int16_t>>>
88 std::vector<std::pair<std::string,int16_t>>>
out;
109 std::vector<std::string> metric_names_cached;
114 metric_names_cached = proc->get_requested_internal_state_names();
118 const auto arr = proc->read_internal_states_as_integer_array();
121 if (metric_names_cached.size() != arr.m_size) {
122 TLOG_DEBUG(TLVL_BOOKKEEPING) <<
"Processor " << i <<
" size mismatch: metric_names="
123 << metric_names_cached.size() <<
" vs array=" << arr.m_size;
133 std::vector<
decltype(
out.begin())> iters_for_lanes;
134 iters_for_lanes.resize(chan_plane_vec.size());
136 const size_t expected_items_here =
139 : metric_names_cached.size();
141 for (
size_t lane = 0; lane < chan_plane_vec.size(); ++lane) {
142 const auto ch = chan_plane_vec[lane].first;
143 auto [it, inserted] =
out.try_emplace(ch, std::vector<std::pair<std::string,int16_t>>{});
146 it->second.reserve(expected_items_here);
148 iters_for_lanes[lane] = it;
152 for (
size_t item = 0; item < arr.m_size; ++item) {
153 const auto& lanes = arr.m_data[item];
154 const auto& name = metric_names_cached[item];
155 const size_t L = lanes.size();
157 const size_t up_to = std::min(L, chan_plane_vec.size());
158 for (
size_t lane = 0; lane < up_to; ++lane) {
160 iters_for_lanes[lane]->second.emplace_back(name, lanes[lane]);
177 TLOG_DEBUG(TLVL_BOOKKEEPING) <<
"Starting internal state collection thread";
199 TLOG_DEBUG(TLVL_BOOKKEEPING) <<
"Stopping internal state collection thread";
240 std::vector<std::pair<std::string,int16_t>>>
size_t m_expected_total_channels
std::condition_variable m_collection_cv
void stop_collection_thread()
Stop the background collection thread Blocks until thread is fully stopped.
bool is_collection_thread_running() const
Check if collection thread is running.
void start_collection_thread()
Start the background collection thread Must be called before using async collection features.
std::vector< std::vector< std::pair< trgdataformats::channel_t, int16_t > > > m_channel_plane_numbers_per_pipeline
std::atomic< bool > m_thread_running
uint8_t m_num_channels_per_pipeline
void trigger_harvest()
Signal the collection thread to perform one harvest cycle Non-blocking - returns immediately.
std::atomic< bool > m_harvest_requested
std::mutex m_config_mutex
std::unordered_map< trgdataformats::channel_t, std::vector< std::pair< std::string, int16_t > > > harvest_once()
Harvest once, outputs channel -> [(metric_name, value)...].
std::unordered_map< trgdataformats::channel_t, std::vector< std::pair< std::string, int16_t > > > get_latest_results() const
Get the latest collected results (thread-safe, non-blocking) Returns a copy of the most recent harves...
void collection_thread_worker_()
std::vector< size_t > m_expected_items_per_pipeline
std::vector< ProcRef > m_processor_references
std::atomic< bool > m_thread_should_stop
void set_processor_references(std::vector< ProcRef > refs)
ResultContainer m_latest_results
~TPGInternalStateHarvester()
std::thread m_collection_thread
std::mutex m_results_mutex
const std::vector< ProcRef > & get_processor_references() const
std::mutex m_collection_mutex
std::vector< std::vector< std::string > > m_metric_items_per_proc
void update_channel_plane_numbers(const std::vector< std::pair< trgdataformats::channel_t, int16_t > > &channel_plane_numbers, uint8_t num_channels_per_pipeline, uint8_t num_pipelines)
Cuts a full list of (channel, plane) into per-pipeline lists of 16 lanes each.
void rebuild_prealloc_caches_()
#define TLOG_DEBUG(lvl,...)
FELIX Initialization std::string initerror FELIX queue timed out