DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::fdreadoutlibs::TPGInternalStateHarvester Class Reference

#include <TPGInternalStateHarvester.hpp>

Public Types

using ProcRef = std::pair<std::shared_ptr<tpglibs::AbstractProcessor<__m256i>>, int >
 

Public Member Functions

 ~TPGInternalStateHarvester ()
 
void set_processor_references (std::vector< ProcRef > refs)
 
const std::vector< ProcRef > & get_processor_references () const
 
void update_channel_plane_numbers (const std::vector< std::pair< trgdataformats::channel_t, int16_t > > &channel_plane_numbers, uint8_t num_channels_per_pipeline, uint8_t num_pipelines)
 Cuts a full list of (channel, plane) into per-pipeline lists of 16 lanes each.
 
std::unordered_map< trgdataformats::channel_t, std::vector< std::pair< std::string, int16_t > > > harvest_once ()
 Harvest once, outputs channel -> [(metric_name, value)...].
 
void start_collection_thread ()
 Start the background collection thread Must be called before using async collection features.
 
void stop_collection_thread ()
 Stop the background collection thread Blocks until thread is fully stopped.
 
void trigger_harvest ()
 Signal the collection thread to perform one harvest cycle Non-blocking - returns immediately.
 
std::unordered_map< trgdataformats::channel_t, std::vector< std::pair< std::string, int16_t > > > get_latest_results () const
 Get the latest collected results (thread-safe, non-blocking) Returns a copy of the most recent harvest results.
 
bool is_collection_thread_running () const
 Check if collection thread is running.
 

Private Types

using ResultContainer
 

Private Member Functions

void collection_thread_worker_ ()
 
void rebuild_prealloc_caches_ ()
 

Private Attributes

std::vector< ProcRefm_processor_references
 
std::vector< std::vector< std::pair< trgdataformats::channel_t, int16_t > > > m_channel_plane_numbers_per_pipeline
 
uint8_t m_num_channels_per_pipeline
 
uint8_t m_num_pipelines
 
std::vector< std::vector< std::string > > m_metric_items_per_proc
 
std::vector< size_t > m_expected_items_per_pipeline
 
size_t m_expected_total_channels = 0
 
std::mutex m_results_mutex
 
ResultContainer m_latest_results
 
std::thread m_collection_thread
 
std::atomic< boolm_thread_should_stop {false}
 
std::atomic< boolm_thread_running {false}
 
std::atomic< boolm_harvest_requested {false}
 
std::mutex m_config_mutex
 
std::mutex m_collection_mutex
 
std::condition_variable m_collection_cv
 

Detailed Description

Definition at line 29 of file TPGInternalStateHarvester.hpp.

Member Typedef Documentation

◆ ProcRef

Definition at line 31 of file TPGInternalStateHarvester.hpp.

◆ ResultContainer

Initial value:
std::unordered_map<trgdataformats::channel_t,
std::vector<std::pair<std::string,int16_t>>>

Definition at line 109 of file TPGInternalStateHarvester.hpp.

Constructor & Destructor Documentation

◆ ~TPGInternalStateHarvester()

dunedaq::fdreadoutlibs::TPGInternalStateHarvester::~TPGInternalStateHarvester ( )

Definition at line 20 of file TPGInternalStateHarvester.cpp.

20 {
21 // Ensure thread is properly stopped
23}
void stop_collection_thread()
Stop the background collection thread Blocks until thread is fully stopped.

Member Function Documentation

◆ collection_thread_worker_()

void dunedaq::fdreadoutlibs::TPGInternalStateHarvester::collection_thread_worker_ ( )
private

Definition at line 250 of file TPGInternalStateHarvester.cpp.

250 {
251 while (!m_thread_should_stop.load()) {
252 std::unique_lock<std::mutex> lock(m_collection_mutex);
253
254 // Wait for harvest request or stop signal
255 m_collection_cv.wait(lock, [this] {
256 return m_harvest_requested.load() || m_thread_should_stop.load();
257 });
258
259 if (m_thread_should_stop.load()) {
260 break;
261 }
262
263 // Reset the harvest request flag
264 m_harvest_requested.store(false);
265
266 // Release lock during collection to allow concurrent reads
267 lock.unlock();
268
269 // Perform the actual harvest (expensive operation in background)
270 auto new_results = harvest_once();
271
272 // Update results with simple mutex protection
273 {
274 std::lock_guard<std::mutex> results_lock(m_results_mutex);
275 m_latest_results = std::move(new_results);
276 }
277 }
278}
std::unordered_map< trgdataformats::channel_t, std::vector< std::pair< std::string, int16_t > > > harvest_once()
Harvest once, outputs channel -> [(metric_name, value)...].

◆ get_latest_results()

