Line data Source code
1 : /**
2 : * @file IOManager_test.cxx IOManager Unit Tests
3 : *
4 : * This is part of the DUNE DAQ Application Framework, copyright 2021.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "iomanager/IOManager.hpp"
10 :
11 : #include "opmonlib/TestOpMonManager.hpp"
12 : #include "serialization/Serialization.hpp"
13 :
14 : #define BOOST_TEST_MODULE IOManager_test // NOLINT
15 :
16 : #include "boost/test/unit_test.hpp"
17 :
18 : #include <memory>
19 : #include <string>
20 : #include <utility>
21 : #include <vector>
22 :
23 : using namespace dunedaq::iomanager;
24 :
25 : namespace dunedaq {
26 : namespace iomanager {
27 : struct Data // Copyable and movable test payload (int, double, string)
28 : {
29 : int d1;
30 : double d2;
31 : std::string d3;
32 :
33 5 : Data() = default;
34 4 : Data(int i, double d, std::string s) // Member-wise value constructor
35 4 : : d1(i)
36 4 : , d2(d)
37 4 : , d3(s)
38 : {
39 4 : }
40 20 : virtual ~Data() = default;
41 : Data(Data const&) = default;
42 : Data& operator=(Data const&) = default;
43 11 : Data(Data&&) = default;
44 5 : Data& operator=(Data&&) = default;
45 :
46 4 : DUNE_DAQ_SERIALIZE(Data, d1, d2, d3); // Passes all three fields to the serialization macro
47 : };
48 : struct Data2 // Copyable and movable test payload (int, double); used by the "data2_t" pub/sub tests
49 : {
50 : int d1;
51 : double d2;
52 :
53 10 : Data2() = default;
54 10 : Data2(int i, double d) // Member-wise value constructor
55 10 : : d1(i)
56 10 : , d2(d)
57 : {
58 10 : }
59 16 : virtual ~Data2() = default;
60 : Data2(Data2 const&) = default;
61 : Data2& operator=(Data2 const&) = default;
62 0 : Data2(Data2&&) = default;
63 4 : Data2& operator=(Data2&&) = default;
64 :
65 20 : DUNE_DAQ_SERIALIZE(Data2, d1, d2); // Passes both fields to the serialization macro
66 : };
67 : struct Data3 // Copyable and movable single-int test payload; used by the "data3_t" pub/sub tests
68 : {
69 : int d1;
70 :
71 1 : Data3() = default;
72 1 : explicit Data3(int i) // explicit: no implicit int -> Data3 conversion
73 1 : : d1(i)
74 : {
75 1 : }
76 2 : virtual ~Data3() = default;
77 : Data3(Data3 const&) = default;
78 : Data3& operator=(Data3 const&) = default;
79 0 : Data3(Data3&&) = default;
80 0 : Data3& operator=(Data3&&) = default;
81 :
82 2 : DUNE_DAQ_SERIALIZE(Data3, d1); // Passes the single field to the serialization macro
83 : };
84 :
85 : struct NonCopyableData // Same fields as Data, but move-only (copy operations are deleted)
86 : {
87 : int d1;
88 : double d2;
89 : std::string d3;
90 :
91 5 : NonCopyableData() = default;
92 4 : NonCopyableData(int i, double d, std::string s) // Member-wise value constructor
93 4 : : d1(i)
94 4 : , d2(d)
95 4 : , d3(s)
96 : {
97 4 : }
98 20 : virtual ~NonCopyableData() = default;
99 : NonCopyableData(NonCopyableData const&) = delete;
100 : NonCopyableData& operator=(NonCopyableData const&) = delete;
101 11 : NonCopyableData(NonCopyableData&&) = default;
102 5 : NonCopyableData& operator=(NonCopyableData&&) = default;
103 :
104 4 : DUNE_DAQ_SERIALIZE(NonCopyableData, d1, d2, d3); // Passes all three fields to the serialization macro
105 : };
106 :
107 : struct NonSerializableData // Same fields as Data, copyable and movable, but with no DUNE_DAQ_SERIALIZE declaration
108 : {
109 : int d1;
110 : double d2;
111 : std::string d3;
112 :
113 4 : NonSerializableData() = default;
114 4 : NonSerializableData(int i, double d, std::string s) // Member-wise value constructor
115 4 : : d1(i)
116 4 : , d2(d)
117 4 : , d3(s)
118 : {
119 4 : }
120 17 : virtual ~NonSerializableData() = default;
121 : NonSerializableData(NonSerializableData const&) = default;
122 : NonSerializableData& operator=(NonSerializableData const&) = default;
123 9 : NonSerializableData(NonSerializableData&&) = default;
124 4 : NonSerializableData& operator=(NonSerializableData&&) = default;
125 : };
126 :
127 : struct NonSerializableNonCopyable // Same fields as Data, move-only, and with no DUNE_DAQ_SERIALIZE declaration
128 : {
129 : int d1;
130 : double d2;
131 : std::string d3;
132 :
133 4 : NonSerializableNonCopyable() = default;
134 4 : NonSerializableNonCopyable(int i, double d, std::string s) // Member-wise value constructor
135 4 : : d1(i)
136 4 : , d2(d)
137 4 : , d3(s)
138 : {
139 4 : }
140 17 : virtual ~NonSerializableNonCopyable() = default;
141 : NonSerializableNonCopyable(NonSerializableNonCopyable const&) = delete;
142 : NonSerializableNonCopyable& operator=(NonSerializableNonCopyable const&) = delete;
143 9 : NonSerializableNonCopyable(NonSerializableNonCopyable&&) = default;
144 4 : NonSerializableNonCopyable& operator=(NonSerializableNonCopyable&&) = default;
145 : };
146 :
147 : } // namespace iomanager
148 :
149 : // Datatype-string registrations for the serializable test types. Must be in dunedaq namespace only
150 16 : DUNE_DAQ_SERIALIZABLE(iomanager::Data, "data_t");
151 14 : DUNE_DAQ_SERIALIZABLE(iomanager::Data2, "data2_t");
152 2 : DUNE_DAQ_SERIALIZABLE(iomanager::Data3, "data3_t");
153 10 : DUNE_DAQ_SERIALIZABLE(iomanager::NonCopyableData, "data_t"); // Test-only: shares the "data_t" string with Data
154 :
155 : // Note: Reusing one data type string for several types is bad; it is done here only so the test types can use the "data_t" connections. Don't do it for real data types!
156 : template<>
157 : inline std::string
158 10 : datatype_to_string<NonSerializableData>()
159 : {
160 10 : return "data_t";
161 : }
162 : template<> // Test-only specialization: NonSerializableNonCopyable also reports the "data_t" string
163 : inline std::string
164 10 : datatype_to_string<NonSerializableNonCopyable>()
165 : {
166 10 : return "data_t";
167 : }
168 : } // namespace dunedaq
169 :
170 : BOOST_AUTO_TEST_SUITE(IOManager_test)
171 :
172 : struct ConfigurationTestFixture // Test fixture: configures the IOManager singleton on construction and resets it on destruction
173 : {
174 14 : ConfigurationTestFixture()
175 14 : {
176 14 : confdb = std::make_shared<dunedaq::conffwk::Configuration>("oksconflibs:test/config/iomanager_test.data.xml");
177 14 : confdb->get<dunedaq::confmodel::Queue>(queues);
178 14 : confdb->get<dunedaq::confmodel::NetworkConnection>(connections);
179 : // Point-to-point endpoints, both with datatype "data_t"
180 14 : conn_id = ConnectionId{ "network", "data_t" };
181 14 : queue_id = ConnectionId{ "queue", "data_t" };
182 : // Pub/sub endpoints. NOTE(review): "pub.*" presumably acts as a pattern matching pub1..pub3 - confirm
183 14 : pub1_id = ConnectionId{ "pub1", "data2_t" };
184 14 : pub2_id = ConnectionId{ "pub2", "data2_t" };
185 14 : pub3_id = ConnectionId{ "pub3", "data3_t" };
186 14 : sub1_id = ConnectionId{ "pub.*", "data2_t" };
187 14 : sub1b_id = ConnectionId{ "pub.*", "data2_t", "b" }; // Same name and datatype as sub1_id, plus a third field "b"
188 14 : sub2_id = ConnectionId{ "pub2", "data2_t" };
189 14 : sub3_id = ConnectionId{ "pub.*", "data3_t" };
190 :
191 14 : IOManager::get()->configure("IOManager_t", queues, connections, nullptr, opmgr); // Not using connectivity service
192 14 : }
193 14 : ~ConfigurationTestFixture() { IOManager::get()->reset(); }
194 : // The fixture is neither copyable nor movable
195 : ConfigurationTestFixture(ConfigurationTestFixture const&) = delete;
196 : ConfigurationTestFixture(ConfigurationTestFixture&&) = delete;
197 : ConfigurationTestFixture& operator=(ConfigurationTestFixture const&) = delete;
198 : ConfigurationTestFixture& operator=(ConfigurationTestFixture&&) = delete;
199 : // Connection IDs shared by the test cases
200 : ConnectionId conn_id;
201 : ConnectionId queue_id;
202 : ConnectionId pub1_id;
203 : ConnectionId pub2_id;
204 : ConnectionId pub3_id;
205 : ConnectionId sub1_id;
206 : ConnectionId sub1b_id;
207 : ConnectionId sub2_id;
208 : ConnectionId sub3_id;
209 : // Configuration database handle and the objects read from it in the constructor
210 : std::shared_ptr<dunedaq::conffwk::Configuration> confdb;
211 : std::vector<const dunedaq::confmodel::Queue*> queues;
212 : std::vector<const dunedaq::confmodel::NetworkConnection*> connections;
213 : // Passed to IOManager::configure as its last argument
214 : dunedaq::opmonlib::TestOpMonManager opmgr;
215 : };
216 :
217 2 : BOOST_AUTO_TEST_CASE(CopyAndMoveSemantics) // IOManager must be neither copyable nor movable
218 : {
219 1 : BOOST_REQUIRE(!std::is_copy_constructible_v<IOManager>);
220 1 : BOOST_REQUIRE(!std::is_copy_assignable_v<IOManager>);
221 1 : BOOST_REQUIRE(!std::is_move_constructible_v<IOManager>);
222 1 : BOOST_REQUIRE(!std::is_move_assignable_v<IOManager>);
223 1 : }
224 :
225 2 : BOOST_AUTO_TEST_CASE(Singleton) // Two IOManager::get() calls must hand back the same object
226 : {
227 1 : auto iom = IOManager::get();
228 1 : auto another_iom = IOManager::get();
229 : // Compare the underlying raw pointers
230 1 : BOOST_REQUIRE_EQUAL(iom.get(), another_iom.get());
231 1 : }
232 :
233 2 : BOOST_FIXTURE_TEST_CASE(DatatypeMismatchException, ConfigurationTestFixture) // get_sender/get_receiver<Data> must throw DatatypeMismatch for an ID typed "baddata_t"
234 : {
235 1 : ConnectionId bad_id{ "iomanager_test_network", "baddata_t" };
236 : // Matching ID succeeds; the mismatching one must throw (the predicate accepts any DatatypeMismatch)
237 1 : auto sender = IOManager::get()->get_sender<Data>(conn_id);
238 5 : BOOST_REQUIRE_EXCEPTION(
239 : IOManager::get()->get_sender<Data>(bad_id), DatatypeMismatch, [](DatatypeMismatch const&) { return true; });
240 : // Same check on the receiver side
241 1 : auto receiver = IOManager::get()->get_receiver<Data>(conn_id);
242 5 : BOOST_REQUIRE_EXCEPTION(
243 : IOManager::get()->get_receiver<Data>(bad_id), DatatypeMismatch, [](DatatypeMismatch const&) { return true; });
244 1 : }
245 :
246 2 : BOOST_FIXTURE_TEST_CASE(SimpleSendReceive, ConfigurationTestFixture) // Round-trips one Data object over the "network" connection and one over the "queue" connection
247 : {
248 1 : auto net_sender = IOManager::get()->get_sender<Data>(conn_id);
249 1 : auto net_receiver = IOManager::get()->get_receiver<Data>(conn_id);
250 1 : auto q_sender = IOManager::get()->get_sender<Data>(queue_id);
251 1 : auto q_receiver = IOManager::get()->get_receiver<Data>(queue_id);
252 :
253 1 : Data sent_nw(56, 26.5, "test1");
254 1 : Data sent_q(57, 27.5, "test2");
255 1 : net_sender->send(std::move(sent_nw), dunedaq::iomanager::Sender::s_no_block);
256 : // Receive with a 10 ms timeout and compare every field with what was sent
257 1 : auto ret = net_receiver->receive(std::chrono::milliseconds(10));
258 1 : BOOST_CHECK_EQUAL(ret.d1, 56);
259 1 : BOOST_CHECK_EQUAL(ret.d2, 26.5);
260 1 : BOOST_CHECK_EQUAL(ret.d3, "test1");
261 : // Same round trip through the queue, here with a 10 ms send timeout
262 1 : q_sender->send(std::move(sent_q), std::chrono::milliseconds(10));
263 :
264 1 : ret = q_receiver->receive(std::chrono::milliseconds(10));
265 1 : BOOST_CHECK_EQUAL(ret.d1, 57);
266 1 : BOOST_CHECK_EQUAL(ret.d2, 27.5);
267 1 : BOOST_CHECK_EQUAL(ret.d3, "test2");
268 1 : }
269 :
270 2 : BOOST_FIXTURE_TEST_CASE(SimplePubSub, ConfigurationTestFixture) // Each subscriber must receive only messages from the publishers and datatype it is subscribed to
271 : {
272 1 : auto pub1_sender = IOManager::get()->get_sender<Data2>(pub1_id);
273 1 : auto pub2_sender = IOManager::get()->get_sender<Data2>(pub2_id);
274 1 : auto pub3_sender = IOManager::get()->get_sender<Data3>(pub3_id);
275 1 : auto sub1_receiver = IOManager::get()->get_receiver<Data2>(sub1_id);
276 1 : auto sub2_receiver = IOManager::get()->get_receiver<Data2>(sub2_id);
277 1 : auto sub3_receiver = IOManager::get()->get_receiver<Data3>(sub3_id);
278 :
279 : // Sub1 is subscribed to all data2_t publishers, Sub2 only to pub2, Sub3 to all data3_t publishers
280 1 : Data2 sent_t1(56, 26.5);
281 1 : pub1_sender->send(std::move(sent_t1), dunedaq::iomanager::Sender::s_no_block);
282 : // Message from pub1: only sub1 receives it, sub2 and sub3 must time out
283 1 : auto ret1 = sub1_receiver->receive(std::chrono::milliseconds(10));
284 3 : BOOST_REQUIRE_EXCEPTION(
285 : sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
286 3 : BOOST_REQUIRE_EXCEPTION(
287 : sub3_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
288 1 : BOOST_CHECK_EQUAL(ret1.d1, 56);
289 1 : BOOST_CHECK_EQUAL(ret1.d2, 26.5);
290 : // Message from pub2: sub1 and sub2 both receive it, sub3 must time out
291 1 : Data2 sent_t2(57, 27.5);
292 1 : pub2_sender->send(std::move(sent_t2), dunedaq::iomanager::Sender::s_no_block);
293 :
294 1 : ret1 = sub1_receiver->receive(std::chrono::milliseconds(10));
295 1 : auto ret2 = sub2_receiver->receive(std::chrono::milliseconds(10));
296 3 : BOOST_REQUIRE_EXCEPTION(
297 : sub3_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
298 1 : BOOST_CHECK_EQUAL(ret1.d1, 57);
299 1 : BOOST_CHECK_EQUAL(ret1.d2, 27.5);
300 1 : BOOST_CHECK_EQUAL(ret2.d1, 57);
301 1 : BOOST_CHECK_EQUAL(ret2.d2, 27.5);
302 : // Data3 message from pub3: only sub3 receives it, sub1 and sub2 must time out
303 1 : Data3 sent_t3(58);
304 1 : pub3_sender->send(std::move(sent_t3), dunedaq::iomanager::Sender::s_no_block);
305 :
306 3 : BOOST_REQUIRE_EXCEPTION(
307 : sub1_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
308 3 : BOOST_REQUIRE_EXCEPTION(
309 : sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
310 1 : auto ret3 = sub3_receiver->receive(std::chrono::milliseconds(10));
311 1 : BOOST_CHECK_EQUAL(ret3.d1, 58);
312 1 : }
313 :
314 2 : BOOST_FIXTURE_TEST_CASE(MultipleReceiverPubSub, ConfigurationTestFixture) // Two receivers on the same subscription must each get every published message
315 : {
316 1 : auto pub1_sender = IOManager::get()->get_sender<Data2>(pub1_id);
317 1 : auto sub1a_receiver = IOManager::get()->get_receiver<Data2>(sub1_id);
318 1 : auto sub1b_receiver = IOManager::get()->get_receiver<Data2>(sub1b_id);
319 :
320 : // Sub1 is subscribed to all data2_t publishers, two instances should both get all messages
321 1 : Data2 sent_t1(56, 26.5);
322 1 : pub1_sender->send(std::move(sent_t1), dunedaq::iomanager::Sender::s_no_block);
323 :
324 1 : auto ret1a = sub1a_receiver->receive(std::chrono::milliseconds(10));
325 1 : auto ret1b = sub1b_receiver->receive(std::chrono::milliseconds(10));
326 : // Both receivers must see the first message
327 1 : BOOST_CHECK_EQUAL(ret1a.d1, 56);
328 1 : BOOST_CHECK_EQUAL(ret1a.d2, 26.5);
329 1 : BOOST_CHECK_EQUAL(ret1b.d1, 56);
330 1 : BOOST_CHECK_EQUAL(ret1b.d2, 26.5);
331 : // Second message from the same publisher
332 1 : Data2 sent_t2(57, 27.5);
333 1 : pub1_sender->send(std::move(sent_t2), dunedaq::iomanager::Sender::s_no_block);
334 :
335 1 : ret1a = sub1a_receiver->receive(std::chrono::milliseconds(10));
336 1 : ret1b = sub1b_receiver->receive(std::chrono::milliseconds(10));
337 : // Both receivers must see the second message as well
338 1 : BOOST_CHECK_EQUAL(ret1a.d1, 57);
339 1 : BOOST_CHECK_EQUAL(ret1a.d2, 27.5);
340 1 : BOOST_CHECK_EQUAL(ret1b.d1, 57);
341 1 : BOOST_CHECK_EQUAL(ret1b.d2, 27.5);
342 1 : }
343 :
344 2 : BOOST_FIXTURE_TEST_CASE(PubSubWithTopic, ConfigurationTestFixture) // Topic subscribe/unsubscribe must control which messages each subscriber receives
345 : {
346 1 : auto pub1_sender = IOManager::get()->get_sender<Data2>(pub1_id);
347 1 : auto pub2_sender = IOManager::get()->get_sender<Data2>(pub2_id);
348 1 : auto sub1_receiver = IOManager::get()->get_receiver<Data2>(sub1_id);
349 1 : auto sub2_receiver = IOManager::get()->get_receiver<Data2>(sub2_id);
350 : // Each receiver additionally subscribes to its own topic
351 1 : sub1_receiver->subscribe("sub1_topic");
352 1 : sub2_receiver->subscribe("sub2_topic");
353 :
354 : // Sub1 is subscribed to all data2_t publishers, Sub2 only to pub2
355 1 : Data2 sent_t0(54, 24.5);
356 1 : pub1_sender->send(std::move(sent_t0), dunedaq::iomanager::Sender::s_no_block);
357 1 : auto ret1 = sub1_receiver->receive(std::chrono::milliseconds(10));
358 1 : BOOST_CHECK_EQUAL(ret1.d1, 54);
359 1 : BOOST_CHECK_EQUAL(ret1.d2, 24.5);
360 3 : BOOST_REQUIRE_EXCEPTION(
361 : sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
362 : // NOTE(review): presumably a plain send() uses the datatype string "data2_t" as its topic - confirm. After unsubscribing, sub1 must no longer see plain sends
363 1 : sub1_receiver->unsubscribe("data2_t");
364 1 : Data2 sent_t1(55, 25.5);
365 1 : pub1_sender->send(std::move(sent_t1), dunedaq::iomanager::Sender::s_no_block);
366 3 : BOOST_REQUIRE_EXCEPTION(
367 : sub1_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
368 3 : BOOST_REQUIRE_EXCEPTION(
369 : sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
370 : // A topic nobody subscribed to: neither receiver gets the message
371 1 : Data2 sent_t2(56, 26.5);
372 1 : pub1_sender->send_with_topic(std::move(sent_t2), dunedaq::iomanager::Sender::s_no_block, "test_topic");
373 3 : BOOST_REQUIRE_EXCEPTION(
374 : sub1_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
375 3 : BOOST_REQUIRE_EXCEPTION(
376 : sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
377 : // pub1 with sub1's topic: only sub1 gets the message
378 1 : Data2 sent_t3{ 57, 27.5 };
379 1 : pub1_sender->send_with_topic(std::move(sent_t3), dunedaq::iomanager::Sender::s_no_block, "sub1_topic");
380 1 : ret1 = sub1_receiver->receive(std::chrono::milliseconds(10));
381 1 : BOOST_CHECK_EQUAL(ret1.d1, 57);
382 1 : BOOST_CHECK_EQUAL(ret1.d2, 27.5);
383 3 : BOOST_REQUIRE_EXCEPTION(
384 : sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
385 : // pub1 with sub2's topic: sub2 is connected only to pub2, so nobody gets the message
386 1 : Data2 sent_t4{ 58, 28.5 };
387 1 : pub1_sender->send_with_topic(std::move(sent_t4), dunedaq::iomanager::Sender::s_no_block, "sub2_topic");
388 3 : BOOST_REQUIRE_EXCEPTION(
389 : sub1_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
390 3 : BOOST_REQUIRE_EXCEPTION(
391 : sub2_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
392 : // pub2 with sub2's topic: only sub2 gets the message
393 1 : Data2 sent_t5{ 59, 29.5 };
394 1 : pub2_sender->send_with_topic(std::move(sent_t5), dunedaq::iomanager::Sender::s_no_block, "sub2_topic");
395 3 : BOOST_REQUIRE_EXCEPTION(
396 : sub1_receiver->receive(std::chrono::milliseconds(10)), TimeoutExpired, [](TimeoutExpired const&) { return true; });
397 1 : auto ret2 = sub2_receiver->receive(std::chrono::milliseconds(10));
398 1 : BOOST_CHECK_EQUAL(ret2.d1, 59);
399 1 : BOOST_CHECK_EQUAL(ret2.d2, 29.5);
400 1 : }
401 :
402 2 : BOOST_FIXTURE_TEST_CASE(NotFound, ConfigurationTestFixture) // Behaviour of a receiver created for "pub4", a name the fixture never uses
403 : {
404 1 : ConnectionId bad_id{ "pub4", "data2_t" };
405 1 : auto receiver = IOManager::get()->get_receiver<Data2>(bad_id); // Creating the receiver itself is expected to succeed
406 : // receive() must throw ConnectionInstanceNotFound
407 3 : BOOST_REQUIRE_EXCEPTION(receiver->receive(std::chrono::milliseconds(10)),
408 : ConnectionInstanceNotFound,
409 : [](ConnectionInstanceNotFound const&) { return true; });
410 : // try_receive() must not throw; it must return an empty result instead
411 1 : auto ret = receiver->try_receive(std::chrono::milliseconds(10));
412 1 : BOOST_REQUIRE_EQUAL(ret.has_value(), false);
413 : // A callback on the unknown connection must never be invoked
414 1 : std::function<void(Data2&)> callback = [&](Data2&) { BOOST_REQUIRE(false); };
415 1 : IOManager::get()->add_callback<Data2>(bad_id, callback);
416 : // Leave the callback registered for one second before removing it
417 1 : usleep(1000000);
418 :
419 1 : IOManager::get()->remove_callback<Data2>(bad_id);
420 1 : }
421 :
422 2 : BOOST_FIXTURE_TEST_CASE(NonSerializableSendReceive, ConfigurationTestFixture) // A non-serializable type must fail on the network connection but work on the queue
423 : {
424 1 : auto net_sender = IOManager::get()->get_sender<NonSerializableData>(conn_id);
425 1 : auto net_receiver = IOManager::get()->get_receiver<NonSerializableData>(conn_id);
426 1 : auto q_sender = IOManager::get()->get_sender<NonSerializableData>(queue_id);
427 1 : auto q_receiver = IOManager::get()->get_receiver<NonSerializableData>(queue_id);
428 :
429 1 : NonSerializableData sent_nw(56, 26.5, "test1");
430 1 : NonSerializableData sent_q(57, 27.5, "test2");
431 3 : BOOST_REQUIRE_EXCEPTION(net_sender->send(std::move(sent_nw), dunedaq::iomanager::Sender::s_no_block),
432 : NetworkMessageNotSerializable,
433 : [](NetworkMessageNotSerializable const&) { return true; });
434 : // The network receive must throw as well
435 1 : NonSerializableData ret;
436 3 : BOOST_REQUIRE_EXCEPTION(ret = net_receiver->receive(std::chrono::milliseconds(10)),
437 : NetworkMessageNotSerializable,
438 : [](NetworkMessageNotSerializable const&) { return true; });
439 : // The queue round trip must succeed
440 1 : q_sender->send(std::move(sent_q), std::chrono::milliseconds(10));
441 :
442 1 : ret = q_receiver->receive(std::chrono::milliseconds(10));
443 1 : BOOST_CHECK_EQUAL(ret.d1, 57);
444 1 : BOOST_CHECK_EQUAL(ret.d2, 27.5);
445 1 : BOOST_CHECK_EQUAL(ret.d3, "test2");
446 1 : }
447 :
448 2 : BOOST_FIXTURE_TEST_CASE(NonCopyableSendReceive, ConfigurationTestFixture) // A move-only serializable type must round-trip over both the network connection and the queue
449 : {
450 1 : auto net_sender = IOManager::get()->get_sender<NonCopyableData>(conn_id);
451 1 : auto net_receiver = IOManager::get()->get_receiver<NonCopyableData>(conn_id);
452 1 : auto q_sender = IOManager::get()->get_sender<NonCopyableData>(queue_id);
453 1 : auto q_receiver = IOManager::get()->get_receiver<NonCopyableData>(queue_id);
454 :
455 1 : NonCopyableData sent_nw(56, 26.5, "test1");
456 1 : NonCopyableData sent_q(57, 27.5, "test2");
457 1 : net_sender->send(std::move(sent_nw), dunedaq::iomanager::Sender::s_no_block);
458 : // Network round trip
459 1 : auto ret = net_receiver->receive(std::chrono::milliseconds(10));
460 1 : BOOST_CHECK_EQUAL(ret.d1, 56);
461 1 : BOOST_CHECK_EQUAL(ret.d2, 26.5);
462 1 : BOOST_CHECK_EQUAL(ret.d3, "test1");
463 : // Queue round trip; the result is move-assigned into ret
464 1 : q_sender->send(std::move(sent_q), std::chrono::milliseconds(10));
465 :
466 1 : ret = q_receiver->receive(std::chrono::milliseconds(10));
467 1 : BOOST_CHECK_EQUAL(ret.d1, 57);
468 1 : BOOST_CHECK_EQUAL(ret.d2, 27.5);
469 1 : BOOST_CHECK_EQUAL(ret.d3, "test2");
470 1 : }
471 :
472 2 : BOOST_FIXTURE_TEST_CASE(NonSerializableNonCopyableSendReceive, ConfigurationTestFixture) // A move-only, non-serializable type must fail on the network connection but work on the queue
473 : {
474 1 : auto net_sender = IOManager::get()->get_sender<NonSerializableNonCopyable>(conn_id);
475 1 : auto net_receiver = IOManager::get()->get_receiver<NonSerializableNonCopyable>(conn_id);
476 1 : auto q_sender = IOManager::get()->get_sender<NonSerializableNonCopyable>(queue_id);
477 1 : auto q_receiver = IOManager::get()->get_receiver<NonSerializableNonCopyable>(queue_id);
478 :
479 1 : NonSerializableNonCopyable sent_nw(56, 26.5, "test1");
480 1 : NonSerializableNonCopyable sent_q(57, 27.5, "test2");
481 3 : BOOST_REQUIRE_EXCEPTION(net_sender->send(std::move(sent_nw), dunedaq::iomanager::Sender::s_no_block),
482 : NetworkMessageNotSerializable,
483 : [](NetworkMessageNotSerializable const&) { return true; });
484 : // The network receive must throw as well
485 1 : NonSerializableNonCopyable ret;
486 3 : BOOST_REQUIRE_EXCEPTION(ret = net_receiver->receive(std::chrono::milliseconds(10)),
487 : NetworkMessageNotSerializable,
488 : [](NetworkMessageNotSerializable const&) { return true; });
489 : // The queue round trip must succeed
490 1 : q_sender->send(std::move(sent_q), std::chrono::milliseconds(10));
491 :
492 1 : ret = q_receiver->receive(std::chrono::milliseconds(10));
493 1 : BOOST_CHECK_EQUAL(ret.d1, 57);
494 1 : BOOST_CHECK_EQUAL(ret.d2, 27.5);
495 1 : BOOST_CHECK_EQUAL(ret.d3, "test2");
496 1 : }
497 :
498 2 : BOOST_FIXTURE_TEST_CASE(CallbackRegistration, ConfigurationTestFixture) // Callbacks on the network and queue connections must deliver the sent Data objects
499 : {
500 1 : auto net_sender = IOManager::get()->get_sender<Data>(conn_id);
501 1 : auto q_sender = IOManager::get()->get_sender<Data>(queue_id);
502 :
503 1 : Data sent_data_nw(56, 26.5, "test1");
504 1 : Data sent_data_q(57, 27.5, "test2");
505 1 : Data recv_data;
506 1 : std::atomic<bool> has_received_data = false;
507 : // Store the payload BEFORE raising the flag: the test body reads recv_data as soon as it observes the flag (assumes callbacks run on another thread)
508 4 : std::function<void(Data&)> callback = [&](Data& d) {
509 2 : recv_data = std::move(d);
510 2 : has_received_data = true;
511 3 : };
512 : // One callback serves both connections
513 1 : IOManager::get()->add_callback<Data>(conn_id, callback);
514 1 : IOManager::get()->add_callback<Data>(queue_id, callback);
515 :
516 1 : usleep(1000);
517 :
518 1 : net_sender->send(std::move(sent_data_nw), dunedaq::iomanager::Sender::s_no_block);
519 : // NOTE(review): unbounded wait - the test hangs if the callback never fires
520 2 : while (!has_received_data.load())
521 1 : usleep(1000);
522 :
523 1 : BOOST_CHECK_EQUAL(recv_data.d1, 56);
524 1 : BOOST_CHECK_EQUAL(recv_data.d2, 26.5);
525 1 : BOOST_CHECK_EQUAL(recv_data.d3, "test1");
526 : // Re-arm the flag and repeat through the queue
527 1 : has_received_data = false;
528 1 : q_sender->send(std::move(sent_data_q), std::chrono::milliseconds(10));
529 :
530 2 : while (!has_received_data.load())
531 1 : usleep(1000);
532 :
533 1 : BOOST_CHECK_EQUAL(recv_data.d1, 57);
534 1 : BOOST_CHECK_EQUAL(recv_data.d2, 27.5);
535 1 : BOOST_CHECK_EQUAL(recv_data.d3, "test2");
536 : // Unregister both callbacks before the captured locals go out of scope
537 1 : IOManager::get()->remove_callback<Data>(conn_id);
538 1 : IOManager::get()->remove_callback<Data>(queue_id);
539 1 : }
540 :
541 2 : BOOST_FIXTURE_TEST_CASE(NonCopyableCallbackRegistration, ConfigurationTestFixture) // Callbacks must deliver move-only NonCopyableData objects over network and queue
542 : {
543 1 : auto net_sender = IOManager::get()->get_sender<NonCopyableData>(conn_id);
544 1 : auto q_sender = IOManager::get()->get_sender<NonCopyableData>(queue_id);
545 :
546 1 : NonCopyableData sent_data_nw(56, 26.5, "test1");
547 1 : NonCopyableData sent_data_q(57, 27.5, "test2");
548 1 : NonCopyableData recv_data;
549 1 : std::atomic<bool> has_received_data = false;
550 : // Store the payload BEFORE raising the flag: the test body reads recv_data as soon as it observes the flag (assumes callbacks run on another thread)
551 4 : std::function<void(NonCopyableData&)> callback = [&](NonCopyableData& d) {
552 2 : recv_data = std::move(d);
553 2 : has_received_data = true;
554 3 : };
555 : // One callback serves both connections
556 1 : IOManager::get()->add_callback<NonCopyableData>(conn_id, callback);
557 1 : IOManager::get()->add_callback<NonCopyableData>(queue_id, callback);
558 :
559 1 : usleep(1000);
560 :
561 1 : net_sender->send(std::move(sent_data_nw), dunedaq::iomanager::Sender::s_no_block);
562 : // NOTE(review): unbounded wait - the test hangs if the callback never fires
563 2 : while (!has_received_data.load())
564 1 : usleep(1000);
565 :
566 1 : BOOST_CHECK_EQUAL(recv_data.d1, 56);
567 1 : BOOST_CHECK_EQUAL(recv_data.d2, 26.5);
568 1 : BOOST_CHECK_EQUAL(recv_data.d3, "test1");
569 : // Re-arm the flag and repeat through the queue
570 1 : has_received_data = false;
571 1 : q_sender->send(std::move(sent_data_q), std::chrono::milliseconds(10));
572 :
573 2 : while (!has_received_data.load())
574 1 : usleep(1000);
575 :
576 1 : BOOST_CHECK_EQUAL(recv_data.d1, 57);
577 1 : BOOST_CHECK_EQUAL(recv_data.d2, 27.5);
578 1 : BOOST_CHECK_EQUAL(recv_data.d3, "test2");
579 : // Unregister both callbacks before the captured locals go out of scope
580 1 : IOManager::get()->remove_callback<NonCopyableData>(conn_id);
581 1 : IOManager::get()->remove_callback<NonCopyableData>(queue_id);
582 1 : }
583 :
584 2 : BOOST_FIXTURE_TEST_CASE(NonSerializableCallbackRegistration, ConfigurationTestFixture) // add_callback must throw on the network connection for a non-serializable type but work on the queue
585 : {
586 1 : auto net_sender = IOManager::get()->get_sender<NonSerializableData>(conn_id);
587 1 : auto q_sender = IOManager::get()->get_sender<NonSerializableData>(queue_id);
588 :
589 1 : NonSerializableData sent_data_nw(56, 26.5, "test1");
590 1 : NonSerializableData sent_data_q(57, 27.5, "test2");
591 1 : NonSerializableData recv_data;
592 1 : std::atomic<bool> has_received_data = false;
593 : // Store the payload BEFORE raising the flag: the test body reads recv_data as soon as it observes the flag (assumes callbacks run on another thread)
594 2 : std::function<void(NonSerializableData&)> callback = [&](NonSerializableData& d) {
595 1 : recv_data = std::move(d);
596 1 : has_received_data = true;
597 2 : };
598 :
599 5 : BOOST_REQUIRE_EXCEPTION(IOManager::get()->add_callback<NonSerializableData>(conn_id, callback),
600 : NetworkMessageNotSerializable,
601 : [](NetworkMessageNotSerializable const&) { return true; });
602 1 : IOManager::get()->add_callback<NonSerializableData>(queue_id, callback);
603 :
604 : // Have to stop the callback from endlessly setting recv_data to default-constructed object
605 1 : IOManager::get()->remove_callback<NonSerializableData>(conn_id);
606 1 : has_received_data = false;
607 1 : q_sender->send(std::move(sent_data_q), std::chrono::milliseconds(10));
608 : // NOTE(review): unbounded wait - the test hangs if the callback never fires
609 1 : while (!has_received_data.load())
610 0 : usleep(1000);
611 :
612 1 : BOOST_CHECK_EQUAL(recv_data.d1, 57);
613 1 : BOOST_CHECK_EQUAL(recv_data.d2, 27.5);
614 1 : BOOST_CHECK_EQUAL(recv_data.d3, "test2");
615 : // Unregister the queue callback before the captured locals go out of scope
616 1 : IOManager::get()->remove_callback<NonSerializableData>(queue_id);
617 1 : }
618 :
619 2 : BOOST_FIXTURE_TEST_CASE(NonSerializableNonCopyableCallbackRegistration, ConfigurationTestFixture) // add_callback must throw on the network connection for a move-only, non-serializable type but work on the queue
620 : {
621 1 : auto net_sender = IOManager::get()->get_sender<NonSerializableNonCopyable>(conn_id);
622 1 : auto q_sender = IOManager::get()->get_sender<NonSerializableNonCopyable>(queue_id);
623 :
624 1 : NonSerializableNonCopyable sent_data_nw(56, 26.5, "test1");
625 1 : NonSerializableNonCopyable sent_data_q(57, 27.5, "test2");
626 1 : NonSerializableNonCopyable recv_data;
627 1 : std::atomic<bool> has_received_data = false;
628 : // Store the payload BEFORE raising the flag: the test body reads recv_data as soon as it observes the flag (assumes callbacks run on another thread)
629 2 : std::function<void(NonSerializableNonCopyable&)> callback = [&](NonSerializableNonCopyable& d) {
630 1 : recv_data = std::move(d);
631 1 : has_received_data = true;
632 2 : };
633 :
634 5 : BOOST_REQUIRE_EXCEPTION(IOManager::get()->add_callback<NonSerializableNonCopyable>(conn_id, callback),
635 : NetworkMessageNotSerializable,
636 : [](NetworkMessageNotSerializable const&) { return true; });
637 1 : IOManager::get()->add_callback<NonSerializableNonCopyable>(queue_id, callback);
638 :
639 1 : usleep(1000);
640 :
641 : // Have to stop the callback from endlessly setting recv_data to default-constructed object
642 1 : IOManager::get()->remove_callback<NonSerializableNonCopyable>(conn_id);
643 1 : has_received_data = false;
644 1 : q_sender->send(std::move(sent_data_q), std::chrono::milliseconds(10));
645 : // NOTE(review): unbounded wait - the test hangs if the callback never fires
646 2 : while (!has_received_data.load())
647 1 : usleep(1000);
648 :
649 1 : BOOST_CHECK_EQUAL(recv_data.d1, 57);
650 1 : BOOST_CHECK_EQUAL(recv_data.d2, 27.5);
651 1 : BOOST_CHECK_EQUAL(recv_data.d3, "test2");
652 : // Unregister the queue callback before the captured locals go out of scope
653 1 : IOManager::get()->remove_callback<NonSerializableNonCopyable>(queue_id);
654 1 : }
655 :
656 2 : BOOST_FIXTURE_TEST_CASE(GetDatatype, ConfigurationTestFixture) // get_datatypes(uid) must return the set of datatype strings configured for that connection UID
657 : {
658 1 : auto networkDataTypes = IOManager::get()->get_datatypes("network");
659 1 : auto queueDataTypes = IOManager::get()->get_datatypes("queue");
660 1 : auto testDataTypes = IOManager::get()->get_datatypes("test");
661 1 : auto invalidDataTypes = IOManager::get()->get_datatypes("NotAConnectionUID");
662 : // "network" and "queue" each carry exactly one datatype, "data_t"
663 1 : BOOST_REQUIRE_EQUAL(networkDataTypes.size(), 1);
664 1 : BOOST_REQUIRE_EQUAL(networkDataTypes.count("data_t"), 1);
665 :
666 1 : BOOST_REQUIRE_EQUAL(queueDataTypes.size(), 1);
667 1 : BOOST_REQUIRE_EQUAL(queueDataTypes.count("data_t"), 1);
668 : // NOTE(review): "test" is presumably defined in iomanager_test.data.xml with both pub/sub datatypes - confirm
669 1 : BOOST_REQUIRE_EQUAL(testDataTypes.size(), 2);
670 1 : BOOST_REQUIRE_EQUAL(testDataTypes.count("data2_t"), 1);
671 1 : BOOST_REQUIRE_EQUAL(testDataTypes.count("data3_t"), 1);
672 : // An unknown UID must yield an empty result
673 1 : BOOST_REQUIRE_EQUAL(invalidDataTypes.size(), 0);
674 1 : }
675 :
676 : // TODO: Eric Flumerfelt <eflumerf@github.com>, June-16-2022: Reimplement this test for IOManager
677 : /*
678 : BOOST_FIXTURE_TEST_CASE(SendThreadSafety, NetworkManagerTestFixture)
679 : {
680 : TLOG_DEBUG(12) << "SendThreadSafety test case BEGIN";
681 :
682 : auto substr_proc = [&](int idx) {
683 : const std::string pattern_string =
684 : "aaaaabbbbbcccccdddddeeeeefffffggggghhhhhiiiiijjjjjkkkkklllllmmmmmnnnnnooooopppppqqqqqrrrrrssssstttttuuuuuvvvvvww"
685 : "wwwxxxxxyyyyyzzzzzAAAAABBBBBCCCCCDDDDDEEEEEFFFFFGGGGGHHHHHIIIIIJJJJJKKKKKLLLLLMMMMMNNNNNOOOOOPPPPPQQQQQRRRRRSSSS"
686 : "STTTTTUUUUUVVVVVWWWWWXXXXXYYYYYZZZZZ";
687 : auto string_idx = idx % pattern_string.size();
688 : if (string_idx + 5 < pattern_string.size()) {
689 : return pattern_string.substr(string_idx, 5);
690 : } else {
691 : return pattern_string.substr(string_idx, 5) + pattern_string.substr(0, string_idx - pattern_string.size() + 5);
692 : }
693 : };
694 :
695 : auto send_proc = [&](int idx) {
696 : std::string buf = std::to_string(idx) + substr_proc(idx);
697 : TLOG_DEBUG(10) << "Sending " << buf << " for idx " << idx;
698 : NetworkManager::get().send_to("foo", buf.c_str(), buf.size(), dunedaq::ipm::Sender::s_block);
699 : };
700 :
701 : auto recv_proc = [&](dunedaq::ipm::Receiver::Response response) {
702 : BOOST_REQUIRE(response.data.size() > 0);
703 : auto received_idx = std::stoi(std::string(response.data.begin(), response.data.end()));
704 : auto idx_string = std::to_string(received_idx);
705 : auto received_string = std::string(response.data.begin() + idx_string.size(), response.data.end());
706 :
707 : TLOG_DEBUG(11) << "Received " << received_string << " for idx " << received_idx;
708 :
709 : BOOST_REQUIRE_EQUAL(received_string.size(), 5);
710 :
711 : std::string check = substr_proc(received_idx);
712 :
713 : BOOST_REQUIRE_EQUAL(received_string, check);
714 : };
715 :
716 : NetworkManager::get().start_listening("foo");
717 :
718 : NetworkManager::get().register_callback("foo", recv_proc);
719 :
720 : const int thread_count = 1000;
721 : std::array<std::thread, thread_count> threads;
722 :
723 : TLOG_DEBUG(12) << "Before starting send threads";
724 : for (int idx = 0; idx < thread_count; ++idx) {
725 : threads[idx] = std::thread(send_proc, idx);
726 : }
727 : TLOG_DEBUG(12) << "After starting send threads";
728 : for (int idx = 0; idx < thread_count; ++idx) {
729 : threads[idx].join();
730 : }
731 : TLOG_DEBUG(12) << "SendThreadSafety test case END";
732 : }
733 : */
734 :
735 : // TODO: Eric Flumerfelt <eflumerf@github.com>, June-16-2022: Reimplement this test for IOManager
736 : /*
737 : BOOST_FIXTURE_TEST_CASE(OneListenerThreaded, NetworkManagerTestFixture)
738 : {
739 : auto callback = [&](dunedaq::ipm::Receiver::Response) { return; };
740 : const int thread_count = 1000;
741 : std::atomic<size_t> num_connected = 0;
742 : std::atomic<size_t> num_fail = 0;
743 :
744 : auto reg_proc = [&](int idx) {
745 : try {
746 : NetworkManager::get().start_listening("foo");
747 : NetworkManager::get().register_callback("foo", callback);
748 : } catch (ListenerAlreadyRegistered const&) {
749 : num_fail++;
750 : TLOG_DEBUG(13) << "Listener " << idx << " failed to register";
751 : return;
752 : }
753 : TLOG_DEBUG(13) << "Listener " << idx << " successfully started";
754 : num_connected++;
755 : };
756 :
757 : std::array<std::thread, thread_count> threads;
758 :
759 : for (int idx = 0; idx < thread_count; ++idx) {
760 : threads[idx] = std::thread(reg_proc, idx);
761 : }
762 : for (int idx = 0; idx < thread_count; ++idx) {
763 : threads[idx].join();
764 : }
765 :
766 : BOOST_REQUIRE_EQUAL(num_connected.load(), 1);
767 : BOOST_REQUIRE_EQUAL(num_fail.load(), thread_count - 1);
768 : }
769 : */
770 :
771 : // TODO: Eric Flumerfelt <eflumerf@github.com>, June-16-2022: Reimplement this test for IOManager
772 : /*
773 : BOOST_AUTO_TEST_CASE(ManyThreadsSendingAndReceiving)
774 : {
775 : const int num_sending_threads = 100;
776 : const int num_receivers = 50;
777 :
778 : nwmgr::Connections testConfig;
779 : for (int i = 0; i < num_receivers; ++i) {
780 : nwmgr::Connection testConn;
781 : testConn.name = "foo" + std::to_string(i);
782 : testConn.address = "inproc://bar" + std::to_string(i);
783 : testConfig.push_back(testConn);
784 : }
785 : NetworkManager::get().configure(testConfig);
786 :
787 : auto substr_proc = [](int idx) {
788 : const std::string pattern_string =
789 : "aaaaabbbbbcccccdddddeeeeefffffggggghhhhhiiiiijjjjjkkkkklllllmmmmmnnnnnooooopppppqqqqqrrrrrssssstttttuuuuuvvvvvww"
790 : "wwwxxxxxyyyyyzzzzzAAAAABBBBBCCCCCDDDDDEEEEEFFFFFGGGGGHHHHHIIIIIJJJJJKKKKKLLLLLMMMMMNNNNNOOOOOPPPPPQQQQQRRRRRSSSS"
791 : "STTTTTUUUUUVVVVVWWWWWXXXXXYYYYYZZZZZ";
792 :
793 : auto string_idx = idx % pattern_string.size();
794 : if (string_idx + 5 < pattern_string.size()) {
795 : return pattern_string.substr(string_idx, 5);
796 : } else {
797 : return pattern_string.substr(string_idx, 5) + pattern_string.substr(0, string_idx - pattern_string.size() + 5);
798 : }
799 : };
800 : auto send_proc = [&](int idx) {
801 : std::string buf = std::to_string(idx) + substr_proc(idx);
802 : for (int i = 0; i < num_receivers; ++i) {
803 : TLOG_DEBUG(14) << "Sending " << buf << " for idx " << idx << " to receiver " << i;
804 : NetworkManager::get().send_to("foo" + std::to_string(i), buf.c_str(), buf.size(), dunedaq::ipm::Sender::s_block);
805 : }
806 : };
807 :
808 : std::array<std::atomic<size_t>, num_receivers> messages_received;
809 : std::array<std::atomic<size_t>, num_receivers> num_empty_responses;
810 : std::array<std::atomic<size_t>, num_receivers> num_size_errors;
811 : std::array<std::atomic<size_t>, num_receivers> num_content_errors;
812 : std::array<std::function<void(dunedaq::ipm::Receiver::Response)>, num_receivers> recv_procs;
813 :
814 : for (int i = 0; i < num_receivers; ++i) {
815 : messages_received[i] = 0;
816 : num_empty_responses[i] = 0;
817 : num_size_errors[i] = 0;
818 : num_content_errors[i] = 0;
819 : recv_procs[i] = [&, i](dunedaq::ipm::Receiver::Response response) {
820 : if (response.data.size() == 0) {
821 : num_empty_responses[i]++;
822 : }
823 : auto received_idx = std::stoi(std::string(response.data.begin(), response.data.end()));
824 : auto idx_string = std::to_string(received_idx);
825 : auto received_string = std::string(response.data.begin() + idx_string.size(), response.data.end());
826 :
827 : TLOG_DEBUG(14) << "Receiver " << i << " received " << received_string << " for idx " << received_idx;
828 :
829 : if (received_string.size() != 5) {
830 : num_size_errors[i]++;
831 : }
832 :
833 : std::string check = substr_proc(received_idx);
834 :
835 : if (received_string != check) {
836 : num_content_errors[i]++;
837 : }
838 : messages_received[i]++;
839 : };
840 : NetworkManager::get().start_listening("foo" + std::to_string(i));
841 : NetworkManager::get().register_callback("foo" + std::to_string(i), recv_procs[i]);
842 : }
843 :
844 : std::array<std::thread, num_sending_threads> threads;
845 :
846 : TLOG_DEBUG(14) << "Before starting send threads";
847 : for (int idx = 0; idx < num_sending_threads; ++idx) {
848 : threads[idx] = std::thread(send_proc, idx);
849 : }
850 : TLOG_DEBUG(14) << "After starting send threads";
851 : for (int idx = 0; idx < num_sending_threads; ++idx) {
852 : threads[idx].join();
853 : }
854 :
855 : TLOG_DEBUG(14) << "Sleeping to allow all messages to be processed";
856 : std::this_thread::sleep_for(std::chrono::seconds(1));
857 :
858 : for (auto i = 0; i < num_receivers; ++i) {
859 : TLOG_DEBUG(14) << "Shutting down receiver " << i;
860 : NetworkManager::get().stop_listening("foo" + std::to_string(i));
861 : BOOST_CHECK_EQUAL(messages_received[i], num_sending_threads);
862 : BOOST_REQUIRE_EQUAL(num_empty_responses[i], 0);
863 : BOOST_REQUIRE_EQUAL(num_size_errors[i], 0);
864 : BOOST_REQUIRE_EQUAL(num_content_errors[i], 0);
865 : }
866 :
867 : TLOG_DEBUG(14) << "Resetting NetworkManager";
868 : NetworkManager::get().reset();
869 : }
870 : */
871 :
872 : BOOST_AUTO_TEST_SUITE_END()
|