LCOV - code coverage report
Current view: top level - ipm/unittest - ZmqPubSub_test.cxx (source / functions) Coverage Total Hit
Test: code.result Lines: 100.0 % 108 108
Test Date: 2025-12-21 13:07:08 Functions: 100.0 % 11 11

            Line data    Source code
       1              : /**
       2              :  * @file ZmqPubSub_test.cxx Test ZmqPublisher to ZmqSubscriber transfer
       3              :  *
       4              :  * This is part of the DUNE DAQ Application Framework, copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
       9              : #include "ipm/Sender.hpp"
      10              : #include "ipm/Subscriber.hpp"
      11              : 
      12              : #include "logging/Logging.hpp"
      13              : 
      14              : #define BOOST_TEST_MODULE ZmqPubSub_test // NOLINT
      15              : 
      16              : #include "boost/test/unit_test.hpp"
      17              : 
      18              : #include <string>
      19              : #include <vector>
      20              : 
      21              : using namespace dunedaq::ipm;
      22              : 
      23              : BOOST_AUTO_TEST_SUITE(ZmqPubSub_test)
      24              : 
      25              : size_t
      26            2 : elapsed_time_milliseconds(std::chrono::steady_clock::time_point const& then)
      27              : {
      28            2 :   return static_cast<size_t>(
      29            2 :     std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - then).count());
      30              : }
      31              : 
      32            2 : BOOST_AUTO_TEST_CASE(SendReceiveTest)
      33              : {
      34            1 :   auto the_receiver = make_ipm_subscriber("ZmqSubscriber");
      35            1 :   BOOST_REQUIRE(the_receiver != nullptr);
      36            1 :   BOOST_REQUIRE(!the_receiver->can_receive());
      37              : 
      38            1 :   auto the_sender = make_ipm_sender("ZmqPublisher");
      39            1 :   BOOST_REQUIRE(the_sender != nullptr);
      40            1 :   BOOST_REQUIRE(!the_sender->can_send());
      41              : 
      42            1 :   nlohmann::json config_json;
      43            1 :   config_json["connection_string"] = "inproc://default";
      44            1 :   the_sender->connect_for_sends(config_json);
      45            1 :   the_receiver->connect_for_receives(config_json);
      46              : 
      47            1 :   BOOST_REQUIRE(the_receiver->can_receive());
      48            1 :   BOOST_REQUIRE(the_sender->can_send());
      49              : 
      50            1 :   the_receiver->subscribe("testTopic");
      51              : 
      52            1 :   std::vector<char> test_data{ 'T', 'E', 'S', 'T' };
      53              : 
      54            1 :   the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "ignoredTopic");
      55            1 :   auto before_recv = std::chrono::steady_clock::now();
      56            3 :   BOOST_REQUIRE_EXCEPTION(
      57              :     the_receiver->receive(std::chrono::milliseconds(100)),
      58              :     dunedaq::ipm::ReceiveTimeoutExpired,
      59              :     [&](dunedaq::ipm::ReceiveTimeoutExpired) { return elapsed_time_milliseconds(before_recv) >= 100; });
      60              : 
      61            1 :   the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
      62            1 :   auto response = the_receiver->receive(Receiver::s_block);
      63            1 :   BOOST_REQUIRE_EQUAL(response.data.size(), 4);
      64            1 :   BOOST_REQUIRE_EQUAL(response.data[0], 'T');
      65            1 :   BOOST_REQUIRE_EQUAL(response.data[1], 'E');
      66            1 :   BOOST_REQUIRE_EQUAL(response.data[2], 'S');
      67            1 :   BOOST_REQUIRE_EQUAL(response.data[3], 'T');
      68              : 
      69            1 :   the_receiver->unsubscribe("testTopic");
      70            1 :   the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
      71            3 :   BOOST_REQUIRE_EXCEPTION(
      72              :     the_receiver->receive(std::chrono::milliseconds(2000)),
      73              :     dunedaq::ipm::ReceiveTimeoutExpired,
      74              :     [&](dunedaq::ipm::ReceiveTimeoutExpired) { return elapsed_time_milliseconds(before_recv) >= 2000; });
      75            1 : }
      76              : 
      77            2 : BOOST_AUTO_TEST_CASE(CallbackTest)
      78              : {
      79              : 
      80            1 :   auto the_receiver = make_ipm_subscriber("ZmqSubscriber");
      81            1 :   BOOST_REQUIRE(the_receiver != nullptr);
      82            1 :   BOOST_REQUIRE(!the_receiver->can_receive());
      83              : 
      84            1 :   auto the_sender = make_ipm_sender("ZmqPublisher");
      85            1 :   BOOST_REQUIRE(the_sender != nullptr);
      86            1 :   BOOST_REQUIRE(!the_sender->can_send());
      87              : 
      88            1 :   nlohmann::json config_json;
      89            1 :   config_json["connection_string"] = "inproc://default";
      90            1 :   the_sender->connect_for_sends(config_json);
      91            1 :   the_receiver->connect_for_receives(config_json);
      92              : 
      93            1 :   BOOST_REQUIRE(the_receiver->can_receive());
      94            1 :   BOOST_REQUIRE(the_sender->can_send());
      95              : 
      96            1 :   the_receiver->subscribe("testTopic");
      97              : 
      98            1 :   std::vector<char> test_data{ 'T', 'E', 'S', 'T' };
      99            1 :   std::atomic<bool> message_received = false;
     100              : 
     101            2 :   auto callback_fun = [&](Receiver::Response& res) {
     102            2 :     TLOG() << "Callback function called with res.data.size() == " << static_cast<int>(res.data.size());
     103            1 :     BOOST_REQUIRE_EQUAL(res.data.size(), test_data.size());
     104           13 :     for (size_t ii = 0; ii < res.data.size(); ++ii) {
     105           12 :       BOOST_REQUIRE_EQUAL(res.data[ii], test_data[ii]);
     106              :     }
     107            1 :     message_received = true;
     108            2 :   };
     109            1 :   the_receiver->register_callback(callback_fun);
     110              : 
     111            1 :   the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block);
     112            1 :   usleep(100000);
     113            1 :   BOOST_REQUIRE_EQUAL(message_received, false);
     114              : 
     115            1 :   message_received = false;
     116            1 :   test_data = { 'A', 'N', 'O', 'T', 'H', 'E', 'R', ' ', 'T', 'E', 'S', 'T' };
     117            1 :   the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
     118            3 :   while (!message_received.load()) {
     119            2 :     usleep(15000);
     120              :   }
     121              : 
     122            1 :   message_received = false;
     123            1 :   test_data = { 'A', ' ', 'T', 'H', 'I', 'R', 'D', ' ', 'T', 'E', 'S', 'T' };
     124            1 :   the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "ignoredTopic");
     125            1 :   usleep(100000);
     126            1 :   BOOST_REQUIRE_EQUAL(message_received, false);
     127              : 
     128            1 :   the_receiver->unregister_callback();
     129            1 :   message_received = false;
     130            1 :   test_data = { 'A', ' ', 'F', 'O', 'U', 'R', 'T', 'H', ' ', 'T', 'E', 'S', 'T' };
     131            1 :   the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
     132              : 
     133            1 :   usleep(100000);
     134            1 :   BOOST_REQUIRE_EQUAL(message_received, false);
     135            1 :   auto response = the_receiver->receive(Receiver::s_block);
     136            1 :   BOOST_REQUIRE_EQUAL(response.data.size(), test_data.size());
     137            1 : }
     138              : 
     139            2 : BOOST_AUTO_TEST_CASE(MultiplePublishers)
     140              : {
     141            1 :   auto first_publisher = make_ipm_sender("ZmqPublisher");
     142            1 :   auto second_publisher = make_ipm_sender("ZmqPublisher");
     143            1 :   auto the_subscriber = make_ipm_subscriber("ZmqSubscriber");
     144              : 
     145            1 :   nlohmann::json first_json;
     146            1 :   nlohmann::json second_json;
     147            1 :   nlohmann::json sub_json;
     148            1 :   first_json["connection_string"] = "inproc://foo";
     149            1 :   first_publisher->connect_for_sends(first_json);
     150            1 :   second_json["connection_string"] = "inproc://bar";
     151            1 :   second_publisher->connect_for_sends(second_json);
     152            3 :   sub_json["connection_strings"] = { "inproc://foo", "inproc://bar" };
     153            1 :   the_subscriber->connect_for_receives(sub_json);
     154              : 
     155            1 :   the_subscriber->subscribe("testTopic");
     156              : 
     157            1 :   std::vector<char> test_data{ 'T', 'E', 'S', 'T' };
     158            1 :   first_publisher->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
     159            1 :   auto response = the_subscriber->receive(Receiver::s_block);
     160            1 :   BOOST_REQUIRE_EQUAL(response.data.size(), 4);
     161            1 :   BOOST_REQUIRE_EQUAL(response.data[0], 'T');
     162            1 :   BOOST_REQUIRE_EQUAL(response.data[1], 'E');
     163            1 :   BOOST_REQUIRE_EQUAL(response.data[2], 'S');
     164            1 :   BOOST_REQUIRE_EQUAL(response.data[3], 'T');
     165              : 
     166            1 :   second_publisher->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
     167            1 :   auto response2 = the_subscriber->receive(Receiver::s_block);
     168            1 :   BOOST_REQUIRE_EQUAL(response2.data.size(), 4);
     169            1 :   BOOST_REQUIRE_EQUAL(response2.data[0], 'T');
     170            1 :   BOOST_REQUIRE_EQUAL(response2.data[1], 'E');
     171            1 :   BOOST_REQUIRE_EQUAL(response2.data[2], 'S');
     172            1 :   BOOST_REQUIRE_EQUAL(response2.data[3], 'T');
     173            2 : }
     174              : 
     175              : BOOST_AUTO_TEST_SUITE_END()
        

Generated by: LCOV version 2.0-1