std::unordered_map< trgdataformats::channel_t, std::vector< std::pair< std::string, int16_t > > > dunedaq::fdreadoutlibs::TPGInternalStateHarvester::get_latest_results ( ) const

Get the latest collected results (thread-safe, non-blocking) Returns a copy of the most recent harvest results.

Returns
std::unordered_map<trgdataformats::channel_t, std::vector<std::pair<std::string,int16_t>>>

Definition at line 241 of file TPGInternalStateHarvester.cpp.

241 {
242 std::lock_guard<std::mutex> lock(m_results_mutex);
243 return m_latest_results; // Return copy under lock (blocking read is acceptable)
244}

◆ get_processor_references()

const std::vector< TPGInternalStateHarvester::ProcRef > & dunedaq::fdreadoutlibs::TPGInternalStateHarvester::get_processor_references ( ) const

Definition at line 31 of file TPGInternalStateHarvester.cpp.

◆ harvest_once()

std::unordered_map< trgdataformats::channel_t, std::vector< std::pair< std::string, int16_t > > > dunedaq::fdreadoutlibs::TPGInternalStateHarvester::harvest_once ( )

Harvest once, outputs channel -> [(metric_name, value)...].

Returns
std::unordered_map<trgdataformats::channel_t, std::vector<std::pair<std::string,int16_t>>>

Definition at line 85 of file TPGInternalStateHarvester.cpp.

86{
87 std::unordered_map<trgdataformats::channel_t,
88 std::vector<std::pair<std::string,int16_t>>> out;
89
90 // 1. pre-estimate the capacity of the buckets (usually 64)
93 } else {
94 out.reserve(static_cast<size_t>(m_num_channels_per_pipeline) * m_num_pipelines);
95 }
96
97 // defensive: if the cache is not complete for the current pipelines, rebuild it
100 }
101
102 for (size_t i = 0; i < m_processor_references.size(); ++i) {
103 const auto& [proc, pipeline_id] = m_processor_references[i];
104 if (!proc) {
105 continue;
106 }
107
108 // 1) get the cached metric names; if empty, fall back to pulling directly from processor
109 std::vector<std::string> metric_names_cached;
110 if (m_metric_items_per_proc.size() > i) {
111 metric_names_cached = m_metric_items_per_proc[i];
112 } else {
113 // Fallback: use processor's interface method
114 metric_names_cached = proc->get_requested_internal_state_names();
115 }
116
117 // 2) get the current snapshot
118 const auto arr = proc->read_internal_states_as_integer_array();
119
120 // basic consistency: the number of items should be the same as the number of snapshot items
121 if (metric_names_cached.size() != arr.m_size) {
122 TLOG_DEBUG(TLVL_BOOKKEEPING) << "Processor " << i << " size mismatch: metric_names="
123 << metric_names_cached.size() << " vs array=" << arr.m_size;
124 continue;
125 }
126
127 // the current pipeline's lane -> (channel, plane)
128 assert(static_cast<size_t>(pipeline_id) < m_channel_plane_numbers_per_pipeline.size());
129 const auto& chan_plane_vec = m_channel_plane_numbers_per_pipeline[pipeline_id];
130
131 // 3) first reserve the capacity for all channels in the current pipeline in out
132 // so that subsequent emplace_back will not trigger allocation
133 std::vector<decltype(out.begin())> iters_for_lanes;
134 iters_for_lanes.resize(chan_plane_vec.size());
135
136 const size_t expected_items_here =
137 (pipeline_id < m_expected_items_per_pipeline.size())
138 ? m_expected_items_per_pipeline[pipeline_id]
139 : metric_names_cached.size(); // fallback, use the number of items of the current processor
140
141 for (size_t lane = 0; lane < chan_plane_vec.size(); ++lane) {
142 const auto ch = chan_plane_vec[lane].first; // offline channel id
143 auto [it, inserted] = out.try_emplace(ch, std::vector<std::pair<std::string,int16_t>>{});
144 if (inserted) {
145 // only reserve the capacity for the first time the channel is encountered
146 it->second.reserve(expected_items_here);
147 }
148 iters_for_lanes[lane] = it;
149 }
150
151 // 4) map the 16-lane array of each metric to the corresponding channel
152 for (size_t item = 0; item < arr.m_size; ++item) {
153 const auto& lanes = arr.m_data[item]; // std::array<int16_t, 16>
154 const auto& name = metric_names_cached[item];
155 const size_t L = lanes.size(); // e.g. 16
156
157 const size_t up_to = std::min(L, chan_plane_vec.size());
158 for (size_t lane = 0; lane < up_to; ++lane) {
159 // directly use the iterator, avoid the repeated lookup of the map
160 iters_for_lanes[lane]->second.emplace_back(name, lanes[lane]);
161 }
162 }
163 }
164
165 return out;
166}
std::vector< std::vector< std::pair< trgdataformats::channel_t, int16_t > > > m_channel_plane_numbers_per_pipeline
std::vector< std::vector< std::string > > m_metric_items_per_proc
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
FELIX Initialization std::string initerror FELIX queue timed out

