Line data Source code
1 : /**
2 : * @file performance_test.cxx Performance Unit Tests
3 : *
4 : * This is part of the DUNE DAQ Application Framework, copyright 2021.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "iomanager/IOManager.hpp"
10 :
11 : #include "serialization/Serialization.hpp"
12 :
13 : #define BOOST_TEST_MODULE performance_test // NOLINT
14 :
15 : #include "boost/test/unit_test.hpp"
16 : #include "opmonlib/TestOpMonManager.hpp"
17 :
18 : #include <atomic>
19 : #include <functional>
20 : #include <future>
21 : #include <memory>
22 : #include <string>
23 : #include <utility>
24 : #include <vector>
25 :
26 : using namespace dunedaq::iomanager;
27 :
28 : namespace dunedaq {
29 : struct data_t
30 : {
31 : std::vector<uint8_t> d; // NOLINT(build/unsigned)
32 : data_t() = default;
33 40000 : data_t(unsigned int size, uint8_t c) // NOLINT(build/unsigned)
34 40000 : : d(size, c)
35 : {
36 40000 : ;
37 40000 : }
38 40000 : DUNE_DAQ_SERIALIZE(data_t, d);
39 : };
40 40016 : DUNE_DAQ_SERIALIZABLE(data_t, "data_t");
41 :
42 : } // namespace dunedaq
43 :
44 : BOOST_AUTO_TEST_SUITE(performance_test)
45 :
46 : struct ConfigurationTestFixture
47 : {
48 4 : ConfigurationTestFixture()
49 4 : {
50 4 : confdb = std::make_shared<dunedaq::conffwk::Configuration>("oksconflibs:test/config/iomanager_test.data.xml");
51 4 : confdb->get<dunedaq::confmodel::Queue>(queues);
52 4 : confdb->get<dunedaq::confmodel::NetworkConnection>(connections);
53 :
54 4 : IOManager::get()->configure("performance_t", queues, connections, nullptr, opmgr); // Not using connectivity service
55 4 : }
56 4 : ~ConfigurationTestFixture() { IOManager::get()->reset(); }
57 :
58 : ConfigurationTestFixture(ConfigurationTestFixture const&) = delete;
59 : ConfigurationTestFixture(ConfigurationTestFixture&&) = delete;
60 : ConfigurationTestFixture& operator=(ConfigurationTestFixture const&) = delete;
61 : ConfigurationTestFixture& operator=(ConfigurationTestFixture&&) = delete;
62 :
63 : dunedaq::opmonlib::TestOpMonManager opmgr;
64 :
65 : dunedaq::iomanager::ConnectionId network_id;
66 : dunedaq::iomanager::ConnectionId queue_id;
67 : std::shared_ptr<dunedaq::conffwk::Configuration> confdb;
68 : std::vector<const dunedaq::confmodel::Queue*> queues;
69 : std::vector<const dunedaq::confmodel::NetworkConnection*> connections;
70 : const size_t n_sends = 10000;
71 : const size_t message_size = 55680;
72 : };
73 :
74 2 : BOOST_FIXTURE_TEST_CASE(CallbackRegistrationNetwork, ConfigurationTestFixture)
75 : {
76 1 : std::atomic<unsigned int> received_count = 0;
77 10001 : std::function<void(dunedaq::data_t)> callback = [&](dunedaq::data_t) { ++received_count; }; // NOLINT
78 :
79 1 : IOManager::get()->add_callback<dunedaq::data_t>("network", callback);
80 1 : auto net_sender = IOManager::get()->get_sender<dunedaq::data_t>("network");
81 1 : auto start_time = std::chrono::steady_clock::now();
82 10001 : for (unsigned int i = 0; i < n_sends; ++i) {
83 10000 : dunedaq::data_t temp(message_size, i % 200);
84 10000 : net_sender->send(std::move(temp), Sender::s_no_block);
85 10000 : }
86 1 : BOOST_TEST_MESSAGE("Messages sent, waiting for receives");
87 2 : while (received_count < n_sends) {
88 1 : usleep(1000);
89 : }
90 :
91 1 : IOManager::get()->remove_callback<dunedaq::data_t>("network");
92 1 : auto stop_time = std::chrono::steady_clock::now();
93 :
94 1 : auto time = std::chrono::duration_cast<std::chrono::microseconds>(stop_time - start_time).count();
95 2 : double rate = received_count.load() / static_cast<double>(time) * 1e6; // Hz
96 :
97 1 : BOOST_CHECK(rate > 0.);
98 1 : BOOST_TEST_MESSAGE("network callback rate " << rate << " Hz");
99 1 : }
100 :
101 2 : BOOST_FIXTURE_TEST_CASE(CallbackRegistrationQueue, ConfigurationTestFixture)
102 : {
103 1 : std::atomic<unsigned int> received_count = 0;
104 10001 : std::function<void(dunedaq::data_t)> callback = [&](dunedaq::data_t) { ++received_count; }; // NOLINT
105 :
106 1 : IOManager::get()->add_callback<dunedaq::data_t>("queue", callback);
107 1 : auto queue_sender = IOManager::get()->get_sender<dunedaq::data_t>("queue");
108 1 : auto start_time = std::chrono::steady_clock::now();
109 10001 : for (unsigned int i = 0; i < n_sends; ++i) {
110 10000 : dunedaq::data_t temp(message_size, i % 200);
111 10000 : queue_sender->send(std::move(temp), std::chrono::milliseconds(1000));
112 10000 : }
113 1 : BOOST_TEST_MESSAGE("Messages sent, waiting for receives");
114 1 : while (received_count < n_sends) {
115 0 : usleep(1000);
116 : }
117 1 : IOManager::get()->remove_callback<dunedaq::data_t>("queue");
118 1 : auto stop_time = std::chrono::steady_clock::now();
119 :
120 1 : auto time = std::chrono::duration_cast<std::chrono::microseconds>(stop_time - start_time).count();
121 2 : double rate = received_count.load() / static_cast<double>(time) * 1e6; // Hz
122 :
123 1 : BOOST_CHECK(rate > 0.);
124 1 : BOOST_TEST_MESSAGE("queue callback rate " << rate << " Hz");
125 1 : }
126 :
127 2 : BOOST_FIXTURE_TEST_CASE(DirectReadNetwork, ConfigurationTestFixture)
128 : {
129 1 : std::atomic<unsigned int> received_count = 0;
130 1 : unsigned int total_send = n_sends;
131 3 : std::function<void()> recv_func = [&]() {
132 10000 : do {
133 10000 : auto mess = IOManager::get()->get_receiver<dunedaq::data_t>("network")->receive(std::chrono::milliseconds(10));
134 10000 : ++received_count;
135 20000 : } while (received_count.load() < total_send);
136 2 : };
137 :
138 1 : auto net_sender = IOManager::get()->get_sender<dunedaq::data_t>("network");
139 1 : auto rcv_ftr = std::async(std::launch::async, recv_func);
140 :
141 1 : auto start_time = std::chrono::steady_clock::now();
142 10001 : for (unsigned int i = 0; i < total_send; ++i) {
143 10000 : dunedaq::data_t temp(message_size, i % 200);
144 10000 : net_sender->send(std::move(temp), dunedaq::iomanager::Sender::s_no_block);
145 10000 : }
146 1 : BOOST_TEST_MESSAGE("Messages sent, waiting for receives");
147 1 : rcv_ftr.get();
148 1 : auto stop_time = std::chrono::steady_clock::now();
149 :
150 1 : auto time = std::chrono::duration_cast<std::chrono::microseconds>(stop_time - start_time).count();
151 2 : double rate = received_count.load() / static_cast<double>(time) * 1e6; // Hz
152 :
153 1 : BOOST_CHECK(rate > 0.);
154 1 : BOOST_TEST_MESSAGE("network read rate " << rate << " Hz");
155 1 : }
156 :
157 2 : BOOST_FIXTURE_TEST_CASE(DirectReadQueue, ConfigurationTestFixture)
158 : {
159 1 : std::atomic<unsigned int> received_count = 0;
160 1 : unsigned int total_send = n_sends;
161 3 : std::function<void()> recv_func = [&]() {
162 10000 : do {
163 10000 : auto mess = IOManager::get()->get_receiver<dunedaq::data_t>("queue")->receive(std::chrono::milliseconds(10));
164 10000 : ++received_count;
165 20000 : } while (received_count.load() < total_send);
166 2 : };
167 :
168 1 : auto queue_sender = IOManager::get()->get_sender<dunedaq::data_t>("queue");
169 1 : auto rcv_ftr = std::async(std::launch::async, recv_func);
170 :
171 1 : auto start_time = std::chrono::steady_clock::now();
172 10001 : for (unsigned int i = 0; i < total_send; ++i) {
173 10000 : dunedaq::data_t temp(message_size, i % 200);
174 10000 : queue_sender->send(std::move(temp), std::chrono::milliseconds(10));
175 10000 : }
176 1 : BOOST_TEST_MESSAGE("Messages sent, waiting for receives");
177 1 : rcv_ftr.get();
178 1 : auto stop_time = std::chrono::steady_clock::now();
179 :
180 1 : auto time = std::chrono::duration_cast<std::chrono::microseconds>(stop_time - start_time).count();
181 2 : double rate = received_count.load() / static_cast<double>(time) * 1e6; // Hz
182 :
183 1 : BOOST_CHECK(rate > 0.);
184 1 : BOOST_TEST_MESSAGE("queue read rate " << rate << " Hz");
185 1 : }
186 :
187 : BOOST_AUTO_TEST_SUITE_END()
|