DUNE-DAQ
DUNE Trigger and Data Acquisition software
TPGInternalStateHarvester.cpp
#include "fdreadoutlibs/TPGInternalStateHarvester.hpp" // own header; exact include path assumed

#include "logging/Logging.hpp"
#include <cassert>
#include <algorithm>
#include <iostream>

namespace dunedaq {
namespace fdreadoutlibs {

TPGInternalStateHarvester::~TPGInternalStateHarvester()
{
  // Ensure the collection thread is properly stopped
  stop_collection_thread();
}

void
TPGInternalStateHarvester::set_processor_references(std::vector<ProcRef> refs) // NB: setter name not preserved in the extracted listing; reconstructed
{
  m_processor_references = std::move(refs);
  // If the number of pipelines is not set yet, this gracefully yields empty
  // per-pipeline statistics, which are filled in later by update_channel_plane_numbers()
  rebuild_prealloc_caches_();
}

const std::vector<TPGInternalStateHarvester::ProcRef>&
TPGInternalStateHarvester::get_processor_references() const
{
  return m_processor_references;
}

void
TPGInternalStateHarvester::update_channel_plane_numbers(
  const std::vector<std::pair<trgdataformats::channel_t, int16_t>>& channel_plane_numbers,
  uint8_t num_channels_per_pipeline,
  uint8_t num_pipelines)
{
  m_channel_plane_numbers_per_pipeline.resize(num_pipelines);

  // Cut in order: each pipeline takes the next num_channels_per_pipeline channels
  for (uint8_t p = 0; p < num_pipelines; ++p) {
    auto begin = channel_plane_numbers.begin() + p * num_channels_per_pipeline;
    auto end = begin + num_channels_per_pipeline;
    m_channel_plane_numbers_per_pipeline[p] =
      std::vector<std::pair<trgdataformats::channel_t, int16_t>>(begin, end);
  }
  m_num_channels_per_pipeline = num_channels_per_pipeline;
  m_num_pipelines = num_pipelines;

  // Now that the number of pipelines is known, the expected number of metric
  // items per pipeline can be calculated accurately
  rebuild_prealloc_caches_();
}
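
// Illustrative example (not part of the original file): with 64 (channel, plane)
// pairs, num_channels_per_pipeline = 16 and num_pipelines = 4, pipeline p
// receives input entries [16*p, 16*p + 16), i.e. lane l of pipeline p maps to
// channel_plane_numbers[16*p + l].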

void
TPGInternalStateHarvester::rebuild_prealloc_caches_()
{
  // Pre-clean (the exact statements here were not preserved; reconstructed)
  m_metric_items_per_proc.clear();
  m_metric_items_per_proc.reserve(m_processor_references.size());

  // First clear/reset the expected number of metric items per pipeline
  m_expected_items_per_pipeline.assign(m_num_pipelines, 0);

  // Cache the metric names for each processor and accumulate them onto the
  // corresponding pipeline
  for (const auto& [proc, pipeline_id] : m_processor_references) {
    if (proc) {
      // Use the processor's interface method, which delegates to the registry
      auto names = proc->get_requested_internal_state_names();
      if (static_cast<size_t>(pipeline_id) < m_expected_items_per_pipeline.size()) {
        m_expected_items_per_pipeline[pipeline_id] += names.size();
      }
      m_metric_items_per_proc.emplace_back(std::move(names));
    } else {
      m_metric_items_per_proc.emplace_back(); // empty entry keeps indices aligned with m_processor_references
    }
  }
}
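
// Illustrative example (not part of the original file): if pipeline 0 is fed
// by two processors whose requested state names are {"pedestal"} and
// {"pedestal", "accum"} (hypothetical names), then
// m_expected_items_per_pipeline[0] == 3, and harvest_once() reserves room for
// three (name, value) pairs in each of that pipeline's channel vectors.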

std::unordered_map<trgdataformats::channel_t,
                   std::vector<std::pair<std::string, int16_t>>>
TPGInternalStateHarvester::harvest_once()
{
  std::unordered_map<trgdataformats::channel_t,
                     std::vector<std::pair<std::string, int16_t>>> out;

  // 1. Pre-estimate the bucket capacity (usually 64 channels in total)
  if (m_num_pipelines == 0 || m_num_channels_per_pipeline == 0) {
    out.reserve(64); // geometry not configured yet; fall back to the typical size (branch reconstructed)
  } else {
    out.reserve(static_cast<size_t>(m_num_channels_per_pipeline) * m_num_pipelines);
  }

  // Defensive: if the cache is not complete for the current pipelines, rebuild it
  // (exact condition reconstructed from the surrounding comment)
  if (m_metric_items_per_proc.size() != m_processor_references.size() ||
      m_expected_items_per_pipeline.size() != static_cast<size_t>(m_num_pipelines)) {
    rebuild_prealloc_caches_();
  }

  for (size_t i = 0; i < m_processor_references.size(); ++i) {
    const auto& [proc, pipeline_id] = m_processor_references[i];
    if (!proc) {
      continue;
    }

    // 1) Get the cached metric names; if absent, fall back to pulling them directly from the processor
    std::vector<std::string> metric_names_cached;
    if (m_metric_items_per_proc.size() > i) {
      metric_names_cached = m_metric_items_per_proc[i];
    } else {
      // Fallback: use the processor's interface method
      metric_names_cached = proc->get_requested_internal_state_names();
    }

    // 2) Get the current snapshot
    const auto arr = proc->read_internal_states_as_integer_array();

    // Basic consistency: the number of cached names must match the number of snapshot items
    if (metric_names_cached.size() != arr.m_size) {
      TLOG_DEBUG(TLVL_BOOKKEEPING) << "Processor " << i << " size mismatch: metric_names="
                                   << metric_names_cached.size() << " vs array=" << arr.m_size;
      continue;
    }

    // Current pipeline's lane -> (channel, plane) mapping
    assert(static_cast<size_t>(pipeline_id) < m_channel_plane_numbers_per_pipeline.size());
    const auto& chan_plane_vec = m_channel_plane_numbers_per_pipeline[pipeline_id];

    // 3) Reserve the capacity for all channels of the current pipeline in `out`
    //    up front, so that the subsequent emplace_back calls do not allocate
    std::vector<decltype(out.begin())> iters_for_lanes;
    iters_for_lanes.resize(chan_plane_vec.size());

    const size_t expected_items_here =
      (static_cast<size_t>(pipeline_id) < m_expected_items_per_pipeline.size())
        ? m_expected_items_per_pipeline[pipeline_id]
        : metric_names_cached.size(); // fallback: use the current processor's item count

    for (size_t lane = 0; lane < chan_plane_vec.size(); ++lane) {
      const auto ch = chan_plane_vec[lane].first; // offline channel id
      auto [it, inserted] = out.try_emplace(ch, std::vector<std::pair<std::string, int16_t>>{});
      if (inserted) {
        // Reserve only the first time the channel is encountered
        it->second.reserve(expected_items_here);
      }
      iters_for_lanes[lane] = it;
    }

    // 4) Map each metric's 16-lane array onto the corresponding channels
    for (size_t item = 0; item < arr.m_size; ++item) {
      const auto& lanes = arr.m_data[item]; // std::array<int16_t, 16>
      const auto& name = metric_names_cached[item];
      const size_t n_lanes = lanes.size(); // e.g. 16

      const size_t up_to = std::min(n_lanes, chan_plane_vec.size());
      for (size_t lane = 0; lane < up_to; ++lane) {
        // Use the cached iterator directly to avoid repeated map lookups
        iters_for_lanes[lane]->second.emplace_back(name, lanes[lane]);
      }
    }
  }

  return out;
}
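
// Illustrative example (not part of the original file): with one pipeline of
// 16 lanes and cached metric names {"pedestal", "hit_tover"} (hypothetical
// names), harvest_once() yields one entry per offline channel, e.g.
//   out[ch] == { {"pedestal", 512}, {"hit_tover", 3} }
// where each value is taken from that channel's lane in the metric's
// 16-lane snapshot array.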

// --- Multi-threaded implementation ---

void TPGInternalStateHarvester::start_collection_thread() {
  std::lock_guard<std::mutex> lock(m_config_mutex);

  if (m_thread_running.load()) {
    return; // Already running
  }

  TLOG_DEBUG(TLVL_BOOKKEEPING) << "Starting internal state collection thread";

  // Initialize result container
  m_latest_results.clear();

  // Reset thread control flags
  m_thread_should_stop.store(false);
  m_harvest_requested.store(false);

  // Start the collection thread (loop method name reconstructed; see collection_thread_loop below)
  m_collection_thread = std::thread(&TPGInternalStateHarvester::collection_thread_loop, this);
  m_thread_running.store(true);
}

void TPGInternalStateHarvester::stop_collection_thread() {
  {
    std::lock_guard<std::mutex> config_lock(m_config_mutex);

    if (!m_thread_running.load()) {
      return; // Already stopped
    }

    TLOG_DEBUG(TLVL_BOOKKEEPING) << "Stopping internal state collection thread";

    // Signal the thread to stop
    m_thread_should_stop.store(true);
  }

  // Notify the collection thread using the correct mutex
  {
    std::lock_guard<std::mutex> collection_lock(m_collection_mutex);
    m_collection_cv.notify_all();
  }

  // Wait for the thread to finish
  if (m_collection_thread.joinable()) {
    m_collection_thread.join();
  }

  m_thread_running.store(false);

  // Clear results
  {
    std::lock_guard<std::mutex> lock(m_results_mutex);
    m_latest_results.clear();
  }
}

void TPGInternalStateHarvester::trigger_harvest() {
  if (!m_thread_running.load()) {
    return; // Thread not running
  }

  m_harvest_requested.store(true);

  // Notify the collection thread using the correct mutex
  {
    std::lock_guard<std::mutex> lock(m_collection_mutex);
    m_collection_cv.notify_all();
  }
}
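
// Note (explanatory, not in the original file): locking m_collection_mutex
// around notify_all() here and in stop_collection_thread() closes the race in
// which collection_thread_loop() evaluates its wait predicate, finds it false,
// and would otherwise miss a notification delivered between the predicate
// check and the actual wait on m_collection_cv.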

std::unordered_map<trgdataformats::channel_t,
                   std::vector<std::pair<std::string, int16_t>>>
TPGInternalStateHarvester::get_latest_results() const {
  std::lock_guard<std::mutex> lock(m_results_mutex);
  return m_latest_results; // Return a copy under the lock (a blocking read is acceptable here)
}

bool TPGInternalStateHarvester::is_collection_thread_running() const {
  return m_thread_running.load();
}

void TPGInternalStateHarvester::collection_thread_loop() { // thread body; method name reconstructed from context
  while (!m_thread_should_stop.load()) {
    std::unique_lock<std::mutex> lock(m_collection_mutex);

    // Wait for a harvest request or a stop signal
    m_collection_cv.wait(lock, [this] {
      return m_harvest_requested.load() || m_thread_should_stop.load();
    });

    if (m_thread_should_stop.load()) {
      break;
    }

    // Reset the harvest request flag
    m_harvest_requested.store(false);

    // Release the lock during collection to allow concurrent reads
    lock.unlock();

    // Perform the actual harvest (the expensive operation runs in the background)
    auto new_results = harvest_once();

    // Publish the results under the results mutex
    {
      std::lock_guard<std::mutex> results_lock(m_results_mutex);
      m_latest_results = std::move(new_results);
    }
  }
}

} // namespace fdreadoutlibs
} // namespace dunedaq
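
A minimal usage sketch (not from the original file) tying the pieces together. It assumes a harvester instance, a populated vector of ProcRef entries, and the reconstructed set_processor_references() name; the pipeline geometry, wait duration, and printed output are purely illustrative.

#include <chrono>
#include <iostream>
#include <thread>
#include <utility>
#include <vector>

void example_harvest_cycle(
  dunedaq::fdreadoutlibs::TPGInternalStateHarvester& harvester,
  std::vector<dunedaq::fdreadoutlibs::TPGInternalStateHarvester::ProcRef> refs,
  const std::vector<std::pair<dunedaq::trgdataformats::channel_t, int16_t>>& chan_planes)
{
  // Configure: processors first, then the channel/pipeline geometry
  harvester.set_processor_references(std::move(refs)); // setter name assumed, as above
  harvester.update_channel_plane_numbers(chan_planes, 16, 4); // 4 pipelines x 16 lanes

  // Run one asynchronous harvest cycle
  harvester.start_collection_thread();
  harvester.trigger_harvest(); // non-blocking request
  std::this_thread::sleep_for(std::chrono::milliseconds(10)); // illustrative wait

  // Read back a thread-safe copy of the latest results
  for (const auto& [channel, metrics] : harvester.get_latest_results()) {
    for (const auto& [name, value] : metrics) {
      std::cout << "ch " << channel << " " << name << " = " << value << "\n";
    }
  }

  harvester.stop_collection_thread(); // blocks until the thread has joined
}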