◆ is_collection_thread_running()

bool dunedaq::fdreadoutlibs::TPGInternalStateHarvester::is_collection_thread_running ( ) const

Check if collection thread is running.

Returns
true if thread is active, false otherwise

Definition at line 246 of file TPGInternalStateHarvester.cpp.

246 {
247 return m_thread_running.load();
248}

◆ rebuild_prealloc_caches_()

void dunedaq::fdreadoutlibs::TPGInternalStateHarvester::rebuild_prealloc_caches_ ( )
private

Definition at line 57 of file TPGInternalStateHarvester.cpp.

58{
59 // pre-clean
62
63 // first clear/reset the expected number of metric items per pipeline
66
67 // cache the metric names for each processor and accumulate to the corresponding pipeline
68 for (const auto& [proc, pipeline_id] : m_processor_references) {
69 if (proc) {
70 // Use the processor's interface method which delegates to the registry
71 auto names = proc->get_requested_internal_state_names();
72 if (static_cast<size_t>(pipeline_id) < m_expected_items_per_pipeline.size()) {
73 m_expected_items_per_pipeline[pipeline_id] += names.size();
74 }
75 m_metric_items_per_proc.emplace_back(std::move(names));
76 } else {
77 m_metric_items_per_proc.emplace_back(); // empty
78 }
79 }
80}

◆ set_processor_references()

void dunedaq::fdreadoutlibs::TPGInternalStateHarvester::set_processor_references ( std::vector< ProcRef > refs)

Definition at line 25 of file TPGInternalStateHarvester.cpp.

25 {
26 m_processor_references = std::move(refs);
27 rebuild_prealloc_caches_(); // if num_pipelines is not set, this will gracefully get empty per-pipeline statistics, which will be filled later by update
28}

◆ start_collection_thread()

void dunedaq::fdreadoutlibs::TPGInternalStateHarvester::start_collection_thread ( )

Start the background collection thread Must be called before using async collection features.

Definition at line 170 of file TPGInternalStateHarvester.cpp.

170 {
171 std::lock_guard<std::mutex> lock(m_config_mutex);
172
173 if (m_thread_running.load()) {
174 return; // Already running
175 }
176
177 TLOG_DEBUG(TLVL_BOOKKEEPING) << "Starting internal state collection thread";
178
179 // Initialize result container
180 m_latest_results.clear();
181
182 // Reset thread control flags
183 m_thread_should_stop.store(false);
184 m_harvest_requested.store(false);
185
186 // Start the collection thread
188 m_thread_running.store(true);
189}

◆ stop_collection_thread()

void dunedaq::fdreadoutlibs::TPGInternalStateHarvester::stop_collection_thread ( )

Stop the background collection thread Blocks until thread is fully stopped.

Definition at line 191 of file TPGInternalStateHarvester.cpp.

191 {
192 {
193 std::lock_guard<std::mutex> config_lock(m_config_mutex);
194
195 if (!m_thread_running.load()) {
196 return; // Already stopped
197 }
198
199 TLOG_DEBUG(TLVL_BOOKKEEPING) << "Stopping internal state collection thread";
200
201 // Signal thread to stop
202 m_thread_should_stop.store(true);
203 }
204
205 // Notify the collection thread using the correct mutex
206 {
207 std::lock_guard<std::mutex> collection_lock(m_collection_mutex);
208 m_collection_cv.notify_all();
209 }
210
211 // Wait for thread to finish
212 if (m_collection_thread.joinable()) {
213 m_collection_thread.join();
214 }
215
216 m_thread_running.store(false);
217
218 // Clear results
219 {
220 std::lock_guard<std::mutex> lock(m_results_mutex);
221 m_latest_results.clear();
222 }
223}

◆ trigger_harvest()

void dunedaq::fdreadoutlibs::TPGInternalStateHarvester::trigger_harvest ( )

Signal the collection thread to perform one harvest cycle Non-blocking - returns immediately.

Definition at line 225 of file TPGInternalStateHarvester.cpp.

225 {
226 if (!m_thread_running.load()) {
227 return; // Thread not running
228 }
229
230 m_harvest_requested.store(true);
231
232 // Notify the collection thread using the correct mutex
233 {
234 std::lock_guard<std::mutex> lock(m_collection_mutex);
235 m_collection_cv.notify_all();
236 }
237}

◆ update_channel_plane_numbers()

void dunedaq::fdreadoutlibs::TPGInternalStateHarvester::update_channel_plane_numbers ( const std::vector< std::pair< trgdataformats::channel_t, int16_t > > & channel_plane_numbers,
uint8_t num_channels_per_pipeline,
uint8_t num_pipelines )

