Line data Source code
1 : /**
2 : *
3 : * @file queue_IO_check.cxx
4 : *
5 : * A low-level test of queue classes which implement the Queue
6 : * interface. We have a user-settable number of threads writing
7 : * elements to a queue while a user-settable number of threads reads
8 : * from the queue
9 : *
10 : * Run "queue_IO_check --help" to see options
11 : *
12 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
13 : * Licensing/copyright details are in the COPYING file that you should have
14 : * received with this code.
15 : */
16 :
#include "iomanager/queue/FollyQueue.hpp"
#include "iomanager/queue/StdDeQueue.hpp"

#include "logging/Logging.hpp"

#include "boost/program_options.hpp"
namespace bpo = boost::program_options;

#include <atomic>
#include <chrono>
#include <cstdint>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <random>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>
36 :
37 : namespace dunedaq {
38 : // Disable coverage collection LCOV_EXCL_START
39 : ERS_DECLARE_ISSUE(iomanager, ///< Namespace
40 : ParameterDomainIssue, ///< Issue class name
41 : "ParameterDomainIssue: \"" << ers_messg << "\"",
42 : ((std::string)ers_messg))
43 : // Re-enable coverage collection LCOV_EXCL_STOP
44 : } // namespace dunedaq
45 :
46 : namespace {
47 :
48 : /**
49 : * @brief Configuration of the test, derived from command-line options
50 : */
51 : struct test_config
52 : {
53 :
54 : std::chrono::milliseconds timeout = std::chrono::milliseconds(100); ///< Queue's timeout
55 :
56 : /**
57 : * @brief Queue instance for test
58 : */
59 : std::unique_ptr<dunedaq::iomanager::Queue<int>> queue = nullptr;
60 :
61 : int num_elements = 1000000; ///< Number of elements to push to the Queue (total)
62 : int num_adding_threads = 1; ///< Number of threads which will call push
63 : int num_removing_threads = 1; ///< Number of threads which will call pop
64 :
65 : int avg_milliseconds_between_pushes = 0; ///< Target average rate of pushes
66 : int avg_milliseconds_between_pops = 0; ///< Target average rate of pops
67 :
68 : // The enable_ options, when set to true, contain code that executes
69 : // for each push/pop, which will of course affect the overall
70 : // execution time of the threads while also adding info about the
71 : // behavior of the system
72 :
73 : bool enable_per_pushpop_timing = true;
74 : bool enable_max_size_checking = true;
75 : };
76 :
/**
 * @brief Results of the test, plus the random-number state used for the
 * optional pauses between pushes/pops
 *
 * The counters are atomic because they are updated concurrently by the
 * pushing and popping threads.
 */
struct test_results
{
  std::atomic<int> queue_size{ 0 };     ///< Queue's current size
  std::atomic<int> max_queue_size{ 0 }; ///< Queue's maximum size

  std::atomic<int> timeout_pushes{ 0 }; ///< Number of pushes which timed out
  std::atomic<int> timeout_pops{ 0 };   ///< Number of pops which timed out
  std::atomic<int> throw_pushes{ 0 };   ///< Number of pushes which threw an exception
  std::atomic<int> throw_pops{ 0 };     ///< Number of pops which threw an exception

  /**
   * @brief A time-based seed for the random number generators
   *
   * Only the millisecond part of the current wall-clock time is kept, so the
   * seed always lies in the range [0, 999].
   */
  std::chrono::milliseconds now_ms =
    std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch());
  uint64_t relatively_random_seed = static_cast<uint64_t>(now_ms.count() % 1000); // NOLINT

  /**
   * @brief Random number generator with time-based seed
   *
   * The explicit cast matters: the engine's result_type is narrower than
   * uint64_t on some standard libraries, and brace-initialization from a
   * wider non-constant value would then be an ill-formed narrowing conversion.
   */
  std::default_random_engine generator{ static_cast<std::default_random_engine::result_type>(
    relatively_random_seed) };

  std::unique_ptr<std::uniform_int_distribution<int>> push_distribution =
    nullptr; ///< Random number distribution to use for push waits
  std::unique_ptr<std::uniform_int_distribution<int>> pop_distribution =
    nullptr; ///< Random number distribution to use for pop waits
};
102 :
103 : /**
104 : * @brief Put elements onto the queue
105 : */
106 : void
107 0 : add_things(test_config const& config, test_results& results, const volatile bool& spinlock)
108 : {
109 0 : const int num_pushes = config.num_elements / config.num_adding_threads;
110 0 : auto start_time_push = std::chrono::steady_clock::now(); // Won't ever use the initialization value
111 0 : auto size_snapshot = results.queue_size.load(); // Unlike queue_size, only this thread writes to size_snapshot
112 :
113 0 : while (spinlock) {
114 : } // Main program thread will set this to false, then this thread starts pushing
115 :
116 0 : const auto start_time = std::chrono::steady_clock::now();
117 0 : const auto start_time_system =
118 0 : std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
119 :
120 0 : for (int i = 0; i < num_pushes; ++i) {
121 :
122 0 : if (config.avg_milliseconds_between_pushes > 0) {
123 0 : std::this_thread::sleep_for(std::chrono::milliseconds((*results.push_distribution)(results.generator)));
124 : }
125 :
126 0 : while (!config.queue->can_push()) {
127 0 : std::this_thread::sleep_for(std::chrono::milliseconds(1));
128 : }
129 :
130 0 : while (true) {
131 0 : try {
132 0 : auto i_copy = i;
133 0 : if (!config.enable_per_pushpop_timing) {
134 0 : config.queue->push(std::move(i_copy), config.timeout); // NOLINT
135 : } else {
136 0 : start_time_push = std::chrono::steady_clock::now();
137 0 : config.queue->push(std::move(i_copy), config.timeout); // NOLINT
138 0 : if (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() -
139 0 : start_time_push) > config.timeout) {
140 0 : results.timeout_pushes++;
141 : }
142 : }
143 :
144 0 : if (config.enable_max_size_checking) {
145 0 : size_snapshot = results.queue_size.fetch_add(1) + 1; // fetch_add returns previous value
146 :
147 0 : if (size_snapshot > results.max_queue_size) {
148 0 : results.max_queue_size = size_snapshot;
149 : }
150 : }
151 :
152 0 : break;
153 0 : } catch (const dunedaq::iomanager::QueueTimeoutExpired& err) {
154 0 : results.throw_pushes++;
155 0 : std::ostringstream msg;
156 0 : msg << "Thread #" << std::this_thread::get_id() << ": exception thrown on push #" << i << ": " << err.what();
157 0 : TLOG(TLVL_WARNING) << msg.str();
158 0 : }
159 : }
160 : }
161 :
162 0 : std::ostringstream msg;
163 0 : msg << "Thread #" << std::this_thread::get_id() << ": started at " << start_time_system
164 0 : << " ms since epoch, tried pushing " << num_pushes << " elements; time taken was "
165 0 : << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count()
166 0 : << " ms" << "\n";
167 0 : TLOG(TLVL_INFO) << msg.str();
168 0 : }
169 :
170 : /**
171 : * @brief Pop elements off of the queue
172 : */
173 : void
174 0 : remove_things(test_config const& config, test_results& results, const volatile bool& spinlock)
175 : {
176 0 : const int num_pops = config.num_removing_threads > 0 ? config.num_elements / config.num_removing_threads : 0;
177 0 : auto start_time_pop = std::chrono::steady_clock::now(); // Won't ever use the initialization value
178 0 : int val = -999;
179 :
180 0 : while (spinlock) {
181 : } // Main program thread will set this to false, then this thread starts popping
182 :
183 0 : const auto start_time = std::chrono::steady_clock::now();
184 0 : const auto start_time_system =
185 0 : std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
186 :
187 0 : for (int i = 0; i < num_pops; ++i) {
188 :
189 0 : if (config.avg_milliseconds_between_pops > 0) {
190 0 : std::this_thread::sleep_for(std::chrono::milliseconds((*results.pop_distribution)(results.generator)));
191 : }
192 :
193 0 : while (!config.queue->can_pop()) {
194 0 : std::this_thread::sleep_for(std::chrono::milliseconds(1));
195 : }
196 :
197 0 : while (true) {
198 0 : try {
199 :
200 0 : if (!config.enable_per_pushpop_timing) {
201 0 : config.queue->pop(val, config.timeout);
202 : } else {
203 0 : start_time_pop = std::chrono::steady_clock::now();
204 0 : config.queue->pop(val, config.timeout);
205 :
206 0 : if (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time_pop) >
207 : config.timeout)
208 0 : results.timeout_pops++;
209 : }
210 :
211 0 : if (config.enable_max_size_checking) {
212 0 : results.queue_size--;
213 : }
214 0 : break;
215 :
216 0 : } catch (const dunedaq::iomanager::QueueTimeoutExpired& e) {
217 0 : results.throw_pops++;
218 0 : std::ostringstream msg;
219 0 : msg << "Thread #" << std::this_thread::get_id() << ": exception thrown on pop #" << i << ": " << e.what();
220 0 : TLOG(TLVL_WARNING) << msg.str();
221 0 : }
222 : }
223 : }
224 :
225 0 : std::ostringstream msg;
226 0 : msg << "Thread #" << std::this_thread::get_id() << ": started at " << start_time_system
227 0 : << " ms since epoch, tried popping " << num_pops << " elements; time taken was "
228 0 : << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count()
229 0 : << " ms" << "\n";
230 0 : TLOG(TLVL_INFO) << msg.str();
231 0 : }
232 :
233 : } // namespace ""
234 :
235 : int
236 0 : main(int argc, char* argv[])
237 : {
238 0 : test_config config;
239 0 : test_results results;
240 :
241 0 : double initial_capacity_used = 0; ///< The initial portion of the Queue which was full.
242 :
243 0 : std::ostringstream descstr;
244 0 : descstr << argv[0] << " known arguments "; // NOLINT
245 :
246 0 : std::ostringstream num_elements_desc;
247 0 : num_elements_desc << "# of elements you want pushed and/or popped (default is " << config.num_elements << ")";
248 :
249 0 : std::ostringstream push_threads_desc;
250 0 : push_threads_desc << "# of threads you want pushing elements onto the queue (default is " << config.num_adding_threads
251 0 : << ")";
252 :
253 0 : std::ostringstream pop_threads_desc;
254 0 : pop_threads_desc << "# of threads you want popping elements off the queue (default is " << config.num_removing_threads
255 0 : << ")";
256 :
257 0 : std::ostringstream push_pause_desc;
258 0 : push_pause_desc << "average time in milliseconds between a thread's pushes (default is "
259 0 : << config.avg_milliseconds_between_pushes << ")";
260 :
261 0 : std::ostringstream pop_pause_desc;
262 0 : pop_pause_desc << "average time in milliseconds between a thread's pops (default is "
263 0 : << config.avg_milliseconds_between_pops << ")";
264 :
265 0 : std::ostringstream capacity_used_desc;
266 0 : capacity_used_desc << "fraction of the queue's capacity filled at start (default is " << initial_capacity_used << ")";
267 :
268 0 : bpo::options_description desc(descstr.str());
269 0 : desc.add_options()("queue_type",
270 0 : bpo::value<std::string>(),
271 : "Type of queue instance you want to test (default is "
272 : "StdDeQueue) (supported "
273 0 : "types are: StdDeQueue, FollySPSCQueue, FollyMPMCQueue)")(
274 0 : "nelements", bpo::value<int>(), num_elements_desc.str().c_str())(
275 0 : "push_threads", bpo::value<int>(), push_threads_desc.str().c_str())(
276 0 : "pop_threads", bpo::value<int>(), pop_threads_desc.str().c_str())(
277 0 : "pause_between_pushes", bpo::value<int>(), push_pause_desc.str().c_str())(
278 0 : "pause_between_pops", bpo::value<int>(), pop_pause_desc.str().c_str())(
279 0 : "capacity", bpo::value<int>()->default_value(1000000000), "queue capacity")(
280 0 : "initial_capacity_used", bpo::value<double>(), capacity_used_desc.str().c_str())("help,h", "produce help message");
281 :
282 0 : bpo::variables_map vm;
283 0 : bpo::store(bpo::parse_command_line(argc, argv, desc), vm);
284 0 : bpo::notify(vm);
285 :
286 0 : if (vm.count("help")) {
287 0 : std::cout << desc << "\n"; // NOLINT (TRACE prints an unnecessary warning
288 : // suggesting that a streamer be implemented for
289 : // boost::program_options::options_description)
290 : return 0;
291 : }
292 :
293 0 : std::string queue_type = "StdDeQueue";
294 0 : if (vm.count("queue_type")) {
295 0 : queue_type = vm["queue_type"].as<std::string>();
296 : }
297 :
298 0 : int capacity = vm["capacity"].as<int>();
299 :
300 0 : if (queue_type == "StdDeQueue") {
301 0 : config.queue = std::make_unique<dunedaq::iomanager::StdDeQueue<int>>("StdDeQueue", static_cast<size_t>(capacity));
302 0 : } else if (queue_type == "FollySPSCQueue") {
303 0 : config.queue =
304 0 : std::make_unique<dunedaq::iomanager::FollySPSCQueue<int>>("FollySPSCQueue", static_cast<size_t>(capacity));
305 0 : } else if (queue_type == "FollyMPMCQueue") {
306 0 : config.queue =
307 0 : std::make_unique<dunedaq::iomanager::FollyMPMCQueue<int>>("FollyMPMCQueue", static_cast<size_t>(capacity));
308 : } else {
309 0 : TLOG(TLVL_ERROR) << "Unknown queue type \"" << queue_type << "\" requested for testing";
310 0 : return 1;
311 : }
312 :
313 0 : if (vm.count("nelements")) {
314 0 : config.num_elements = vm["nelements"].as<int>();
315 :
316 0 : if (config.num_elements <= 0) {
317 0 : throw dunedaq::iomanager::ParameterDomainIssue(ERS_HERE, "# of elements must be a positive integer");
318 : }
319 : }
320 :
321 0 : if (vm.count("push_threads")) {
322 :
323 0 : config.num_adding_threads = vm["push_threads"].as<int>();
324 :
325 0 : if (config.num_adding_threads < 0) {
326 0 : throw dunedaq::iomanager::ParameterDomainIssue(ERS_HERE, "# of pushing threads must be non-negative");
327 : }
328 0 : if (queue_type == "FollySPSCQueue" && config.num_adding_threads != 0 && config.num_adding_threads != 1) {
329 0 : throw dunedaq::iomanager::ParameterDomainIssue(ERS_HERE, "# of pushing threads must 0 or 1 for SPSC queue");
330 : }
331 0 : if (config.num_adding_threads > 0 && config.num_elements % config.num_adding_threads != 0) {
332 0 : std::ostringstream msg;
333 0 : msg << "# of pushing threads must divide into the # of elements (" << config.num_elements
334 0 : << ") without a remainder";
335 0 : throw dunedaq::iomanager::ParameterDomainIssue(ERS_HERE, msg.str());
336 0 : }
337 : }
338 :
339 0 : if (vm.count("pop_threads")) {
340 0 : config.num_removing_threads = vm["pop_threads"].as<int>();
341 :
342 0 : if (config.num_removing_threads < 0) {
343 0 : throw dunedaq::iomanager::ParameterDomainIssue(ERS_HERE, "# of popping threads must be non-negative");
344 : }
345 0 : if (queue_type == "FollySPSCQueue" && config.num_removing_threads != 0 && config.num_removing_threads != 1) {
346 0 : throw dunedaq::iomanager::ParameterDomainIssue(ERS_HERE, "# of popping threads must 0 or 1 for SPSC queue");
347 : }
348 0 : if (config.num_removing_threads > 0 && config.num_elements % config.num_removing_threads != 0) {
349 0 : std::ostringstream msg;
350 0 : msg << "# of popping threads must divide into the # of elements (" << config.num_elements
351 0 : << ") without a remainder";
352 0 : throw dunedaq::iomanager::ParameterDomainIssue(ERS_HERE, msg.str());
353 0 : }
354 : }
355 :
356 0 : if (vm.count("pause_between_pushes")) {
357 0 : config.avg_milliseconds_between_pushes = vm["pause_between_pushes"].as<int>();
358 :
359 0 : if (config.avg_milliseconds_between_pushes < 0) {
360 0 : throw dunedaq::iomanager::ParameterDomainIssue(ERS_HERE,
361 0 : "Average # of milliseconds between pushes must be non-negative");
362 : }
363 : }
364 :
365 0 : if (vm.count("pause_between_pops")) {
366 0 : config.avg_milliseconds_between_pops = vm["pause_between_pops"].as<int>();
367 :
368 0 : if (config.avg_milliseconds_between_pops < 0) {
369 0 : throw dunedaq::iomanager::ParameterDomainIssue(ERS_HERE,
370 0 : "Average # of milliseconds between pops must be non-negative");
371 : }
372 : }
373 :
374 0 : if (vm.count("initial_capacity_used")) {
375 0 : initial_capacity_used = vm["initial_capacity_used"].as<double>();
376 :
377 0 : if (initial_capacity_used < 0 || initial_capacity_used > 1) {
378 0 : throw dunedaq::iomanager::ParameterDomainIssue(
379 0 : ERS_HERE, "Initial fractional capacity of queue which is used must lie in the range [0, 1]");
380 : }
381 : }
382 :
383 0 : results.push_distribution =
384 0 : std::make_unique<std::uniform_int_distribution<int>>(0, 2 * config.avg_milliseconds_between_pushes);
385 0 : results.pop_distribution =
386 0 : std::make_unique<std::uniform_int_distribution<int>>(0, 2 * config.avg_milliseconds_between_pops);
387 :
388 0 : TLOG(TLVL_INFO) << config.num_adding_threads << " thread(s) pushing " << config.num_elements
389 0 : << " elements between them, each thread has an average time of "
390 0 : << config.avg_milliseconds_between_pushes << " milliseconds between pushes";
391 0 : TLOG(TLVL_INFO) << config.num_removing_threads << " thread(s) popping " << config.num_elements
392 0 : << " elements between them, each thread has an average time of "
393 0 : << config.avg_milliseconds_between_pops << " milliseconds between pops";
394 0 : TLOG(TLVL_INFO) << "Queue of type " << queue_type << " has capacity for " << capacity << " elements";
395 :
396 0 : int elements_to_begin_with = static_cast<int>(initial_capacity_used * capacity);
397 :
398 0 : if (initial_capacity_used > 0) {
399 :
400 0 : TLOG(TLVL_INFO) << "Before test officially begins, pushing " << elements_to_begin_with
401 0 : << " elements onto the queue";
402 0 : for (int i_e = 0; i_e < elements_to_begin_with; ++i_e) {
403 0 : config.queue->push(-1, config.timeout);
404 : }
405 0 : results.queue_size = elements_to_begin_with;
406 0 : results.max_queue_size = elements_to_begin_with;
407 0 : TLOG(TLVL_INFO) << "Finished pre-test filling of the queue";
408 : }
409 :
410 0 : if (config.num_adding_threads > 0 && elements_to_begin_with + config.num_elements > capacity) {
411 0 : std::ostringstream msg;
412 0 : msg << "The number of elements the queue is initially filled with (" << elements_to_begin_with
413 0 : << ") plus the number of elements which will be pushed onto it (" << config.num_elements
414 0 : << ") exceeds the queue's capacity (" << capacity << ")";
415 0 : if (config.num_removing_threads > 0) {
416 0 : TLOG(TLVL_WARNING) << msg.str();
417 : } else {
418 0 : TLOG(TLVL_ERROR) << msg.str();
419 0 : return 2;
420 : }
421 0 : }
422 :
423 0 : if (config.num_removing_threads > 0 && config.num_elements > elements_to_begin_with) {
424 0 : std::ostringstream msg;
425 0 : msg << "The number of elements the queue is initially filled with (" << elements_to_begin_with
426 0 : << ") minus the number of elements which will be popped off of it (" << config.num_elements
427 0 : << ") is less than zero";
428 :
429 0 : if (config.num_adding_threads > 0) {
430 0 : TLOG(TLVL_WARNING) << msg.str();
431 : } else {
432 0 : TLOG(TLVL_ERROR) << msg.str();
433 0 : return 3;
434 : }
435 0 : }
436 :
437 0 : bool spinlock = true;
438 :
439 0 : std::vector<std::thread> adders;
440 0 : std::vector<std::thread> removers;
441 :
442 0 : for (int i = 0; i < config.num_adding_threads; ++i) {
443 0 : adders.emplace_back(add_things, std::cref(config), std::ref(results), std::cref(spinlock));
444 : }
445 :
446 0 : for (int i = 0; i < config.num_removing_threads; ++i) {
447 0 : removers.emplace_back(remove_things, std::cref(config), std::ref(results), std::cref(spinlock));
448 : }
449 :
450 : // 20 ms is the pause Ron used when he originally implemented the
451 : // spinlock strategy in his logging package
452 :
453 0 : std::this_thread::sleep_for(std::chrono::milliseconds(20));
454 0 : spinlock = false;
455 :
456 0 : const auto start_time = std::chrono::steady_clock::now();
457 0 : for (auto& adder : adders) {
458 0 : adder.join();
459 : }
460 :
461 0 : for (auto& remover : removers) {
462 0 : remover.join();
463 : }
464 0 : const auto total_time =
465 0 : std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count();
466 :
467 0 : TLOG(TLVL_INFO) << "\n\nFinal results: ";
468 :
469 0 : if (config.enable_max_size_checking) {
470 0 : TLOG(TLVL_INFO) << "Max queue size during running was " << results.max_queue_size;
471 : } else {
472 0 : TLOG(TLVL_INFO) << "Disabled check for max queue size during running";
473 : }
474 :
475 0 : if (config.num_adding_threads > 0) {
476 0 : TLOG(TLVL_INFO) << "There were " << results.throw_pushes << " exception throws on push calls";
477 :
478 0 : if (config.enable_per_pushpop_timing) {
479 0 : TLOG(TLVL_INFO) << "There were " << results.timeout_pushes
480 0 : << " pushes which took longer than the provided timeout of " << config.timeout.count() << " ms\n";
481 : } else {
482 0 : TLOG(TLVL_INFO) << "Disabled count of slow pushes\n";
483 : }
484 : }
485 :
486 0 : if (config.num_removing_threads > 0) {
487 0 : TLOG(TLVL_INFO) << "There were " << results.throw_pops << " exception throws on pop calls";
488 :
489 0 : if (config.enable_per_pushpop_timing) {
490 0 : TLOG(TLVL_INFO) << "There were " << results.timeout_pops
491 0 : << " pops which took longer than the provided timeout of " << config.timeout.count() << " ms\n";
492 : } else {
493 0 : TLOG(TLVL_INFO) << "Disabled count of slow pops\n";
494 : }
495 : }
496 :
497 0 : TLOG(TLVL_INFO) << "Total time from start of thread launch to the last thread wrapping up was " << total_time
498 0 : << " ms";
499 :
500 0 : return 0;
501 0 : } // NOLINT
|