Line data Source code
1 : /**
2 : * @file test_ratelimiter_app.cxx Test application for
3 : * ratelimiter implementation
4 : *
5 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
6 : * Licensing/copyright details are in the COPYING file that you should have
7 : * received with this code.
8 : */
9 : #include "datahandlinglibs/utils/RateLimiter.hpp"
10 :
11 : #include "logging/Logging.hpp"
12 :
13 : #include "datahandlinglibs/ReadoutTypes.hpp"
14 :
15 : #include "folly/ConcurrentSkipList.h"
16 :
17 : #include <atomic>
18 : #include <chrono>
19 : #include <memory>
20 : #include <random>
21 : #include <set>
22 : #include <string>
23 : #include <utility>
24 : #include <vector>
25 :
26 : using namespace dunedaq::datahandlinglibs;
27 : using namespace folly;
28 :
29 : int
30 0 : main(int /*argc*/, char** /*argv[]*/)
31 : {
32 :
33 : // ConcurrentSkipList from Folly
34 0 : typedef ConcurrentSkipList<types::DUMMY_FRAME_STRUCT> SkipListT;
35 0 : typedef SkipListT::Accessor SkipListTAcc;
36 : // typedef SkipListT::Skipper SkipListTSkip; //Skipper accessor to test
37 :
38 : // Skiplist instance
39 0 : auto head_height = 2;
40 0 : std::shared_ptr<SkipListT> skl(SkipListT::createInstance(head_height));
41 :
42 : // Run for seconds
43 0 : int runsecs = 15;
44 :
45 : // Run marker
46 0 : std::atomic<bool> marker{ true };
47 :
48 : // RateLimiter
49 0 : TLOG() << "Creating ratelimiter with 10 KHz...";
50 0 : RateLimiter rl(10);
51 :
52 0 : std::random_device rd;
53 0 : std::mt19937 mt(rd());
54 :
55 : // RateLimiter adjuster
56 0 : auto adjuster = std::thread([&]() {
57 0 : TLOG() << "Spawned adjuster thread...";
58 0 : std::uniform_real_distribution<> dist(10.0, 100.0);
59 0 : std::vector<float> rand_rates;
60 0 : for (int i = 0; i < runsecs; ++i) {
61 0 : rand_rates.push_back(dist(mt));
62 : }
63 : int idx = 0;
64 0 : while (marker) {
65 0 : TLOG() << "Adjusting rate to: " << rand_rates[idx] << " [kHz]";
66 0 : rl.adjust(rand_rates[idx]);
67 0 : if (idx > runsecs - 1) {
68 : idx = 0;
69 : } else {
70 0 : ++idx;
71 : }
72 0 : std::this_thread::sleep_for(std::chrono::seconds(1));
73 : }
74 0 : });
75 :
76 : // Producer thread
77 0 : auto producer = std::thread([&]() {
78 0 : TLOG() << "SkipList Producer spawned... Creating accessor.";
79 0 : uint64_t ts = 0; // NOLINT(build/unsigned)
80 0 : while (marker) {
81 0 : types::DUMMY_FRAME_STRUCT pl;
82 0 : auto plptr =
83 : const_cast<types::DUMMY_FRAME_STRUCT*>(reinterpret_cast<const types::DUMMY_FRAME_STRUCT*>(&pl)); // NOLINT
84 0 : plptr->timestamp = ts;
85 0 : {
86 0 : SkipListTAcc prodacc(skl);
87 0 : prodacc.insert(std::move(pl));
88 0 : }
89 0 : ts += 25;
90 0 : rl.limit();
91 : }
92 0 : TLOG() << "Producer joins...";
93 0 : });
94 :
95 : // Cleanup thread
96 0 : auto cleaner = std::thread([&]() {
97 0 : std::string tname("Cleaner");
98 0 : TLOG() << "SkipList " << tname << " spawned... Creating accessor.";
99 0 : uint64_t max_time_diff = 100000; // accounts for few seconds in FE clock // NOLINT(build/unsigned)
100 0 : while (marker) {
101 0 : SkipListTAcc cleanacc(skl);
102 0 : TLOG() << tname << ": SkipList size: " << cleanacc.size();
103 0 : auto tail = cleanacc.last();
104 0 : auto head = cleanacc.first();
105 0 : if (tail && head) {
106 0 : auto tailptr = reinterpret_cast<const types::DUMMY_FRAME_STRUCT*>(tail); // NOLINT
107 0 : auto headptr = reinterpret_cast<const types::DUMMY_FRAME_STRUCT*>(head); // NOLINT
108 0 : auto tailts = tailptr->get_timestamp();
109 0 : auto headts = headptr->get_timestamp();
110 0 : if (headts - tailts > max_time_diff) { // ts differnce exceeds maximum
111 0 : uint64_t timediff = max_time_diff; // NOLINT(build/unsigned)
112 0 : auto removed_ctr = 0;
113 0 : while (timediff >= max_time_diff) {
114 0 : bool removed = cleanacc.remove(*tail);
115 0 : if (!removed) {
116 0 : TLOG() << tname << ": Unsuccessfull remove: " << removed;
117 : } else {
118 0 : ++removed_ctr;
119 : }
120 0 : tail = cleanacc.last();
121 0 : tailptr = reinterpret_cast<const types::DUMMY_FRAME_STRUCT*>(tail); // NOLINT
122 0 : tailts = tailptr->get_timestamp();
123 0 : timediff = headts - tailts;
124 : }
125 0 : TLOG() << tname << ": Cleared " << removed_ctr << " elements.";
126 : }
127 : } else {
128 0 : TLOG() << tname << ": Didn't manage to get SKL head and tail!";
129 : }
130 0 : std::this_thread::sleep_for(std::chrono::milliseconds(500));
131 0 : }
132 0 : TLOG() << "Cleaner joins...";
133 0 : });
134 :
135 : // Data extractor (Trigger Matcher, timestamp finder.)
136 0 : auto extractor = std::thread([&]() {
137 0 : std::string tname("TriggerMatcher");
138 0 : TLOG() << "SkipList " << tname << " spawned...";
139 0 : while (marker) {
140 0 : { // block enforce for Accessor
141 0 : SkipListTAcc exacc(skl);
142 0 : auto tail = exacc.last();
143 0 : auto head = exacc.first();
144 0 : if (tail && head) {
145 0 : auto tailptr = reinterpret_cast<const types::DUMMY_FRAME_STRUCT*>(tail); // NOLINT
146 0 : auto headptr = reinterpret_cast<const types::DUMMY_FRAME_STRUCT*>(head); // NOLINT
147 0 : auto tailts = tailptr->get_timestamp();
148 0 : auto headts = headptr->get_timestamp();
149 :
150 : // Adjust trigger right in between:
151 0 : auto trigts = (tailts + headts) / static_cast<uint64_t>(2); // NOLINT(build/unsigned)
152 0 : types::DUMMY_FRAME_STRUCT trigger_pl;
153 0 : auto trigptr = const_cast<types::DUMMY_FRAME_STRUCT*>(
154 : reinterpret_cast<const types::DUMMY_FRAME_STRUCT*>(&trigger_pl)); // NOLINT
155 0 : trigptr->timestamp = trigts;
156 :
157 : // Find closest to trigger payload
158 0 : auto close = exacc.lower_bound(trigger_pl);
159 : // if (close) {
160 0 : auto foundptr = reinterpret_cast<const types::DUMMY_FRAME_STRUCT*>(&(*close)); // NOLINT
161 0 : auto foundts = foundptr->get_timestamp();
162 0 : TLOG() << tname << ": Found element lower bound to " << trigts << " in skiplist with timestamp --> "
163 0 : << foundts;
164 : //} else {
165 : // TLOG() << tname << ": Timestamp doesn't seem to be in Skiplist...";
166 : //}
167 : //
168 : //} else {
169 : // TLOG() << tname << ": Didn't manage to get SKL head and tail!";
170 : }
171 0 : }
172 0 : std::this_thread::sleep_for(std::chrono::milliseconds(100));
173 : }
174 0 : TLOG() << "Extractor joins...";
175 0 : });
176 :
177 : // Killswitch that flips the run marker
178 0 : auto killswitch = std::thread([&]() {
179 0 : TLOG() << "Application will terminate in 5s...";
180 0 : std::this_thread::sleep_for(std::chrono::seconds(runsecs));
181 0 : marker.store(false);
182 0 : });
183 :
184 : // Join local threads
185 0 : TLOG() << "Flipping killswitch in order to stop threads...";
186 0 : if (killswitch.joinable()) {
187 0 : killswitch.join();
188 : }
189 :
190 0 : if (producer.joinable()) {
191 0 : producer.join();
192 : }
193 :
194 0 : if (cleaner.joinable()) {
195 0 : cleaner.join();
196 : }
197 :
198 0 : if (extractor.joinable()) {
199 0 : extractor.join();
200 : }
201 0 : if (adjuster.joinable()) {
202 0 : adjuster.join();
203 : }
204 :
205 : // Exit
206 0 : TLOG() << "Exiting.";
207 :
208 0 : std::this_thread::sleep_for(std::chrono::seconds(2));
209 :
210 0 : return 0;
211 0 : } // NOLINT(readability/fn_size)
|