Line data Source code
1 : /**
2 : * @file datahandlinglibs_DataHandlingModel_test.cxx Unit Tests for DataHandlingModel
3 : *
4 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #define BOOST_TEST_MODULE datahandlinglibs_DataHandlingModel_test // NOLINT
10 :
11 : #include "boost/test/unit_test.hpp"
12 :
13 : #include "datahandlinglibs/ReadoutTypes.hpp"
14 : #include "datahandlinglibs/models/SkipListLatencyBufferModel.hpp"
15 : #include "datahandlinglibs/testutils/UnitTestUtilities.hpp"
16 :
17 : #include <folly/futures/ManualTimekeeper.h>
18 :
19 : #include <memory>
20 : #include <utility>
21 :
22 : BOOST_AUTO_TEST_SUITE(datahandlinglibs_DataHandlingModel_test)
23 :
24 : using namespace dunedaq::datahandlinglibs;
25 :
26 : using ReadoutType = types::DUMMY_FRAME_STRUCT;
27 :
28 2 : BOOST_AUTO_TEST_CASE(datahandlinglibs_DataHandlingModel_run_postprocess_scheduler_timeout)
29 : {
30 1 : std::atomic<bool> run_marker = true;
31 :
32 1 : auto model =
33 : unittest::MockDataHandlingModel<ReadoutType,
34 : DefaultRequestHandlerModel<ReadoutType, SkipListLatencyBufferModel<ReadoutType>>,
35 : SkipListLatencyBufferModel<ReadoutType>,
36 1 : TaskRawDataProcessorModel<ReadoutType>>(run_marker);
37 :
38 1 : auto buffer = std::make_shared<SkipListLatencyBufferModel<ReadoutType>>(); // Empty buffer
39 :
40 1 : constexpr bool post_processing_enabled = true;
41 1 : auto error_registry = std::make_unique<FrameErrorRegistry>();
42 :
43 1 : auto raw_processor =
44 1 : std::make_shared<TaskRawDataProcessorModel<ReadoutType>>(error_registry, post_processing_enabled);
45 :
46 1 : auto timekeeper = std::make_unique<folly::ManualTimekeeper>();
47 1 : auto* timekeeper_ptr = timekeeper.get();
48 :
49 1 : constexpr uint64_t delay_max_wait = 2; // NOLINT(build/unsigned)
50 :
51 3 : std::thread coro_thread([&]() {
52 1 : model.test_run_postprocess_scheduler(buffer, raw_processor, std::move(timekeeper), delay_max_wait);
53 2 : });
54 :
55 : // Wait for the coroutine to start and for its timeout to be registered with the timekeeper
56 2 : while (timekeeper_ptr->numScheduled() == 0) {
57 1 : std::this_thread::sleep_for(1ms);
58 : }
59 : // Safeguard against the delay between timeout registration and coroutine suspension.
60 : // If the test fails intermittently, consider a longer sleep or a better way to synchronize.
61 1 : std::this_thread::sleep_for(1ms);
62 1 : timekeeper_ptr->advance(std::chrono::milliseconds{ delay_max_wait }); // Trigger a timeout
63 :
64 1 : model.set_run_marker(false); // Let coroutine end
65 1 : coro_thread.join(); // The test will get stuck here if the timeout is not triggered (because of folly::coro::blockingWait)
66 :
67 1 : BOOST_REQUIRE_EQUAL(model.get_num_post_processing_delay_max_waits(), 1);
68 1 : }
69 :
70 2 : BOOST_AUTO_TEST_CASE(datahandlinglibs_DataHandlingModel_PostprocessScheduleAlgorithm_timeout)
71 : {
72 1 : std::atomic<bool> run_marker = true;
73 :
74 1 : auto model =
75 : unittest::MockDataHandlingModel<ReadoutType,
76 : DefaultRequestHandlerModel<ReadoutType, SkipListLatencyBufferModel<ReadoutType>>,
77 : SkipListLatencyBufferModel<ReadoutType>,
78 1 : TaskRawDataProcessorModel<ReadoutType>>(run_marker);
79 :
80 1 : auto buffer = std::make_shared<SkipListLatencyBufferModel<ReadoutType>>();
81 :
82 6 : for (int i = 1; i < 6; i++) {
83 5 : ReadoutType frame{};
84 5 : frame.timestamp = i * 62500;
85 5 : buffer->write(std::move(frame));
86 : }
87 :
88 1 : constexpr bool post_processing_enabled = true;
89 1 : auto error_registry = std::make_unique<FrameErrorRegistry>();
90 :
91 1 : auto raw_processor =
92 1 : std::make_shared<TaskRawDataProcessorModel<ReadoutType>>(error_registry, post_processing_enabled);
93 :
94 1 : constexpr uint64_t delay_ticks = 4 * 62500; // NOLINT(build/unsigned)
95 1 : constexpr uint64_t delay_min_wait = 1; // NOLINT(build/unsigned)
96 1 : constexpr uint64_t delay_max_wait = 2; // NOLINT(build/unsigned)
97 :
98 1 : typename decltype(model)::PostprocessScheduleAlgorithm sched_algo{
99 1 : *buffer, *raw_processor, delay_ticks, delay_min_wait, delay_max_wait
100 1 : };
101 :
102 : // First pass
103 1 : bool timeout = false;
104 1 : int processed_count = sched_algo.run(timeout);
105 : // Buffer = {1, 2, 3, 4, 5}, delay_ticks = 4 (timestamps and delay_ticks written in multiples of 62500)
106 : // 5 - 1 > 4 is false => no postprocessing
107 1 : BOOST_REQUIRE_EQUAL(processed_count, 0);
108 :
109 1 : timeout = true;
110 : // 1st timeout => timeout_accumulated = 1 * 2 (delay_max_wait = 2)
111 : // end_win_ts = 5 - 4 + 2 => postprocess until 3 {1, 2}
112 1 : processed_count += sched_algo.run(timeout);
113 1 : BOOST_REQUIRE_EQUAL(processed_count, 2);
114 :
115 : // 2nd timeout => timeout_accumulated = 2 * 2
116 : // end_win_ts = 5 - 4 + 4 => postprocess until 5 {3, 4}
117 1 : processed_count += sched_algo.run(timeout);
118 1 : BOOST_REQUIRE_EQUAL(processed_count, 4);
119 :
120 : // 3rd timeout => timeout_accumulated = 3 * 2
121 : // end_win_ts = 5 - 4 + 6 => postprocess until 6 (capped to newest_ts + 1) {5}
122 1 : processed_count += sched_algo.run(timeout);
123 1 : BOOST_REQUIRE_EQUAL(processed_count, 5);
124 :
125 : // 4th timeout
126 : // m_processed_up_to.timestamp = newest_ts + 1 => nothing to postprocess (at cap)
127 1 : processed_count += sched_algo.run(timeout);
128 1 : BOOST_REQUIRE_EQUAL(processed_count, 5);
129 1 : }
130 :
131 2 : BOOST_AUTO_TEST_CASE(datahandlinglibs_DataHandlingModel_PostprocessScheduleAlgorithm_data_arrives_after_fully_processed_with_timeout)
132 : {
133 1 : std::atomic<bool> run_marker = true;
134 :
135 1 : auto model =
136 : unittest::MockDataHandlingModel<ReadoutType,
137 : DefaultRequestHandlerModel<ReadoutType, SkipListLatencyBufferModel<ReadoutType>>,
138 : SkipListLatencyBufferModel<ReadoutType>,
139 1 : TaskRawDataProcessorModel<ReadoutType>>(run_marker);
140 :
141 1 : auto buffer = std::make_shared<SkipListLatencyBufferModel<ReadoutType>>();
142 :
143 3 : for (int i = 1; i < 3; i++) {
144 2 : ReadoutType frame{};
145 2 : frame.timestamp = i * 62500;
146 2 : buffer->write(std::move(frame));
147 : }
148 :
149 1 : {
150 1 : ReadoutType frame{};
151 1 : frame.timestamp = 4 * 62500;
152 1 : buffer->write(std::move(frame));
153 : }
154 :
155 1 : constexpr bool post_processing_enabled = true;
156 1 : auto error_registry = std::make_unique<FrameErrorRegistry>();
157 :
158 1 : auto raw_processor =
159 1 : std::make_shared<TaskRawDataProcessorModel<ReadoutType>>(error_registry, post_processing_enabled);
160 :
161 1 : constexpr uint64_t delay_ticks = 1 * 62500; // NOLINT(build/unsigned)
162 1 : constexpr uint64_t delay_min_wait = 1; // NOLINT(build/unsigned)
163 1 : constexpr uint64_t delay_max_wait = 2; // NOLINT(build/unsigned)
164 :
165 1 : typename decltype(model)::PostprocessScheduleAlgorithm sched_algo{
166 1 : *buffer, *raw_processor, delay_ticks, delay_min_wait, delay_max_wait
167 1 : };
168 :
169 1 : bool timeout = true;
170 1 : int processed_count = sched_algo.run(timeout);
171 : // Buffer = {1, 2, 4}, delay_ticks = 1 (timestamps and delay_ticks written in multiples of 62500)
172 : // 1st timeout => timeout_accumulated = 1 * 2 (delay_max_wait = 2)
173 : // end_win_ts = 4 - 1 + 2 => postprocess until 5 {1, 2, 4}
174 1 : BOOST_REQUIRE_EQUAL(processed_count, 3);
175 :
176 1 : {
177 1 : ReadoutType frame{};
178 1 : frame.timestamp = 3 * 62500;
179 1 : buffer->write(std::move(frame));
180 : }
181 : // Buffer = {1, 2, 3, 4}
182 :
183 : // Sleep for longer than delay_min_wait so that the next run() does not trigger the "too fast" case
184 1 : std::this_thread::sleep_for(std::chrono::milliseconds(delay_min_wait + 1));
185 :
186 1 : timeout = false;
187 : // m_processed_up_to.timestamp = newest_ts + 1 => nothing to postprocess (data arrived too late)
188 1 : processed_count += sched_algo.run(timeout);
189 1 : BOOST_REQUIRE_EQUAL(processed_count, 3);
190 1 : }
191 :
192 : BOOST_AUTO_TEST_SUITE_END()
|