Line data Source code
1 : /**
2 : * @file TPGInternalStateHarvester_test.cxx TPGInternalStateHarvester class Unit Tests
3 : *
4 : * This is part of the DUNE DAQ Application Framework, copyright 2025.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "fdreadoutlibs/tpg/TPGInternalStateHarvester.hpp"
10 : #include "tpglibs/AbstractProcessor.hpp"
11 : #include "tpglibs/ProcessorMetricArray.hpp"
12 :
13 : #define BOOST_TEST_MODULE TPGInternalStateHarvester_test // NOLINT
14 :
#include "boost/test/unit_test.hpp"

#include <array>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <memory>
#include <random>
#include <string>
#include <thread>
#include <utility>
#include <vector>
19 :
20 : using namespace dunedaq::fdreadoutlibs;
21 : using namespace dunedaq::trgdataformats;
22 :
23 : // Mock processor for testing
24 : class MockProcessor : public tpglibs::AbstractProcessor<__m256i> {
25 : public:
26 20 : MockProcessor(const std::vector<std::string>& metric_names,
27 : const std::vector<std::array<int16_t, 16>>& metric_values)
28 20 : : m_metric_names(metric_names), m_metric_values(metric_values) {}
29 :
30 0 : void configure(const nlohmann::json& config, const int16_t* plane_numbers) override {
31 : // Mock implementation - do nothing
32 0 : }
33 :
34 38 : std::vector<std::string> get_requested_internal_state_names() const override {
35 38 : return m_metric_names;
36 : }
37 :
38 437 : tpglibs::ProcessorMetricArray<std::array<int16_t, 16>> read_internal_states_as_integer_array() override {
39 437 : tpglibs::ProcessorMetricArray<std::array<int16_t, 16>> result;
40 437 : result.m_size = m_metric_values.size();
41 437 : result.m_data = m_metric_values.data();
42 437 : return result;
43 : }
44 :
45 : private:
46 : std::vector<std::string> m_metric_names;
47 : std::vector<std::array<int16_t, 16>> m_metric_values;
48 : };
49 :
50 : BOOST_AUTO_TEST_SUITE(TPGInternalStateHarvester_test)
51 :
52 2 : BOOST_AUTO_TEST_CASE(ConstructorDestructor)
53 : {
54 1 : TPGInternalStateHarvester harvester;
55 : // Test that constructor and destructor work without issues
56 1 : BOOST_REQUIRE(true);
57 1 : }
58 :
59 2 : BOOST_AUTO_TEST_CASE(SetGetProcessorReferences)
60 : {
61 1 : TPGInternalStateHarvester harvester;
62 :
63 : // Create mock processors
64 1 : std::vector<std::string> metric_names = {"baseline", "accumulator"};
65 1 : std::vector<std::array<int16_t, 16>> metric_values = {
66 : {{100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115}},
67 : {{200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215}}
68 1 : };
69 :
70 1 : auto proc1 = std::make_shared<MockProcessor>(metric_names, metric_values);
71 1 : auto proc2 = std::make_shared<MockProcessor>(metric_names, metric_values);
72 :
73 1 : std::vector<TPGInternalStateHarvester::ProcRef> refs = {
74 3 : {proc1, 0}, {proc2, 1}
75 3 : };
76 :
77 1 : harvester.set_processor_references(refs);
78 :
79 1 : const auto& retrieved_refs = harvester.get_processor_references();
80 1 : BOOST_REQUIRE_EQUAL(retrieved_refs.size(), 2);
81 1 : BOOST_REQUIRE_EQUAL(retrieved_refs[0].second, 0);
82 1 : BOOST_REQUIRE_EQUAL(retrieved_refs[1].second, 1);
83 2 : }
84 :
85 2 : BOOST_AUTO_TEST_CASE(UpdateChannelPlaneNumbers)
86 : {
87 1 : TPGInternalStateHarvester harvester;
88 :
89 1 : std::vector<std::pair<channel_t, int16_t>> channel_plane_numbers = {
90 : {100, 0}, {101, 0}, {102, 1}, {103, 1}, {104, 2}, {105, 2}
91 1 : };
92 :
93 1 : harvester.update_channel_plane_numbers(channel_plane_numbers, 2, 3);
94 :
95 : // Test that the method completes without throwing
96 1 : BOOST_REQUIRE(true);
97 1 : }
98 :
99 2 : BOOST_AUTO_TEST_CASE(HarvestOnceBasic)
100 : {
101 1 : TPGInternalStateHarvester harvester;
102 :
103 : // Setup mock processors
104 1 : std::vector<std::string> metric_names = {"baseline", "accumulator"};
105 1 : std::vector<std::array<int16_t, 16>> metric_values = {
106 : {{100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115}},
107 : {{200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215}}
108 1 : };
109 :
110 1 : auto proc1 = std::make_shared<MockProcessor>(metric_names, metric_values);
111 2 : std::vector<TPGInternalStateHarvester::ProcRef> refs = {{proc1, 0}};
112 1 : harvester.set_processor_references(refs);
113 :
114 : // Setup channel-plane mapping
115 1 : std::vector<std::pair<channel_t, int16_t>> channel_plane_numbers = {
116 : {100, 0}, {101, 0}, {102, 0}, {103, 0}, {104, 0}, {105, 0}, {106, 0}, {107, 0},
117 : {108, 0}, {109, 0}, {110, 0}, {111, 0}, {112, 0}, {113, 0}, {114, 0}, {115, 0}
118 1 : };
119 1 : harvester.update_channel_plane_numbers(channel_plane_numbers, 16, 1);
120 :
121 : // Perform harvest
122 1 : auto results = harvester.harvest_once();
123 :
124 : // Verify results
125 1 : BOOST_REQUIRE_EQUAL(results.size(), 16); // 16 channels
126 :
127 : // Check first channel
128 1 : auto it = results.find(100);
129 1 : BOOST_REQUIRE(it != results.end());
130 1 : BOOST_REQUIRE_EQUAL(it->second.size(), 2); // 2 metrics
131 :
132 : // Check metric values
133 1 : BOOST_REQUIRE_EQUAL(it->second[0].first, "baseline");
134 1 : BOOST_REQUIRE_EQUAL(it->second[0].second, 100);
135 1 : BOOST_REQUIRE_EQUAL(it->second[1].first, "accumulator");
136 1 : BOOST_REQUIRE_EQUAL(it->second[1].second, 200);
137 2 : }
138 :
139 2 : BOOST_AUTO_TEST_CASE(HarvestOnceMultiplePipelines)
140 : {
141 1 : TPGInternalStateHarvester harvester;
142 :
143 : // Setup mock processors for 2 pipelines
144 1 : std::vector<std::string> metric_names = {"baseline"};
145 1 : std::vector<std::array<int16_t, 16>> metric_values_pipeline0 = {
146 : {{100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115}}
147 1 : };
148 1 : std::vector<std::array<int16_t, 16>> metric_values_pipeline1 = {
149 : {{200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215}}
150 1 : };
151 :
152 1 : auto proc0 = std::make_shared<MockProcessor>(metric_names, metric_values_pipeline0);
153 1 : auto proc1 = std::make_shared<MockProcessor>(metric_names, metric_values_pipeline1);
154 :
155 1 : std::vector<TPGInternalStateHarvester::ProcRef> refs = {
156 3 : {proc0, 0}, {proc1, 1}
157 3 : };
158 1 : harvester.set_processor_references(refs);
159 :
160 : // Setup channel-plane mapping for 2 pipelines of 16 channels each
161 1 : std::vector<std::pair<channel_t, int16_t>> channel_plane_numbers;
162 33 : for (int i = 0; i < 32; ++i) {
163 32 : channel_plane_numbers.push_back({100 + i, i / 16}); // First 16 in plane 0, next 16 in plane 1
164 : }
165 1 : harvester.update_channel_plane_numbers(channel_plane_numbers, 16, 2);
166 :
167 : // Perform harvest
168 1 : auto results = harvester.harvest_once();
169 :
170 : // Verify results
171 1 : BOOST_REQUIRE_EQUAL(results.size(), 32); // 32 channels total
172 :
173 : // Check first channel from pipeline 0
174 1 : auto it0 = results.find(100);
175 1 : BOOST_REQUIRE(it0 != results.end());
176 1 : BOOST_REQUIRE_EQUAL(it0->second[0].second, 100);
177 :
178 : // Check first channel from pipeline 1
179 1 : auto it1 = results.find(116);
180 1 : BOOST_REQUIRE(it1 != results.end());
181 1 : BOOST_REQUIRE_EQUAL(it1->second[0].second, 200);
182 2 : }
183 :
184 2 : BOOST_AUTO_TEST_CASE(ThreadManagement)
185 : {
186 1 : TPGInternalStateHarvester harvester;
187 :
188 : // Test thread state before starting
189 1 : BOOST_REQUIRE(!harvester.is_collection_thread_running());
190 :
191 : // Start collection thread
192 1 : harvester.start_collection_thread();
193 1 : BOOST_REQUIRE(harvester.is_collection_thread_running());
194 :
195 : // Give the thread a moment to fully start
196 1 : std::this_thread::sleep_for(std::chrono::milliseconds(10));
197 :
198 : // Stop collection thread
199 1 : harvester.stop_collection_thread();
200 1 : BOOST_REQUIRE(!harvester.is_collection_thread_running());
201 1 : }
202 :
203 2 : BOOST_AUTO_TEST_CASE(TriggerHarvest)
204 : {
205 1 : TPGInternalStateHarvester harvester;
206 :
207 : // Setup mock processor
208 1 : std::vector<std::string> metric_names = {"baseline"};
209 1 : std::vector<std::array<int16_t, 16>> metric_values = {
210 : {{100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115}}
211 1 : };
212 :
213 1 : auto proc = std::make_shared<MockProcessor>(metric_names, metric_values);
214 2 : std::vector<TPGInternalStateHarvester::ProcRef> refs = {{proc, 0}};
215 1 : harvester.set_processor_references(refs);
216 :
217 1 : std::vector<std::pair<channel_t, int16_t>> channel_plane_numbers;
218 17 : for (int i = 0; i < 16; ++i) {
219 16 : channel_plane_numbers.push_back({100 + i, 0});
220 : }
221 1 : harvester.update_channel_plane_numbers(channel_plane_numbers, 16, 1);
222 :
223 : // Start collection thread
224 1 : harvester.start_collection_thread();
225 :
226 : // Give thread time to start
227 1 : std::this_thread::sleep_for(std::chrono::milliseconds(10));
228 :
229 : // Trigger harvest
230 1 : harvester.trigger_harvest();
231 :
232 : // Wait a bit for processing
233 1 : std::this_thread::sleep_for(std::chrono::milliseconds(50));
234 :
235 : // Get results
236 1 : auto results = harvester.get_latest_results();
237 1 : BOOST_REQUIRE_EQUAL(results.size(), 16);
238 :
239 : // Stop collection thread
240 1 : harvester.stop_collection_thread();
241 2 : }
242 :
243 2 : BOOST_AUTO_TEST_CASE(GetLatestResultsEmpty)
244 : {
245 1 : TPGInternalStateHarvester harvester;
246 :
247 : // Get results without starting thread or triggering harvest
248 1 : auto results = harvester.get_latest_results();
249 1 : BOOST_REQUIRE(results.empty());
250 1 : }
251 :
252 2 : BOOST_AUTO_TEST_CASE(ConcurrentAccess)
253 : {
254 1 : TPGInternalStateHarvester harvester;
255 :
256 : // Setup mock processor
257 1 : std::vector<std::string> metric_names = {"baseline"};
258 1 : std::vector<std::array<int16_t, 16>> metric_values = {
259 : {{100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115}}
260 1 : };
261 :
262 1 : auto proc = std::make_shared<MockProcessor>(metric_names, metric_values);
263 2 : std::vector<TPGInternalStateHarvester::ProcRef> refs = {{proc, 0}};
264 1 : harvester.set_processor_references(refs);
265 :
266 1 : std::vector<std::pair<channel_t, int16_t>> channel_plane_numbers;
267 17 : for (int i = 0; i < 16; ++i) {
268 16 : channel_plane_numbers.push_back({100 + i, 0});
269 : }
270 1 : harvester.update_channel_plane_numbers(channel_plane_numbers, 16, 1);
271 :
272 : // Start collection thread
273 1 : harvester.start_collection_thread();
274 :
275 : // Give thread time to start
276 1 : std::this_thread::sleep_for(std::chrono::milliseconds(10));
277 :
278 : // Spawn multiple threads to trigger harvests and read results
279 1 : std::vector<std::thread> threads;
280 1 : std::atomic<int> success_count{0};
281 :
282 4 : for (int i = 0; i < 3; ++i) {
283 6 : threads.emplace_back([&harvester, &success_count]() {
284 18 : for (int j = 0; j < 5; ++j) {
285 15 : harvester.trigger_harvest();
286 15 : std::this_thread::sleep_for(std::chrono::milliseconds(20));
287 15 : auto results = harvester.get_latest_results();
288 15 : if (!results.empty()) {
289 15 : success_count++;
290 : }
291 15 : }
292 3 : });
293 : }
294 :
295 : // Wait for all threads to complete
296 4 : for (auto& t : threads) {
297 3 : t.join();
298 : }
299 :
300 : // Stop collection thread
301 1 : harvester.stop_collection_thread();
302 :
303 : // Verify that at least some operations succeeded
304 2 : BOOST_REQUIRE(success_count.load() > 0);
305 2 : }
306 :
307 2 : BOOST_AUTO_TEST_CASE(EmptyProcessorReferences)
308 : {
309 1 : TPGInternalStateHarvester harvester;
310 :
311 : // Set empty processor references
312 1 : std::vector<TPGInternalStateHarvester::ProcRef> empty_refs;
313 1 : harvester.set_processor_references(empty_refs);
314 :
315 : // Setup channel-plane mapping
316 1 : std::vector<std::pair<channel_t, int16_t>> channel_plane_numbers;
317 17 : for (int i = 0; i < 16; ++i) {
318 16 : channel_plane_numbers.push_back({100 + i, 0});
319 : }
320 1 : harvester.update_channel_plane_numbers(channel_plane_numbers, 16, 1);
321 :
322 : // Perform harvest - should handle empty processors gracefully
323 1 : auto results = harvester.harvest_once();
324 1 : BOOST_REQUIRE(results.empty());
325 1 : }
326 :
327 2 : BOOST_AUTO_TEST_CASE(MismatchedMetricSizes)
328 : {
329 1 : TPGInternalStateHarvester harvester;
330 :
331 : // Create processor with mismatched metric names and values (3 names, 2 values)
332 1 : std::vector<std::string> metric_names = {"baseline", "accumulator", "threshold"};
333 1 : std::vector<std::array<int16_t, 16>> metric_values = {
334 : {{100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115}},
335 : {{200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215}}
336 : // Missing third metric value - this causes size mismatch
337 1 : };
338 :
339 1 : auto proc = std::make_shared<MockProcessor>(metric_names, metric_values);
340 2 : std::vector<TPGInternalStateHarvester::ProcRef> refs = {{proc, 0}};
341 1 : harvester.set_processor_references(refs);
342 :
343 1 : std::vector<std::pair<channel_t, int16_t>> channel_plane_numbers;
344 17 : for (int i = 0; i < 16; ++i) {
345 16 : channel_plane_numbers.push_back({100 + i, 0});
346 : }
347 1 : harvester.update_channel_plane_numbers(channel_plane_numbers, 16, 1);
348 :
349 : // Perform harvest - should skip processor when sizes don't match
350 1 : auto results = harvester.harvest_once();
351 : // When metric names and values don't match, the entire processor is skipped
352 1 : BOOST_REQUIRE_EQUAL(results.size(), 0);
353 2 : }
354 :
355 2 : BOOST_AUTO_TEST_CASE(StressTest)
356 : {
357 1 : TPGInternalStateHarvester harvester;
358 :
359 : // Create multiple processors with random data
360 1 : std::vector<std::string> metric_names = {"baseline", "accumulator", "threshold"};
361 1 : std::random_device rd;
362 1 : std::mt19937 gen(rd());
363 1 : std::uniform_int_distribution<> dis(0, 1000);
364 :
365 1 : std::vector<TPGInternalStateHarvester::ProcRef> refs;
366 1 : std::vector<std::pair<channel_t, int16_t>> channel_plane_numbers;
367 :
368 5 : for (int pipeline = 0; pipeline < 4; ++pipeline) {
369 4 : std::vector<std::array<int16_t, 16>> metric_values;
370 16 : for (const auto& name : metric_names) {
371 : std::array<int16_t, 16> values;
372 204 : for (int i = 0; i < 16; ++i) {
373 192 : values[i] = dis(gen);
374 : }
375 12 : metric_values.push_back(values);
376 : }
377 :
378 4 : auto proc = std::make_shared<MockProcessor>(metric_names, metric_values);
379 4 : refs.push_back({proc, pipeline});
380 :
381 68 : for (int i = 0; i < 16; ++i) {
382 64 : channel_plane_numbers.push_back({100 + pipeline * 16 + i, pipeline % 3});
383 : }
384 4 : }
385 :
386 1 : harvester.set_processor_references(refs);
387 1 : harvester.update_channel_plane_numbers(channel_plane_numbers, 16, 4);
388 :
389 : // Perform multiple harvests
390 101 : for (int i = 0; i < 100; ++i) {
391 100 : auto results = harvester.harvest_once();
392 100 : BOOST_REQUIRE_EQUAL(results.size(), 64); // 4 pipelines * 16 channels
393 :
394 : // Verify some random channels
395 1100 : for (int j = 0; j < 10; ++j) {
396 1000 : channel_t test_channel = 100 + (j % 64);
397 1000 : auto it = results.find(test_channel);
398 1000 : BOOST_REQUIRE(it != results.end());
399 1000 : BOOST_REQUIRE_EQUAL(it->second.size(), 3); // 3 metrics
400 : }
401 100 : }
402 1 : }
403 :
404 2 : BOOST_AUTO_TEST_CASE(ExactValueCollectionVerification)
405 : {
406 1 : TPGInternalStateHarvester harvester;
407 :
408 : // Create processor with known, specific values
409 1 : std::vector<std::string> metric_names = {"baseline", "accumulator", "threshold"};
410 1 : std::vector<std::array<int16_t, 16>> metric_values = {
411 : {{-100, -99, -98, -97, -96, -95, -94, -93, -92, -91, -90, -89, -88, -87, -86, -85}}, // baseline
412 : {{1000, 1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010, 1011, 1012, 1013, 1014, 1015}}, // accumulator
413 : {{50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65}} // threshold
414 1 : };
415 :
416 1 : auto proc = std::make_shared<MockProcessor>(metric_names, metric_values);
417 2 : std::vector<TPGInternalStateHarvester::ProcRef> refs = {{proc, 0}};
418 1 : harvester.set_processor_references(refs);
419 :
420 : // Setup channel mapping: channels 100-115 map to lanes 0-15
421 1 : std::vector<std::pair<channel_t, int16_t>> channel_plane_numbers;
422 17 : for (int i = 0; i < 16; ++i) {
423 16 : channel_plane_numbers.push_back({100 + i, 0});
424 : }
425 1 : harvester.update_channel_plane_numbers(channel_plane_numbers, 16, 1);
426 :
427 : // Harvest and verify exact values
428 1 : auto results = harvester.harvest_once();
429 1 : BOOST_REQUIRE_EQUAL(results.size(), 16);
430 :
431 : // Verify each channel has exactly 3 metrics
432 17 : for (int i = 0; i < 16; ++i) {
433 16 : channel_t channel = 100 + i;
434 16 : auto it = results.find(channel);
435 16 : BOOST_REQUIRE(it != results.end());
436 16 : BOOST_REQUIRE_EQUAL(it->second.size(), 3);
437 :
438 : // Verify metric names and exact values
439 16 : const auto& metrics = it->second;
440 :
441 : // Find each metric by name and verify its value
442 64 : for (const auto& [name, value] : metrics) {
443 48 : if (name == "baseline") {
444 16 : BOOST_REQUIRE_EQUAL(value, -100 + i); // lane i corresponds to channel 100+i
445 32 : } else if (name == "accumulator") {
446 16 : BOOST_REQUIRE_EQUAL(value, 1000 + i);
447 16 : } else if (name == "threshold") {
448 16 : BOOST_REQUIRE_EQUAL(value, 50 + i);
449 : } else {
450 0 : BOOST_FAIL("Unexpected metric name: " << name);
451 : }
452 : }
453 : }
454 2 : }
455 :
456 2 : BOOST_AUTO_TEST_CASE(BoundaryValueCollection)
457 : {
458 1 : TPGInternalStateHarvester harvester;
459 :
460 : // Test with boundary values (min/max int16_t)
461 1 : std::vector<std::string> metric_names = {"extreme_values"};
462 1 : std::vector<std::array<int16_t, 16>> metric_values = {
463 : {{INT16_MIN, INT16_MIN + 1, -1, 0, 1, INT16_MAX - 1, INT16_MAX,
464 : INT16_MIN, INT16_MIN + 1, -1, 0, 1, INT16_MAX - 1, INT16_MAX, INT16_MIN, INT16_MAX}}
465 1 : };
466 :
467 1 : auto proc = std::make_shared<MockProcessor>(metric_names, metric_values);
468 2 : std::vector<TPGInternalStateHarvester::ProcRef> refs = {{proc, 0}};
469 1 : harvester.set_processor_references(refs);
470 :
471 1 : std::vector<std::pair<channel_t, int16_t>> channel_plane_numbers;
472 17 : for (int i = 0; i < 16; ++i) {
473 16 : channel_plane_numbers.push_back({200 + i, 0});
474 : }
475 1 : harvester.update_channel_plane_numbers(channel_plane_numbers, 16, 1);
476 :
477 1 : auto results = harvester.harvest_once();
478 1 : BOOST_REQUIRE_EQUAL(results.size(), 16);
479 :
480 : // Verify boundary values are preserved exactly
481 1 : std::array<int16_t, 16> expected_values = {
482 : INT16_MIN, INT16_MIN + 1, -1, 0, 1, INT16_MAX - 1, INT16_MAX,
483 : INT16_MIN, INT16_MIN + 1, -1, 0, 1, INT16_MAX - 1, INT16_MAX, INT16_MIN, INT16_MAX
484 : };
485 :
486 17 : for (int i = 0; i < 16; ++i) {
487 16 : channel_t channel = 200 + i;
488 16 : auto it = results.find(channel);
489 16 : BOOST_REQUIRE(it != results.end());
490 16 : BOOST_REQUIRE_EQUAL(it->second.size(), 1);
491 16 : BOOST_REQUIRE_EQUAL(it->second[0].first, "extreme_values");
492 16 : BOOST_REQUIRE_EQUAL(it->second[0].second, expected_values[i]);
493 : }
494 2 : }
495 :
496 2 : BOOST_AUTO_TEST_CASE(MultiPipelineValueMapping)
497 : {
498 1 : TPGInternalStateHarvester harvester;
499 :
500 : // Create two processors with different values
501 1 : std::vector<std::string> metric_names = {"pipeline_id"};
502 :
503 : // Pipeline 0: values 100-115
504 1 : std::vector<std::array<int16_t, 16>> metric_values_pipeline0 = {
505 : {{100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115}}
506 1 : };
507 :
508 : // Pipeline 1: values 200-215
509 1 : std::vector<std::array<int16_t, 16>> metric_values_pipeline1 = {
510 : {{200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215}}
511 1 : };
512 :
513 1 : auto proc0 = std::make_shared<MockProcessor>(metric_names, metric_values_pipeline0);
514 1 : auto proc1 = std::make_shared<MockProcessor>(metric_names, metric_values_pipeline1);
515 :
516 1 : std::vector<TPGInternalStateHarvester::ProcRef> refs = {
517 3 : {proc0, 0}, {proc1, 1}
518 3 : };
519 1 : harvester.set_processor_references(refs);
520 :
521 : // Setup channels: 300-315 for pipeline 0, 400-415 for pipeline 1
522 1 : std::vector<std::pair<channel_t, int16_t>> channel_plane_numbers;
523 17 : for (int i = 0; i < 16; ++i) {
524 16 : channel_plane_numbers.push_back({300 + i, 0}); // pipeline 0
525 : }
526 17 : for (int i = 0; i < 16; ++i) {
527 16 : channel_plane_numbers.push_back({400 + i, 1}); // pipeline 1
528 : }
529 1 : harvester.update_channel_plane_numbers(channel_plane_numbers, 16, 2);
530 :
531 1 : auto results = harvester.harvest_once();
532 1 : BOOST_REQUIRE_EQUAL(results.size(), 32);
533 :
534 : // Verify pipeline 0 values (channels 300-315)
535 17 : for (int i = 0; i < 16; ++i) {
536 16 : channel_t channel = 300 + i;
537 16 : auto it = results.find(channel);
538 16 : BOOST_REQUIRE(it != results.end());
539 16 : BOOST_REQUIRE_EQUAL(it->second.size(), 1);
540 16 : BOOST_REQUIRE_EQUAL(it->second[0].first, "pipeline_id");
541 16 : BOOST_REQUIRE_EQUAL(it->second[0].second, 100 + i);
542 : }
543 :
544 : // Verify pipeline 1 values (channels 400-415)
545 17 : for (int i = 0; i < 16; ++i) {
546 16 : channel_t channel = 400 + i;
547 16 : auto it = results.find(channel);
548 16 : BOOST_REQUIRE(it != results.end());
549 16 : BOOST_REQUIRE_EQUAL(it->second.size(), 1);
550 16 : BOOST_REQUIRE_EQUAL(it->second[0].first, "pipeline_id");
551 16 : BOOST_REQUIRE_EQUAL(it->second[0].second, 200 + i);
552 : }
553 2 : }
554 :
555 2 : BOOST_AUTO_TEST_CASE(ValueConsistencyAcrossHarvests)
556 : {
557 1 : TPGInternalStateHarvester harvester;
558 :
559 : // Create processor with fixed values
560 1 : std::vector<std::string> metric_names = {"stable_metric"};
561 1 : std::vector<std::array<int16_t, 16>> metric_values = {
562 : {{42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57}}
563 1 : };
564 :
565 1 : auto proc = std::make_shared<MockProcessor>(metric_names, metric_values);
566 2 : std::vector<TPGInternalStateHarvester::ProcRef> refs = {{proc, 0}};
567 1 : harvester.set_processor_references(refs);
568 :
569 1 : std::vector<std::pair<channel_t, int16_t>> channel_plane_numbers;
570 17 : for (int i = 0; i < 16; ++i) {
571 16 : channel_plane_numbers.push_back({500 + i, 0});
572 : }
573 1 : harvester.update_channel_plane_numbers(channel_plane_numbers, 16, 1);
574 :
575 : // Perform multiple harvests and verify values remain consistent
576 11 : for (int harvest = 0; harvest < 10; ++harvest) {
577 10 : auto results = harvester.harvest_once();
578 10 : BOOST_REQUIRE_EQUAL(results.size(), 16);
579 :
580 : // Verify all values are exactly as expected
581 170 : for (int i = 0; i < 16; ++i) {
582 160 : channel_t channel = 500 + i;
583 160 : auto it = results.find(channel);
584 160 : BOOST_REQUIRE(it != results.end());
585 160 : BOOST_REQUIRE_EQUAL(it->second.size(), 1);
586 160 : BOOST_REQUIRE_EQUAL(it->second[0].first, "stable_metric");
587 160 : BOOST_REQUIRE_EQUAL(it->second[0].second, 42 + i);
588 : }
589 10 : }
590 2 : }
591 :
592 2 : BOOST_AUTO_TEST_CASE(MetricNameValuePairOrdering)
593 : {
594 1 : TPGInternalStateHarvester harvester;
595 :
596 : // Create processor with multiple metrics in specific order
597 1 : std::vector<std::string> metric_names = {"first", "second", "third", "fourth"};
598 1 : std::vector<std::array<int16_t, 16>> metric_values = {
599 : {{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}}, // first
600 : {{2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2}}, // second
601 : {{3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3}}, // third
602 : {{4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4}} // fourth
603 1 : };
604 :
605 1 : auto proc = std::make_shared<MockProcessor>(metric_names, metric_values);
606 2 : std::vector<TPGInternalStateHarvester::ProcRef> refs = {{proc, 0}};
607 1 : harvester.set_processor_references(refs);
608 :
609 1 : std::vector<std::pair<channel_t, int16_t>> channel_plane_numbers;
610 17 : for (int i = 0; i < 16; ++i) {
611 16 : channel_plane_numbers.push_back({600 + i, 0});
612 : }
613 1 : harvester.update_channel_plane_numbers(channel_plane_numbers, 16, 1);
614 :
615 1 : auto results = harvester.harvest_once();
616 1 : BOOST_REQUIRE_EQUAL(results.size(), 16);
617 :
618 : // Verify metric ordering is preserved
619 17 : for (int i = 0; i < 16; ++i) {
620 16 : channel_t channel = 600 + i;
621 16 : auto it = results.find(channel);
622 16 : BOOST_REQUIRE(it != results.end());
623 16 : BOOST_REQUIRE_EQUAL(it->second.size(), 4);
624 :
625 : // Verify order and values
626 16 : BOOST_REQUIRE_EQUAL(it->second[0].first, "first");
627 16 : BOOST_REQUIRE_EQUAL(it->second[0].second, 1);
628 16 : BOOST_REQUIRE_EQUAL(it->second[1].first, "second");
629 16 : BOOST_REQUIRE_EQUAL(it->second[1].second, 2);
630 16 : BOOST_REQUIRE_EQUAL(it->second[2].first, "third");
631 16 : BOOST_REQUIRE_EQUAL(it->second[2].second, 3);
632 16 : BOOST_REQUIRE_EQUAL(it->second[3].first, "fourth");
633 16 : BOOST_REQUIRE_EQUAL(it->second[3].second, 4);
634 : }
635 2 : }
636 :
637 2 : BOOST_AUTO_TEST_CASE(AsyncValueCollectionVerification)
638 : {
639 1 : TPGInternalStateHarvester harvester;
640 :
641 : // Setup processor with known values
642 1 : std::vector<std::string> metric_names = {"async_metric"};
643 1 : std::vector<std::array<int16_t, 16>> metric_values = {
644 : {{777, 778, 779, 780, 781, 782, 783, 784, 785, 786, 787, 788, 789, 790, 791, 792}}
645 1 : };
646 :
647 1 : auto proc = std::make_shared<MockProcessor>(metric_names, metric_values);
648 2 : std::vector<TPGInternalStateHarvester::ProcRef> refs = {{proc, 0}};
649 1 : harvester.set_processor_references(refs);
650 :
651 1 : std::vector<std::pair<channel_t, int16_t>> channel_plane_numbers;
652 17 : for (int i = 0; i < 16; ++i) {
653 16 : channel_plane_numbers.push_back({700 + i, 0});
654 : }
655 1 : harvester.update_channel_plane_numbers(channel_plane_numbers, 16, 1);
656 :
657 : // Start collection thread
658 1 : harvester.start_collection_thread();
659 1 : std::this_thread::sleep_for(std::chrono::milliseconds(10));
660 :
661 : // Trigger harvest and wait for results
662 1 : harvester.trigger_harvest();
663 1 : std::this_thread::sleep_for(std::chrono::milliseconds(50));
664 :
665 1 : auto results = harvester.get_latest_results();
666 1 : BOOST_REQUIRE_EQUAL(results.size(), 16);
667 :
668 : // Verify exact values were collected asynchronously
669 17 : for (int i = 0; i < 16; ++i) {
670 16 : channel_t channel = 700 + i;
671 16 : auto it = results.find(channel);
672 16 : BOOST_REQUIRE(it != results.end());
673 16 : BOOST_REQUIRE_EQUAL(it->second.size(), 1);
674 16 : BOOST_REQUIRE_EQUAL(it->second[0].first, "async_metric");
675 16 : BOOST_REQUIRE_EQUAL(it->second[0].second, 777 + i);
676 : }
677 :
678 1 : harvester.stop_collection_thread();
679 2 : }
680 :
681 2 : BOOST_AUTO_TEST_CASE(ZeroValueCollection)
682 : {
683 1 : TPGInternalStateHarvester harvester;
684 :
685 : // Test with all zero values
686 1 : std::vector<std::string> metric_names = {"zero_metric"};
687 1 : std::vector<std::array<int16_t, 16>> metric_values = {
688 : {{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}}
689 1 : };
690 :
691 1 : auto proc = std::make_shared<MockProcessor>(metric_names, metric_values);
692 2 : std::vector<TPGInternalStateHarvester::ProcRef> refs = {{proc, 0}};
693 1 : harvester.set_processor_references(refs);
694 :
695 1 : std::vector<std::pair<channel_t, int16_t>> channel_plane_numbers;
696 17 : for (int i = 0; i < 16; ++i) {
697 16 : channel_plane_numbers.push_back({800 + i, 0});
698 : }
699 1 : harvester.update_channel_plane_numbers(channel_plane_numbers, 16, 1);
700 :
701 1 : auto results = harvester.harvest_once();
702 1 : BOOST_REQUIRE_EQUAL(results.size(), 16);
703 :
704 : // Verify all values are exactly zero
705 17 : for (int i = 0; i < 16; ++i) {
706 16 : channel_t channel = 800 + i;
707 16 : auto it = results.find(channel);
708 16 : BOOST_REQUIRE(it != results.end());
709 16 : BOOST_REQUIRE_EQUAL(it->second.size(), 1);
710 16 : BOOST_REQUIRE_EQUAL(it->second[0].first, "zero_metric");
711 16 : BOOST_REQUIRE_EQUAL(it->second[0].second, 0);
712 : }
713 2 : }
714 :
715 : BOOST_AUTO_TEST_SUITE_END()
|