LCOV - code coverage report
Current view: top level - ipm/unittest - ZmqPubSub_test.cxx (source / functions) Coverage Total Hit
Test: code.result Lines: 100.0 % 111 111
Test Date: 2026-02-16 10:18:04 Functions: 100.0 % 11 11

            Line data    Source code
       1              : /**
       2              :  * @file ZmqPubSub_test.cxx Test ZmqPublisher to ZmqSubscriber transfer
       3              :  *
       4              :  * This is part of the DUNE DAQ Application Framework, copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
       9              : #include "ipm/Sender.hpp"
      10              : #include "ipm/Subscriber.hpp"
      11              : 
      12              : #include "logging/Logging.hpp"
      13              : 
      14              : #define BOOST_TEST_MODULE ZmqPubSub_test // NOLINT
      15              : 
      16              : #include "boost/test/unit_test.hpp"
      17              : 
      18              : #include <string>
      19              : #include <vector>
      20              : 
      21              : using namespace dunedaq::ipm;
      22              : 
      23              : BOOST_AUTO_TEST_SUITE(ZmqPubSub_test)
      24              : 
      25              : size_t
      26            2 : elapsed_time_milliseconds(std::chrono::steady_clock::time_point const& then)
      27              : {
      28            2 :   return static_cast<size_t>(
      29            2 :     std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - then).count());
      30              : }
      31              : 
      32            2 : BOOST_AUTO_TEST_CASE(SendReceiveTest)
      33              : {
      34            1 :   auto the_receiver = make_ipm_subscriber("ZmqSubscriber");
      35            1 :   BOOST_REQUIRE(the_receiver != nullptr);
      36            1 :   BOOST_REQUIRE(!the_receiver->can_receive());
      37              : 
      38            1 :   auto the_sender = make_ipm_sender("ZmqPublisher");
      39            1 :   BOOST_REQUIRE(the_sender != nullptr);
      40            1 :   BOOST_REQUIRE(!the_sender->can_send());
      41              : 
      42            1 :   Sender::ConnectionInfo sender_config("test_sender", "inproc://default");
      43            1 :   Receiver::ConnectionInfo receiver_config("test_receiver", "inproc://default");
      44              :   
      45            1 :   the_sender->connect_for_sends(sender_config);
      46            1 :   the_receiver->connect_for_receives(receiver_config);
      47              : 
      48            1 :   BOOST_REQUIRE(the_receiver->can_receive());
      49            1 :   BOOST_REQUIRE(the_sender->can_send());
      50              : 
      51            1 :   BOOST_REQUIRE(!the_receiver->data_pending());
      52              :   
      53            1 :   the_receiver->subscribe("testTopic");
      54              : 
      55            1 :   std::vector<char> test_data{ 'T', 'E', 'S', 'T' };
      56              : 
      57            1 :   the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "ignoredTopic");
      58            1 :   auto before_recv = std::chrono::steady_clock::now();
      59            2 :   BOOST_REQUIRE_EXCEPTION(
      60              :     the_receiver->receive(std::chrono::milliseconds(100)),
      61              :     dunedaq::ipm::ReceiveTimeoutExpired,
      62              :     [&](dunedaq::ipm::ReceiveTimeoutExpired) { return elapsed_time_milliseconds(before_recv) >= 100; });
      63              : 
      64            1 :   the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
      65            1 :   BOOST_REQUIRE(the_receiver->data_pending());
      66            1 :   auto response = the_receiver->receive(Receiver::s_block);
      67            1 :   BOOST_REQUIRE(!the_receiver->data_pending());
      68            1 :   BOOST_REQUIRE_EQUAL(response.data.size(), 4);
      69            1 :   BOOST_REQUIRE_EQUAL(response.data[0], 'T');
      70            1 :   BOOST_REQUIRE_EQUAL(response.data[1], 'E');
      71            1 :   BOOST_REQUIRE_EQUAL(response.data[2], 'S');
      72            1 :   BOOST_REQUIRE_EQUAL(response.data[3], 'T');
      73              : 
      74            1 :   the_receiver->unsubscribe("testTopic");
      75            1 :   the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
      76            1 :   BOOST_REQUIRE(!the_receiver->data_pending());
      77            2 :   BOOST_REQUIRE_EXCEPTION(
      78              :     the_receiver->receive(std::chrono::milliseconds(2000)),
      79              :     dunedaq::ipm::ReceiveTimeoutExpired,
      80              :     [&](dunedaq::ipm::ReceiveTimeoutExpired) { return elapsed_time_milliseconds(before_recv) >= 2000; });
      81            1 : }
      82              : 
      83            2 : BOOST_AUTO_TEST_CASE(CallbackTest)
      84              : {
      85              : 
      86            1 :   auto the_receiver = make_ipm_subscriber("ZmqSubscriber");
      87            1 :   BOOST_REQUIRE(the_receiver != nullptr);
      88            1 :   BOOST_REQUIRE(!the_receiver->can_receive());
      89              : 
      90            1 :   auto the_sender = make_ipm_sender("ZmqPublisher");
      91            1 :   BOOST_REQUIRE(the_sender != nullptr);
      92            1 :   BOOST_REQUIRE(!the_sender->can_send());
      93              : 
      94            1 :   Sender::ConnectionInfo sender_config("test_sender", "inproc://default");
      95            1 :   Receiver::ConnectionInfo receiver_config("test_receiver", "inproc://default");
      96            1 :   the_sender->connect_for_sends(sender_config);
      97            1 :   the_receiver->connect_for_receives(receiver_config);
      98              : 
      99            1 :   BOOST_REQUIRE(the_receiver->can_receive());
     100            1 :   BOOST_REQUIRE(the_sender->can_send());
     101              : 
     102            1 :   the_receiver->subscribe("testTopic");
     103              : 
     104            1 :   std::vector<char> test_data{ 'T', 'E', 'S', 'T' };
     105            1 :   std::atomic<bool> message_received = false;
     106              : 
     107            2 :   auto callback_fun = [&](Receiver::Response& res) {
     108            2 :     TLOG() << "Callback function called with res.data.size() == " << static_cast<int>(res.data.size());
     109            1 :     BOOST_REQUIRE_EQUAL(res.data.size(), test_data.size());
     110           13 :     for (size_t ii = 0; ii < res.data.size(); ++ii) {
     111           12 :       BOOST_REQUIRE_EQUAL(res.data[ii], test_data[ii]);
     112              :     }
     113            1 :     message_received = true;
     114            2 :   };
     115            1 :   the_receiver->register_callback(callback_fun);
     116              : 
     117            1 :   the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block);
     118            1 :   usleep(100000);
     119            1 :   BOOST_REQUIRE_EQUAL(message_received, false);
     120              : 
     121            1 :   message_received = false;
     122            1 :   test_data = { 'A', 'N', 'O', 'T', 'H', 'E', 'R', ' ', 'T', 'E', 'S', 'T' };
     123            1 :   the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
     124            3 :   while (!message_received.load()) {
     125            2 :     usleep(15000);
     126              :   }
     127              : 
     128            1 :   message_received = false;
     129            1 :   test_data = { 'A', ' ', 'T', 'H', 'I', 'R', 'D', ' ', 'T', 'E', 'S', 'T' };
     130            1 :   the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "ignoredTopic");
     131            1 :   usleep(100000);
     132            1 :   BOOST_REQUIRE_EQUAL(message_received, false);
     133              : 
     134            1 :   the_receiver->unregister_callback();
     135            1 :   message_received = false;
     136            1 :   test_data = { 'A', ' ', 'F', 'O', 'U', 'R', 'T', 'H', ' ', 'T', 'E', 'S', 'T' };
     137            1 :   the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
     138              : 
     139            1 :   usleep(100000);
     140            1 :   BOOST_REQUIRE_EQUAL(message_received, false);
     141            1 :   auto response = the_receiver->receive(Receiver::s_block);
     142            1 :   BOOST_REQUIRE_EQUAL(response.data.size(), test_data.size());
     143            1 :   BOOST_REQUIRE(!the_receiver->data_pending());
     144            1 : }
     145              : 
     146            2 : BOOST_AUTO_TEST_CASE(MultiplePublishers)
     147              : {
     148            1 :   auto first_publisher = make_ipm_sender("ZmqPublisher");
     149            1 :   auto second_publisher = make_ipm_sender("ZmqPublisher");
     150            1 :   auto the_subscriber = make_ipm_subscriber("ZmqSubscriber");
     151              : 
     152            1 :   Sender::ConnectionInfo first_sender_config("test_sender", "inproc://foo");
     153            1 :   Sender::ConnectionInfo second_sender_config("test_sender", "inproc://bar");
     154            1 :   Receiver::ConnectionInfo receiver_config("test_receiver", "", { "inproc://foo", "inproc://bar" });
     155            1 :   first_publisher->connect_for_sends(first_sender_config);
     156            1 :   second_publisher->connect_for_sends(second_sender_config);
     157            1 :   the_subscriber->connect_for_receives(receiver_config);
     158              : 
     159            1 :   the_subscriber->subscribe("testTopic");
     160              : 
     161            2 :   std::vector<char> test_data{ 'T', 'E', 'S', 'T' };
     162            1 :   first_publisher->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
     163            1 :   auto response = the_subscriber->receive(Receiver::s_block);
     164            1 :   BOOST_REQUIRE_EQUAL(response.data.size(), 4);
     165            1 :   BOOST_REQUIRE_EQUAL(response.data[0], 'T');
     166            1 :   BOOST_REQUIRE_EQUAL(response.data[1], 'E');
     167            1 :   BOOST_REQUIRE_EQUAL(response.data[2], 'S');
     168            1 :   BOOST_REQUIRE_EQUAL(response.data[3], 'T');
     169              : 
     170            1 :   second_publisher->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
     171            1 :   auto response2 = the_subscriber->receive(Receiver::s_block);
     172            1 :   BOOST_REQUIRE_EQUAL(response2.data.size(), 4);
     173            1 :   BOOST_REQUIRE_EQUAL(response2.data[0], 'T');
     174            1 :   BOOST_REQUIRE_EQUAL(response2.data[1], 'E');
     175            1 :   BOOST_REQUIRE_EQUAL(response2.data[2], 'S');
     176            1 :   BOOST_REQUIRE_EQUAL(response2.data[3], 'T');
     177            1 :   BOOST_REQUIRE(!the_subscriber->data_pending());
     178            1 : }
     179              : 
     180              : BOOST_AUTO_TEST_SUITE_END()
        

Generated by: LCOV version 2.0-1