Line data Source code
1 : /**
2 : * @file ZmqPubSub_test.cxx Test ZmqPublisher to ZmqSubscriber transfer
3 : *
4 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "ipm/Sender.hpp"
10 : #include "ipm/Subscriber.hpp"
11 :
12 : #include "logging/Logging.hpp"
13 :
14 : #define BOOST_TEST_MODULE ZmqPubSub_test // NOLINT
15 :
16 : #include "boost/test/unit_test.hpp"
17 :
18 : #include <string>
19 : #include <vector>
20 :
21 : using namespace dunedaq::ipm;
22 :
23 : BOOST_AUTO_TEST_SUITE(ZmqPubSub_test)
24 :
25 : size_t
26 2 : elapsed_time_milliseconds(std::chrono::steady_clock::time_point const& then)
27 : {
28 2 : return static_cast<size_t>(
29 2 : std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - then).count());
30 : }
31 :
32 2 : BOOST_AUTO_TEST_CASE(SendReceiveTest)
33 : {
34 1 : auto the_receiver = make_ipm_subscriber("ZmqSubscriber");
35 1 : BOOST_REQUIRE(the_receiver != nullptr);
36 1 : BOOST_REQUIRE(!the_receiver->can_receive());
37 :
38 1 : auto the_sender = make_ipm_sender("ZmqPublisher");
39 1 : BOOST_REQUIRE(the_sender != nullptr);
40 1 : BOOST_REQUIRE(!the_sender->can_send());
41 :
42 1 : nlohmann::json config_json;
43 1 : config_json["connection_string"] = "inproc://default";
44 1 : the_sender->connect_for_sends(config_json);
45 1 : the_receiver->connect_for_receives(config_json);
46 :
47 1 : BOOST_REQUIRE(the_receiver->can_receive());
48 1 : BOOST_REQUIRE(the_sender->can_send());
49 :
50 1 : the_receiver->subscribe("testTopic");
51 :
52 1 : std::vector<char> test_data{ 'T', 'E', 'S', 'T' };
53 :
54 1 : the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "ignoredTopic");
55 1 : auto before_recv = std::chrono::steady_clock::now();
56 3 : BOOST_REQUIRE_EXCEPTION(
57 : the_receiver->receive(std::chrono::milliseconds(100)),
58 : dunedaq::ipm::ReceiveTimeoutExpired,
59 : [&](dunedaq::ipm::ReceiveTimeoutExpired) { return elapsed_time_milliseconds(before_recv) >= 100; });
60 :
61 1 : the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
62 1 : auto response = the_receiver->receive(Receiver::s_block);
63 1 : BOOST_REQUIRE_EQUAL(response.data.size(), 4);
64 1 : BOOST_REQUIRE_EQUAL(response.data[0], 'T');
65 1 : BOOST_REQUIRE_EQUAL(response.data[1], 'E');
66 1 : BOOST_REQUIRE_EQUAL(response.data[2], 'S');
67 1 : BOOST_REQUIRE_EQUAL(response.data[3], 'T');
68 :
69 1 : the_receiver->unsubscribe("testTopic");
70 1 : the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
71 3 : BOOST_REQUIRE_EXCEPTION(
72 : the_receiver->receive(std::chrono::milliseconds(2000)),
73 : dunedaq::ipm::ReceiveTimeoutExpired,
74 : [&](dunedaq::ipm::ReceiveTimeoutExpired) { return elapsed_time_milliseconds(before_recv) >= 2000; });
75 1 : }
76 :
77 2 : BOOST_AUTO_TEST_CASE(CallbackTest)
78 : {
79 :
80 1 : auto the_receiver = make_ipm_subscriber("ZmqSubscriber");
81 1 : BOOST_REQUIRE(the_receiver != nullptr);
82 1 : BOOST_REQUIRE(!the_receiver->can_receive());
83 :
84 1 : auto the_sender = make_ipm_sender("ZmqPublisher");
85 1 : BOOST_REQUIRE(the_sender != nullptr);
86 1 : BOOST_REQUIRE(!the_sender->can_send());
87 :
88 1 : nlohmann::json config_json;
89 1 : config_json["connection_string"] = "inproc://default";
90 1 : the_sender->connect_for_sends(config_json);
91 1 : the_receiver->connect_for_receives(config_json);
92 :
93 1 : BOOST_REQUIRE(the_receiver->can_receive());
94 1 : BOOST_REQUIRE(the_sender->can_send());
95 :
96 1 : the_receiver->subscribe("testTopic");
97 :
98 1 : std::vector<char> test_data{ 'T', 'E', 'S', 'T' };
99 1 : std::atomic<bool> message_received = false;
100 :
101 2 : auto callback_fun = [&](Receiver::Response& res) {
102 2 : TLOG() << "Callback function called with res.data.size() == " << static_cast<int>(res.data.size());
103 1 : BOOST_REQUIRE_EQUAL(res.data.size(), test_data.size());
104 13 : for (size_t ii = 0; ii < res.data.size(); ++ii) {
105 12 : BOOST_REQUIRE_EQUAL(res.data[ii], test_data[ii]);
106 : }
107 1 : message_received = true;
108 2 : };
109 1 : the_receiver->register_callback(callback_fun);
110 :
111 1 : the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block);
112 1 : usleep(100000);
113 1 : BOOST_REQUIRE_EQUAL(message_received, false);
114 :
115 1 : message_received = false;
116 1 : test_data = { 'A', 'N', 'O', 'T', 'H', 'E', 'R', ' ', 'T', 'E', 'S', 'T' };
117 1 : the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
118 3 : while (!message_received.load()) {
119 2 : usleep(15000);
120 : }
121 :
122 1 : message_received = false;
123 1 : test_data = { 'A', ' ', 'T', 'H', 'I', 'R', 'D', ' ', 'T', 'E', 'S', 'T' };
124 1 : the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "ignoredTopic");
125 1 : usleep(100000);
126 1 : BOOST_REQUIRE_EQUAL(message_received, false);
127 :
128 1 : the_receiver->unregister_callback();
129 1 : message_received = false;
130 1 : test_data = { 'A', ' ', 'F', 'O', 'U', 'R', 'T', 'H', ' ', 'T', 'E', 'S', 'T' };
131 1 : the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
132 :
133 1 : usleep(100000);
134 1 : BOOST_REQUIRE_EQUAL(message_received, false);
135 1 : auto response = the_receiver->receive(Receiver::s_block);
136 1 : BOOST_REQUIRE_EQUAL(response.data.size(), test_data.size());
137 1 : }
138 :
139 2 : BOOST_AUTO_TEST_CASE(MultiplePublishers)
140 : {
141 1 : auto first_publisher = make_ipm_sender("ZmqPublisher");
142 1 : auto second_publisher = make_ipm_sender("ZmqPublisher");
143 1 : auto the_subscriber = make_ipm_subscriber("ZmqSubscriber");
144 :
145 1 : nlohmann::json first_json;
146 1 : nlohmann::json second_json;
147 1 : nlohmann::json sub_json;
148 1 : first_json["connection_string"] = "inproc://foo";
149 1 : first_publisher->connect_for_sends(first_json);
150 1 : second_json["connection_string"] = "inproc://bar";
151 1 : second_publisher->connect_for_sends(second_json);
152 3 : sub_json["connection_strings"] = { "inproc://foo", "inproc://bar" };
153 1 : the_subscriber->connect_for_receives(sub_json);
154 :
155 1 : the_subscriber->subscribe("testTopic");
156 :
157 1 : std::vector<char> test_data{ 'T', 'E', 'S', 'T' };
158 1 : first_publisher->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
159 1 : auto response = the_subscriber->receive(Receiver::s_block);
160 1 : BOOST_REQUIRE_EQUAL(response.data.size(), 4);
161 1 : BOOST_REQUIRE_EQUAL(response.data[0], 'T');
162 1 : BOOST_REQUIRE_EQUAL(response.data[1], 'E');
163 1 : BOOST_REQUIRE_EQUAL(response.data[2], 'S');
164 1 : BOOST_REQUIRE_EQUAL(response.data[3], 'T');
165 :
166 1 : second_publisher->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
167 1 : auto response2 = the_subscriber->receive(Receiver::s_block);
168 1 : BOOST_REQUIRE_EQUAL(response2.data.size(), 4);
169 1 : BOOST_REQUIRE_EQUAL(response2.data[0], 'T');
170 1 : BOOST_REQUIRE_EQUAL(response2.data[1], 'E');
171 1 : BOOST_REQUIRE_EQUAL(response2.data[2], 'S');
172 1 : BOOST_REQUIRE_EQUAL(response2.data[3], 'T');
173 2 : }
174 :
175 : BOOST_AUTO_TEST_SUITE_END()
|