/**
 * @file TPGInternalStateHarvester.cpp TPG internal state harvester
 *
 * This is part of the DUNE DAQ, copyright 2023.
 * Licensing/copyright details are in the COPYING file that you should have
 * received with this code.
 */
#include "fdreadoutlibs/tpg/TPGInternalStateHarvester.hpp"
#include "datahandlinglibs/ReadoutLogging.hpp"
#include "logging/Logging.hpp"

#include <algorithm>
#include <cassert>
#include <iostream>

using dunedaq::datahandlinglibs::logging::TLVL_BOOKKEEPING;

namespace dunedaq {
namespace fdreadoutlibs {

TPGInternalStateHarvester::~TPGInternalStateHarvester() {
  // Ensure the collection thread is properly stopped
  stop_collection_thread();
}

void TPGInternalStateHarvester::set_processor_references(std::vector<ProcRef> refs) {
  m_processor_references = std::move(refs);
  // If m_num_pipelines is not set yet, this gracefully produces empty
  // per-pipeline statistics; they are filled in later by
  // update_channel_plane_numbers()
  rebuild_prealloc_caches_();
}

const std::vector<TPGInternalStateHarvester::ProcRef>&
TPGInternalStateHarvester::get_processor_references() const {
  return m_processor_references;
}

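// Usage sketch (illustrative, not part of the production flow): register one
// reference per processor before starting collection. This assumes ProcRef
// pairs a processor pointer with a pipeline id, as the structured bindings in
// this file suggest; `proc0`, `proc1` and `my_harvester` are hypothetical.
//
//   std::vector<TPGInternalStateHarvester::ProcRef> refs;
//   refs.emplace_back(proc0, /*pipeline_id=*/0);
//   refs.emplace_back(proc1, /*pipeline_id=*/1);
//   my_harvester.set_processor_references(std::move(refs));
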
void TPGInternalStateHarvester::update_channel_plane_numbers(
  const std::vector<std::pair<trgdataformats::channel_t,int16_t>>& channel_plane_numbers,
  uint8_t num_channels_per_pipeline,
  uint8_t num_pipelines)
{
  m_channel_plane_numbers_per_pipeline.clear();
  m_channel_plane_numbers_per_pipeline.resize(num_pipelines);

  // The flat list must cover every pipeline's channels
  assert(channel_plane_numbers.size() >=
         static_cast<size_t>(num_channels_per_pipeline) * num_pipelines);

  // Slice in order: pipeline p owns the contiguous block of
  // num_channels_per_pipeline entries starting at p * num_channels_per_pipeline
  for (uint8_t p = 0; p < num_pipelines; ++p) {
    auto begin = channel_plane_numbers.begin() + p * num_channels_per_pipeline;
    auto end = begin + num_channels_per_pipeline;
    m_channel_plane_numbers_per_pipeline[p] =
      std::vector<std::pair<trgdataformats::channel_t,int16_t>>(begin, end);
  }
  m_num_channels_per_pipeline = num_channels_per_pipeline;
  m_num_pipelines = num_pipelines;
  m_expected_total_channels = static_cast<size_t>(m_num_channels_per_pipeline) * m_num_pipelines;

  // Now that the number of pipelines is known, the expected number of metric
  // items per pipeline can be computed accurately
  rebuild_prealloc_caches_();
}

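// Worked example (hypothetical numbers): with num_pipelines = 2 and
// num_channels_per_pipeline = 16, a flat list of 32 (channel, plane) pairs is
// sliced in order, so pipeline 0 owns entries [0, 16) and pipeline 1 owns
// entries [16, 32):
//
//   my_harvester.update_channel_plane_numbers(channel_plane_numbers,
//                                             /*num_channels_per_pipeline=*/16,
//                                             /*num_pipelines=*/2);
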
void TPGInternalStateHarvester::rebuild_prealloc_caches_()
{
  // Pre-clean
  m_metric_items_per_proc.clear();
  m_metric_items_per_proc.reserve(m_processor_references.size());

  // Reset the expected number of metric items per pipeline
  m_expected_items_per_pipeline.clear();
  m_expected_items_per_pipeline.resize(m_num_pipelines, 0);

  // Cache the metric names for each processor and accumulate their counts into
  // the corresponding pipeline
  for (const auto& [proc, pipeline_id] : m_processor_references) {
    if (proc) {
      // Use the processor's interface method, which delegates to the registry
      auto names = proc->get_requested_internal_state_names();
      if (static_cast<size_t>(pipeline_id) < m_expected_items_per_pipeline.size()) {
        m_expected_items_per_pipeline[pipeline_id] += names.size();
      }
      m_metric_items_per_proc.emplace_back(std::move(names));
    } else {
      m_metric_items_per_proc.emplace_back(); // empty entry keeps indices aligned
    }
  }
}

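// Accounting sketch (hypothetical numbers): if pipeline 0 hosts two processors
// requesting {"mean", "rms"} and {"pedestal"} respectively, the rebuild leaves
// m_expected_items_per_pipeline[0] == 3, so harvest_once() later reserves room
// for exactly 3 (name, value) pairs in each of that pipeline's channel buckets.
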
std::unordered_map<trgdataformats::channel_t,
                   std::vector<std::pair<std::string,int16_t>>>
TPGInternalStateHarvester::harvest_once()
{
  std::unordered_map<trgdataformats::channel_t,
                     std::vector<std::pair<std::string,int16_t>>> out;

  // Pre-size the bucket count to the expected number of channels (usually 64)
  if (m_expected_total_channels > 0) {
    out.reserve(m_expected_total_channels);
  } else {
    out.reserve(static_cast<size_t>(m_num_channels_per_pipeline) * m_num_pipelines);
  }

  // Defensive: if the cache does not match the current number of pipelines, rebuild it
  if (m_expected_items_per_pipeline.size() != m_num_pipelines) {
    rebuild_prealloc_caches_();
  }

  for (size_t i = 0; i < m_processor_references.size(); ++i) {
    const auto& [proc, pipeline_id] = m_processor_references[i];
    if (!proc) {
      continue;
    }

    // 1) Get the cached metric names; if the cache does not cover this
    //    processor, fall back to pulling the names directly
    std::vector<std::string> metric_names_cached;
    if (i < m_metric_items_per_proc.size()) {
      metric_names_cached = m_metric_items_per_proc[i];
    } else {
      // Fallback: use the processor's interface method
      metric_names_cached = proc->get_requested_internal_state_names();
    }

    // 2) Take the current snapshot
    const auto arr = proc->read_internal_states_as_integer_array();

    // Basic consistency check: the number of cached names must match the number
    // of snapshot items
    if (metric_names_cached.size() != arr.m_size) {
      TLOG_DEBUG(TLVL_BOOKKEEPING) << "Processor " << i << " size mismatch: metric_names="
                                   << metric_names_cached.size() << " vs array=" << arr.m_size;
      continue;
    }

    // The current pipeline's lane -> (channel, plane) mapping
    assert(static_cast<size_t>(pipeline_id) < m_channel_plane_numbers_per_pipeline.size());
    const auto& chan_plane_vec = m_channel_plane_numbers_per_pipeline[pipeline_id];

    // 3) Reserve capacity in `out` for every channel of the current pipeline up
    //    front, so that the emplace_back calls below do not trigger allocations
    std::vector<decltype(out.begin())> iters_for_lanes;
    iters_for_lanes.resize(chan_plane_vec.size());

    const size_t expected_items_here =
      (static_cast<size_t>(pipeline_id) < m_expected_items_per_pipeline.size())
        ? m_expected_items_per_pipeline[pipeline_id]
        : metric_names_cached.size(); // fallback: use this processor's own item count

    for (size_t lane = 0; lane < chan_plane_vec.size(); ++lane) {
      const auto ch = chan_plane_vec[lane].first; // offline channel id
      auto [it, inserted] = out.try_emplace(ch, std::vector<std::pair<std::string,int16_t>>{});
      if (inserted) {
        // Reserve capacity only the first time this channel is encountered
        it->second.reserve(expected_items_here);
      }
      iters_for_lanes[lane] = it;
    }

    // 4) Map each metric's per-lane array onto the corresponding channels
    for (size_t item = 0; item < arr.m_size; ++item) {
      const auto& lanes = arr.m_data[item]; // e.g. std::array<int16_t, 16>
      const auto& name = metric_names_cached[item];
      const size_t num_lanes = lanes.size(); // e.g. 16

      const size_t up_to = std::min(num_lanes, chan_plane_vec.size());
      for (size_t lane = 0; lane < up_to; ++lane) {
        // Use the cached iterator to avoid repeated map lookups
        iters_for_lanes[lane]->second.emplace_back(name, lanes[lane]);
      }
    }
  }

  return out;
}

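// Result-shape sketch (illustrative): each offline channel maps to the list of
// (metric name, value) pairs gathered from the processors that feed it; the
// loop below is a hypothetical consumer.
//
//   auto states = my_harvester.harvest_once();
//   for (const auto& [channel, items] : states) {
//     for (const auto& [name, value] : items) {
//       TLOG_DEBUG(TLVL_BOOKKEEPING) << "ch " << channel << ": " << name << " = " << value;
//     }
//   }
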
// --- Multi-threaded implementation ---

void TPGInternalStateHarvester::start_collection_thread() {
  std::lock_guard<std::mutex> lock(m_config_mutex);

  if (m_thread_running.load()) {
    return; // Already running
  }

  TLOG_DEBUG(TLVL_BOOKKEEPING) << "Starting internal state collection thread";

  // Initialize the result container under the results mutex, since readers may
  // call get_latest_results() concurrently
  {
    std::lock_guard<std::mutex> results_lock(m_results_mutex);
    m_latest_results.clear();
  }

  // Reset thread control flags
  m_thread_should_stop.store(false);
  m_harvest_requested.store(false);

  // Start the collection thread
  m_collection_thread = std::thread(&TPGInternalStateHarvester::collection_thread_worker_, this);
  m_thread_running.store(true);
}

void TPGInternalStateHarvester::stop_collection_thread() {
  {
    std::lock_guard<std::mutex> config_lock(m_config_mutex);

    if (!m_thread_running.load()) {
      return; // Already stopped
    }

    TLOG_DEBUG(TLVL_BOOKKEEPING) << "Stopping internal state collection thread";

    // Signal the thread to stop
    m_thread_should_stop.store(true);
  }

  // Notify the collection thread under its own mutex so the wakeup is not missed
  {
    std::lock_guard<std::mutex> collection_lock(m_collection_mutex);
    m_collection_cv.notify_all();
  }

  // Wait for the thread to finish
  if (m_collection_thread.joinable()) {
    m_collection_thread.join();
  }

  m_thread_running.store(false);

  // Clear results
  {
    std::lock_guard<std::mutex> lock(m_results_mutex);
    m_latest_results.clear();
  }
}

void TPGInternalStateHarvester::trigger_harvest() {
  if (!m_thread_running.load()) {
    return; // Thread not running
  }

  m_harvest_requested.store(true);

  // Notify the collection thread under its own mutex so the request is not missed
  {
    std::lock_guard<std::mutex> lock(m_collection_mutex);
    m_collection_cv.notify_all();
  }
}

std::unordered_map<trgdataformats::channel_t,
                   std::vector<std::pair<std::string,int16_t>>>
TPGInternalStateHarvester::get_latest_results() const {
  std::lock_guard<std::mutex> lock(m_results_mutex);
  return m_latest_results; // Return a copy under the lock (a blocking read is acceptable)
}

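// Usage sketch (illustrative): a monitoring loop typically requests a harvest
// asynchronously and reads back the most recently completed snapshot, so the
// returned data may lag the trigger by one cycle; `my_harvester` is hypothetical.
//
//   my_harvester.trigger_harvest();                    // non-blocking request
//   auto snapshot = my_harvester.get_latest_results(); // copy of last completed harvest
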
bool TPGInternalStateHarvester::is_collection_thread_running() const {
  return m_thread_running.load();
}

void TPGInternalStateHarvester::collection_thread_worker_() {
  while (!m_thread_should_stop.load()) {
    std::unique_lock<std::mutex> lock(m_collection_mutex);

    // Wait for a harvest request or a stop signal
    m_collection_cv.wait(lock, [this] {
      return m_harvest_requested.load() || m_thread_should_stop.load();
    });

    if (m_thread_should_stop.load()) {
      break;
    }

    // Reset the harvest request flag
    m_harvest_requested.store(false);

    // Release the collection mutex during the harvest so triggers and stop
    // requests are not blocked by the expensive work
    lock.unlock();

    // Perform the actual harvest (the expensive part runs in this background thread)
    auto new_results = harvest_once();

    // Publish the results under the results mutex
    {
      std::lock_guard<std::mutex> results_lock(m_results_mutex);
      m_latest_results = std::move(new_results);
    }
  }
}
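
// Lifecycle sketch (illustrative): how the pieces above fit together from a
// single controlling thread; `my_harvester`, `refs` and the cadence are hypothetical.
//
//   my_harvester.set_processor_references(std::move(refs));
//   my_harvester.update_channel_plane_numbers(channel_plane_numbers, 16, 2);
//   my_harvester.start_collection_thread();
//   while (keep_running) {
//     my_harvester.trigger_harvest();                    // wake the worker
//     auto snapshot = my_harvester.get_latest_results(); // previous harvest
//     std::this_thread::sleep_for(std::chrono::seconds(1));
//   }
//   my_harvester.stop_collection_thread();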

} // namespace fdreadoutlibs
} // namespace dunedaq