Line data Source code
1 : /**
2 : * @file test_ratelimiter_app.cxx Test application for
3 : * ratelimiter implementation
4 : *
5 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
6 : * Licensing/copyright details are in the COPYING file that you should have
7 : * received with this code.
8 : */
#include "datahandlinglibs/utils/RateLimiter.hpp"

#include "logging/Logging.hpp"

#include <atomic>
#include <chrono>
#include <random>
#include <thread>
#include <vector>
17 :
18 : using namespace dunedaq::datahandlinglibs;
19 :
20 : int
21 0 : main(int /*argc*/, char** /*argv[]*/)
22 : {
23 0 : int runsecs = 15;
24 :
25 : // Run marker
26 0 : std::atomic<bool> marker{ true };
27 :
28 : // RateLimiter
29 0 : TLOG() << "Creating ratelimiter with 1MHz...";
30 0 : RateLimiter rl(1000);
31 :
32 : // Counter for ops/s
33 0 : std::atomic<int> newops = 0;
34 :
35 : // Stats
36 0 : auto stats = std::thread([&]() {
37 0 : TLOG() << "Spawned stats thread...";
38 0 : while (marker) {
39 0 : TLOG() << "ops/s -> " << newops.exchange(0);
40 0 : std::this_thread::sleep_for(std::chrono::seconds(1));
41 : }
42 0 : });
43 :
44 0 : std::random_device rd;
45 0 : std::mt19937 mt(rd());
46 :
47 : // Adjuster
48 0 : auto adjuster = std::thread([&]() {
49 0 : TLOG() << "Spawned adjuster thread...";
50 0 : std::uniform_int_distribution<> dist(1, 1000);
51 0 : std::vector<int> rand_rates;
52 0 : for (int i = 0; i < runsecs; ++i) {
53 0 : rand_rates.push_back(dist(mt));
54 : }
55 : int idx = 0;
56 0 : while (marker) {
57 0 : TLOG() << "Adjusting rate to: " << rand_rates[idx] << "[kHz]";
58 0 : rl.adjust(rand_rates[idx]);
59 0 : if (idx > runsecs - 1) {
60 : idx = 0;
61 : } else {
62 0 : ++idx;
63 : }
64 0 : std::this_thread::sleep_for(std::chrono::seconds(1));
65 : }
66 0 : });
67 :
68 : // Killswitch that flips the run marker
69 0 : auto killswitch = std::thread([&]() {
70 0 : TLOG() << "Application will terminate in 5s...";
71 0 : std::this_thread::sleep_for(std::chrono::seconds(runsecs));
72 0 : marker.store(false);
73 0 : });
74 :
75 : // Limit task
76 0 : TLOG() << "Launching task to count...";
77 0 : int sumops = 0;
78 0 : rl.init();
79 0 : while (marker) {
80 : // just count...
81 0 : sumops++;
82 0 : newops++;
83 0 : rl.limit();
84 : }
85 :
86 : // Join local threads
87 0 : TLOG() << "Flipping killswitch in order to stop...";
88 0 : if (killswitch.joinable()) {
89 0 : killswitch.join();
90 : }
91 0 : if (stats.joinable()) {
92 0 : stats.join();
93 : }
94 0 : if (adjuster.joinable()) {
95 0 : adjuster.join();
96 : }
97 :
98 : // Check
99 : // TLOG() << "Operations in 5 seconds (should be really close to 5 million:): " << sumops;
100 :
101 : // Exit
102 0 : TLOG() << "Exiting.";
103 0 : return 0;
104 0 : }
|