Line data Source code
1 : /**
2 : * @file ZmqPubSub_test.cxx Test ZmqPublisher to ZmqSubscriber transfer
3 : *
4 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "ipm/Sender.hpp"
10 : #include "ipm/Subscriber.hpp"
11 :
12 : #include "logging/Logging.hpp"
13 :
14 : #define BOOST_TEST_MODULE ZmqPubSub_test // NOLINT
15 :
16 : #include "boost/test/unit_test.hpp"
17 :
18 : #include <string>
19 : #include <vector>
20 :
21 : using namespace dunedaq::ipm;
22 :
23 : BOOST_AUTO_TEST_SUITE(ZmqPubSub_test)
24 :
/**
 * @brief Number of whole milliseconds that have passed on the steady clock since @p then.
 *
 * @param then Reference point, taken earlier from std::chrono::steady_clock.
 * @return Elapsed time truncated to milliseconds and converted to size_t.
 *         A @p then that lies in the future yields a negative count, which
 *         the conversion to size_t does not preserve.
 */
size_t
elapsed_time_milliseconds(std::chrono::steady_clock::time_point const& then)
{
  using std::chrono::milliseconds;
  using std::chrono::steady_clock;

  auto const delta = steady_clock::now() - then;
  auto const whole_ms = std::chrono::duration_cast<milliseconds>(delta);
  return static_cast<size_t>(whole_ms.count());
}
31 :
32 2 : BOOST_AUTO_TEST_CASE(SendReceiveTest)
33 : {
34 1 : auto the_receiver = make_ipm_subscriber("ZmqSubscriber");
35 1 : BOOST_REQUIRE(the_receiver != nullptr);
36 1 : BOOST_REQUIRE(!the_receiver->can_receive());
37 :
38 1 : auto the_sender = make_ipm_sender("ZmqPublisher");
39 1 : BOOST_REQUIRE(the_sender != nullptr);
40 1 : BOOST_REQUIRE(!the_sender->can_send());
41 :
42 1 : Sender::ConnectionInfo sender_config("test_sender", "inproc://default");
43 1 : Receiver::ConnectionInfo receiver_config("test_receiver", "inproc://default");
44 :
45 1 : the_sender->connect_for_sends(sender_config);
46 1 : the_receiver->connect_for_receives(receiver_config);
47 :
48 1 : BOOST_REQUIRE(the_receiver->can_receive());
49 1 : BOOST_REQUIRE(the_sender->can_send());
50 :
51 1 : BOOST_REQUIRE(!the_receiver->data_pending());
52 :
53 1 : the_receiver->subscribe("testTopic");
54 :
55 1 : std::vector<char> test_data{ 'T', 'E', 'S', 'T' };
56 :
57 1 : the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "ignoredTopic");
58 1 : auto before_recv = std::chrono::steady_clock::now();
59 2 : BOOST_REQUIRE_EXCEPTION(
60 : the_receiver->receive(std::chrono::milliseconds(100)),
61 : dunedaq::ipm::ReceiveTimeoutExpired,
62 : [&](dunedaq::ipm::ReceiveTimeoutExpired) { return elapsed_time_milliseconds(before_recv) >= 100; });
63 :
64 1 : the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
65 1 : BOOST_REQUIRE(the_receiver->data_pending());
66 1 : auto response = the_receiver->receive(Receiver::s_block);
67 1 : BOOST_REQUIRE(!the_receiver->data_pending());
68 1 : BOOST_REQUIRE_EQUAL(response.data.size(), 4);
69 1 : BOOST_REQUIRE_EQUAL(response.data[0], 'T');
70 1 : BOOST_REQUIRE_EQUAL(response.data[1], 'E');
71 1 : BOOST_REQUIRE_EQUAL(response.data[2], 'S');
72 1 : BOOST_REQUIRE_EQUAL(response.data[3], 'T');
73 :
74 1 : the_receiver->unsubscribe("testTopic");
75 1 : the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
76 1 : BOOST_REQUIRE(!the_receiver->data_pending());
77 2 : BOOST_REQUIRE_EXCEPTION(
78 : the_receiver->receive(std::chrono::milliseconds(2000)),
79 : dunedaq::ipm::ReceiveTimeoutExpired,
80 : [&](dunedaq::ipm::ReceiveTimeoutExpired) { return elapsed_time_milliseconds(before_recv) >= 2000; });
81 1 : }
82 :
83 2 : BOOST_AUTO_TEST_CASE(CallbackTest)
84 : {
85 :
86 1 : auto the_receiver = make_ipm_subscriber("ZmqSubscriber");
87 1 : BOOST_REQUIRE(the_receiver != nullptr);
88 1 : BOOST_REQUIRE(!the_receiver->can_receive());
89 :
90 1 : auto the_sender = make_ipm_sender("ZmqPublisher");
91 1 : BOOST_REQUIRE(the_sender != nullptr);
92 1 : BOOST_REQUIRE(!the_sender->can_send());
93 :
94 1 : Sender::ConnectionInfo sender_config("test_sender", "inproc://default");
95 1 : Receiver::ConnectionInfo receiver_config("test_receiver", "inproc://default");
96 1 : the_sender->connect_for_sends(sender_config);
97 1 : the_receiver->connect_for_receives(receiver_config);
98 :
99 1 : BOOST_REQUIRE(the_receiver->can_receive());
100 1 : BOOST_REQUIRE(the_sender->can_send());
101 :
102 1 : the_receiver->subscribe("testTopic");
103 :
104 1 : std::vector<char> test_data{ 'T', 'E', 'S', 'T' };
105 1 : std::atomic<bool> message_received = false;
106 :
107 2 : auto callback_fun = [&](Receiver::Response& res) {
108 2 : TLOG() << "Callback function called with res.data.size() == " << static_cast<int>(res.data.size());
109 1 : BOOST_REQUIRE_EQUAL(res.data.size(), test_data.size());
110 13 : for (size_t ii = 0; ii < res.data.size(); ++ii) {
111 12 : BOOST_REQUIRE_EQUAL(res.data[ii], test_data[ii]);
112 : }
113 1 : message_received = true;
114 2 : };
115 1 : the_receiver->register_callback(callback_fun);
116 :
117 1 : the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block);
118 1 : usleep(100000);
119 1 : BOOST_REQUIRE_EQUAL(message_received, false);
120 :
121 1 : message_received = false;
122 1 : test_data = { 'A', 'N', 'O', 'T', 'H', 'E', 'R', ' ', 'T', 'E', 'S', 'T' };
123 1 : the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
124 3 : while (!message_received.load()) {
125 2 : usleep(15000);
126 : }
127 :
128 1 : message_received = false;
129 1 : test_data = { 'A', ' ', 'T', 'H', 'I', 'R', 'D', ' ', 'T', 'E', 'S', 'T' };
130 1 : the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "ignoredTopic");
131 1 : usleep(100000);
132 1 : BOOST_REQUIRE_EQUAL(message_received, false);
133 :
134 1 : the_receiver->unregister_callback();
135 1 : message_received = false;
136 1 : test_data = { 'A', ' ', 'F', 'O', 'U', 'R', 'T', 'H', ' ', 'T', 'E', 'S', 'T' };
137 1 : the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
138 :
139 1 : usleep(100000);
140 1 : BOOST_REQUIRE_EQUAL(message_received, false);
141 1 : auto response = the_receiver->receive(Receiver::s_block);
142 1 : BOOST_REQUIRE_EQUAL(response.data.size(), test_data.size());
143 1 : BOOST_REQUIRE(!the_receiver->data_pending());
144 1 : }
145 :
146 2 : BOOST_AUTO_TEST_CASE(MultiplePublishers)
147 : {
148 1 : auto first_publisher = make_ipm_sender("ZmqPublisher");
149 1 : auto second_publisher = make_ipm_sender("ZmqPublisher");
150 1 : auto the_subscriber = make_ipm_subscriber("ZmqSubscriber");
151 :
152 1 : Sender::ConnectionInfo first_sender_config("test_sender", "inproc://foo");
153 1 : Sender::ConnectionInfo second_sender_config("test_sender", "inproc://bar");
154 1 : Receiver::ConnectionInfo receiver_config("test_receiver", "", { "inproc://foo", "inproc://bar" });
155 1 : first_publisher->connect_for_sends(first_sender_config);
156 1 : second_publisher->connect_for_sends(second_sender_config);
157 1 : the_subscriber->connect_for_receives(receiver_config);
158 :
159 1 : the_subscriber->subscribe("testTopic");
160 :
161 2 : std::vector<char> test_data{ 'T', 'E', 'S', 'T' };
162 1 : first_publisher->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
163 1 : auto response = the_subscriber->receive(Receiver::s_block);
164 1 : BOOST_REQUIRE_EQUAL(response.data.size(), 4);
165 1 : BOOST_REQUIRE_EQUAL(response.data[0], 'T');
166 1 : BOOST_REQUIRE_EQUAL(response.data[1], 'E');
167 1 : BOOST_REQUIRE_EQUAL(response.data[2], 'S');
168 1 : BOOST_REQUIRE_EQUAL(response.data[3], 'T');
169 :
170 1 : second_publisher->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
171 1 : auto response2 = the_subscriber->receive(Receiver::s_block);
172 1 : BOOST_REQUIRE_EQUAL(response2.data.size(), 4);
173 1 : BOOST_REQUIRE_EQUAL(response2.data[0], 'T');
174 1 : BOOST_REQUIRE_EQUAL(response2.data[1], 'E');
175 1 : BOOST_REQUIRE_EQUAL(response2.data[2], 'S');
176 1 : BOOST_REQUIRE_EQUAL(response2.data[3], 'T');
177 1 : BOOST_REQUIRE(!the_subscriber->data_pending());
178 1 : }
179 :
180 : BOOST_AUTO_TEST_SUITE_END()
|