Cuts a full list of (channel, plane) into per-pipeline lists of 16 lanes each.

Parameters
channel_plane_numbers
num_channels_per_pipeline
num_pipelines

Definition at line 35 of file TPGInternalStateHarvester.cpp.

39{
41 m_channel_plane_numbers_per_pipeline.resize(num_pipelines);
42
43 // Cut in order: each pipeline has num_channels_per_pipeline channels
44 for (uint8_t p = 0; p < num_pipelines; ++p) {
45 auto begin = channel_plane_numbers.begin() + p * num_channels_per_pipeline;
46 auto end = begin + num_channels_per_pipeline;
48 std::vector<std::pair<trgdataformats::channel_t,int16_t>>(begin, end);
49 }
50 m_num_channels_per_pipeline = num_channels_per_pipeline;
51 m_num_pipelines = num_pipelines;
53
54 rebuild_prealloc_caches_(); // now pipelines number is known, can accurately calculate the expected number of metric items per pipeline
55}

Member Data Documentation

◆ m_channel_plane_numbers_per_pipeline

std::vector<std::vector<std::pair<trgdataformats::channel_t,int16_t> > > dunedaq::fdreadoutlibs::TPGInternalStateHarvester::m_channel_plane_numbers_per_pipeline
private

Definition at line 99 of file TPGInternalStateHarvester.hpp.

◆ m_collection_cv

std::condition_variable dunedaq::fdreadoutlibs::TPGInternalStateHarvester::m_collection_cv
private

Definition at line 125 of file TPGInternalStateHarvester.hpp.

◆ m_collection_mutex

std::mutex dunedaq::fdreadoutlibs::TPGInternalStateHarvester::m_collection_mutex
private

Definition at line 124 of file TPGInternalStateHarvester.hpp.

◆ m_collection_thread

std::thread dunedaq::fdreadoutlibs::TPGInternalStateHarvester::m_collection_thread
private

Definition at line 117 of file TPGInternalStateHarvester.hpp.

◆ m_config_mutex

std::mutex dunedaq::fdreadoutlibs::TPGInternalStateHarvester::m_config_mutex
mutableprivate

Definition at line 123 of file TPGInternalStateHarvester.hpp.

◆ m_expected_items_per_pipeline

std::vector<size_t> dunedaq::fdreadoutlibs::TPGInternalStateHarvester::m_expected_items_per_pipeline
private

Definition at line 105 of file TPGInternalStateHarvester.hpp.

◆ m_expected_total_channels

size_t dunedaq::fdreadoutlibs::TPGInternalStateHarvester::m_expected_total_channels = 0
private

Definition at line 106 of file TPGInternalStateHarvester.hpp.

◆ m_harvest_requested

std::atomic<bool> dunedaq::fdreadoutlibs::TPGInternalStateHarvester::m_harvest_requested {false}
private

Definition at line 120 of file TPGInternalStateHarvester.hpp.

120{false};

◆ m_latest_results

ResultContainer dunedaq::fdreadoutlibs::TPGInternalStateHarvester::m_latest_results
private

Definition at line 114 of file TPGInternalStateHarvester.hpp.

◆ m_metric_items_per_proc

std::vector<std::vector<std::string> > dunedaq::fdreadoutlibs::TPGInternalStateHarvester::m_metric_items_per_proc
private

Definition at line 104 of file TPGInternalStateHarvester.hpp.

◆ m_num_channels_per_pipeline

uint8_t dunedaq::fdreadoutlibs::TPGInternalStateHarvester::m_num_channels_per_pipeline
private

Definition at line 100 of file TPGInternalStateHarvester.hpp.

◆ m_num_pipelines

uint8_t dunedaq::fdreadoutlibs::TPGInternalStateHarvester::m_num_pipelines
private

Definition at line 101 of file TPGInternalStateHarvester.hpp.

◆ m_processor_references

std::vector<ProcRef> dunedaq::fdreadoutlibs::TPGInternalStateHarvester::m_processor_references
private

Definition at line 97 of file TPGInternalStateHarvester.hpp.

◆ m_results_mutex

std::mutex dunedaq::fdreadoutlibs::TPGInternalStateHarvester::m_results_mutex
mutableprivate

Definition at line 113 of file TPGInternalStateHarvester.hpp.

◆ m_thread_running

std::atomic<bool> dunedaq::fdreadoutlibs::TPGInternalStateHarvester::m_thread_running {false}
private

Definition at line 119 of file TPGInternalStateHarvester.hpp.

119{false};

◆ m_thread_should_stop

std::atomic<bool> dunedaq::fdreadoutlibs::TPGInternalStateHarvester::m_thread_should_stop {false}
private

Definition at line 118 of file TPGInternalStateHarvester.hpp.

118{false};

The documentation for this class was generated from the following files: