LCOV - code coverage report
Current view: top level - ipm/test/apps - zmq_send.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 60 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 6 0

            Line data    Source code
       1              : /**
       2              :  * @file zmq_send.cpp ZeroMQ Send Test Application
       3              :  *
       4              :  * Used in conjunction with zmq_receive, this test application instantiates a ZmqSender plugin,
       5              :  * sends the given number of packets, and reports the outgoing data rate.
       6              :  *
       7              :  * This is part of the DUNE DAQ Application Framework, copyright 2020.
       8              :  * Licensing/copyright details are in the COPYING file that you should have
       9              :  * received with this code.
      10              :  */
      11              : 
      12              : #include "boost/program_options.hpp"
      13              : #include "ipm/Sender.hpp"
      14              : #include "ipm/ZmqContext.hpp"
      15              : #include "logging/Logging.hpp"
      16              : 
      17              : #include <chrono>
      18              : #include <cstdlib>
      19              : #include <memory>
      20              : #include <string>
      21              : #include <vector>
      22              : 
      23              : int
      24            0 : main(int argc, char* argv[])
      25              : {
      26            0 :   uint32_t npackets = 1; // NOLINT(build/unsigned)
      27            0 :   size_t packetSize = 100;
      28            0 :   size_t interval = 0;
      29            0 :   std::string conString = "tcp://127.0.0.1:12345";
      30            0 :   int nthreads = 1;
      31            0 :   uint32_t id = 0; // NOLINT(build/unsigned)
      32              : 
      33            0 :   namespace po = boost::program_options;
      34            0 :   po::options_description desc("Simple test program for ZmqSender");
      35            0 :   desc.add_options()(
      36            0 :     "connection,c", po::value<std::string>(&conString)->default_value(conString), "Connection to listen on")(
      37            0 :     "threads,t", po::value<int>(&nthreads)->default_value(nthreads), "Number of ZMQ threads")(
      38              :     "packets,p",
      39            0 :     po::value<uint32_t>(&npackets)->default_value(npackets), // NOLINT(build/unsigned)
      40            0 :     "Number of packets to send")(
      41            0 :     "packetSize,s", po::value<size_t>(&packetSize)->default_value(packetSize), "Number of bytes per packet")(
      42            0 :     "interval,i", po::value<size_t>(&interval)->default_value(interval), "Microseconds to sleep between messages")(
      43            0 :     "id", po::value<uint32_t>(&id)->default_value(id), "Identifier for this Sender"); // NOLINT(build/unsigned)
      44            0 :   try {
      45            0 :     po::variables_map vm;
      46            0 :     po::store(po::parse_command_line(argc, argv, desc), vm);
      47            0 :     po::notify(vm);
      48            0 :   } catch (std::exception& ex) {
      49            0 :     std::cerr << "Error parsing command line " << ex.what() << std::endl; // NOLINT(runtime/output_format)
      50            0 :     std::cerr << desc << std::endl;                                       // NOLINT(runtime/output_format)
      51            0 :     return 0;
      52            0 :   }
      53              : 
      54            0 :   dunedaq::logging::Logging::setup("ZMQ Test", "zmq_send");
      55            0 :   if (nthreads > 1) {
      56            0 :     dunedaq::ipm::ZmqContext::instance().set_context_threads(nthreads);
      57              :   }
      58              : 
      59              :   // Set the minimum packet size to 16 bytes, 8 bytes for sequence number and 8 bytes for current time
      60            0 :   if (packetSize < 16) {
      61            0 :     packetSize = 16;
      62              :   }
      63              : 
      64            0 :   std::shared_ptr<dunedaq::ipm::Sender> sender = dunedaq::ipm::make_ipm_sender("ZmqSender");
      65            0 :   sender->connect_for_sends({ { "connection_string", conString } });
      66              : 
      67            0 :   std::vector<char> message(packetSize, 0);
      68            0 :   *reinterpret_cast<uint32_t*>(message.data()) = id; // NOLINT
      69              : 
      70            0 :   auto start = std::chrono::steady_clock::now();
      71            0 :   for (uint64_t p = 0; p < npackets; p++) { // NOLINT(build/unsigned)
      72            0 :     TLOG_DEBUG(3) << "Preparing Message " << p;
      73              :     // Last arg is send timeout
      74            0 :     int attempt = 0;
      75            0 :     bool success = false;
      76              : 
      77            0 :     *(reinterpret_cast<uint32_t*>(message.data()) + 1) = p; // NOLINT
      78            0 :     uint64_t time_since_epoch =                             // NOLINT(build/unsigned)
      79            0 :       std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch())
      80            0 :         .count();
      81            0 :     *(reinterpret_cast<uint64_t*>(message.data()) + 1) = time_since_epoch; // NOLINT
      82              : 
      83            0 :     TLOG_DEBUG(4) << "Sending Message " << p;
      84            0 :     do {
      85            0 :       success = sender->send(static_cast<void*>(message.data()), packetSize, std::chrono::milliseconds(2), "", true);
      86            0 :       attempt++;
      87            0 :       if (success == false && attempt == 1)
      88            0 :         TLOG() << "bad omen";
      89            0 :     } while (success == false);
      90              : 
      91            0 :     TLOG_DEBUG(5) << "Message sent " << p;
      92            0 :     if (interval > 0) {
      93            0 :       usleep(interval);
      94              :     }
      95              :   }
      96              : 
      97            0 :   auto elapsed = std::chrono::steady_clock::now() - start;
      98            0 :   auto nano = std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count();
      99            0 :   auto bw = (packetSize * npackets) / static_cast<double>(nano);
     100            0 :   TLOG() << "Sent " << packetSize * npackets << " bytes in " << nano << " ns " << bw << " GB/s";
     101            0 : }
        

Generated by: LCOV version 2.0-1