LCOV - code coverage report
Current view: top level - ipm/test/apps - zmq_recv.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 60 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 4 0

            Line data    Source code
       1              : /**
       2              :  * @file zmq_recv.cpp ZeroMQ Receive Test Application
       3              :  *
       4              :  * Used in conjunction with zmq_send, this test application instantiates a ZmqReceiver plugin and reports the incoming
       5              :  * data rate.
       6              :  *
       7              :  * This is part of the DUNE DAQ Application Framework, copyright 2020.
       8              :  * Licensing/copyright details are in the COPYING file that you should have
       9              :  * received with this code.
      10              :  */
      11              : 
      12              : #include "ipm/Receiver.hpp"
      13              : #include "ipm/ZmqContext.hpp"
      14              : 
      15              : #include "boost/program_options.hpp"
      16              : 
      17              : #include <chrono>
      18              : #include <map>
      19              : #include <memory>
      20              : #include <string>
      21              : 
      22              : using namespace dunedaq::ipm;
      23              : 
      24              : int
      25            0 : main(int argc, char* argv[])
      26              : {
      27            0 :   std::string conString = "tcp://127.0.0.1:12345";
      28            0 :   size_t npackets = 1;
      29            0 :   int nthreads = 1;
      30            0 :   int timeout = 10;
      31              : 
      32            0 :   namespace po = boost::program_options;
      33            0 :   po::options_description desc("Simple test program for ZmqReceiver");
      34            0 :   desc.add_options()(
      35            0 :     "connection,c", po::value<std::string>(&conString)->default_value(conString), "Connection to listen on")(
      36            0 :     "threads,t", po::value<int>(&nthreads)->default_value(nthreads), "Number of ZMQ threads")(
      37            0 :     "packets,p", po::value<size_t>(&npackets)->default_value(npackets), "Number of packets per group for reporting")(
      38            0 :     "timeout,o", po::value<int>(&timeout)->default_value(timeout), "Timeout, in seconds");
      39            0 :   try {
      40            0 :     po::variables_map vm;
      41            0 :     po::store(po::parse_command_line(argc, argv, desc), vm);
      42            0 :     po::notify(vm);
      43            0 :   } catch (std::exception& ex) {
      44            0 :     std::cerr << "Error parsing command line " << ex.what() << std::endl; // NOLINT(runtime/output_format)
      45            0 :     std::cerr << desc << std::endl;                                       // NOLINT(runtime/output_format)
      46            0 :     return 0;
      47            0 :   }
      48              : 
      49            0 :   dunedaq::logging::Logging::setup("ZMQ Test", "zmq_recv");
      50            0 :   if (nthreads > 1) {
      51            0 :     dunedaq::ipm::ZmqContext::instance().set_context_threads(nthreads);
      52              :   }
      53              : 
      54              :   // Receiver side
      55            0 :   std::shared_ptr<Receiver> receiver = make_ipm_receiver("ZmqReceiver");
      56            0 :   receiver->connect_for_receives({ { "connection_string", conString } });
      57              : 
      58            0 :   std::map<uint32_t, uint32_t> last_received_sequence; // NOLINT(build/unsigned)
      59            0 :   int64_t first_latency = 0;
      60            0 :   try {
      61            0 :     while (true) {
      62              :       // Last arg is receive timeout
      63            0 :       auto start = std::chrono::steady_clock::now();
      64            0 :       double bytesReceived = 0;
      65            0 :       for (size_t p = 0; p < npackets; p++) {
      66            0 :         Receiver::Response resp = receiver->receive(std::chrono::seconds(timeout));
      67            0 :         int64_t recvd_ts =
      68            0 :           std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch())
      69            0 :             .count();
      70            0 :         bytesReceived += resp.data.size();
      71            0 :         auto this_id = *(reinterpret_cast<uint32_t*>(resp.data.data()));           // NOLINT
      72            0 :         auto this_sequence = *(reinterpret_cast<uint32_t*>(resp.data.data()) + 1); // NOLINT
      73            0 :         auto this_ts = *(reinterpret_cast<uint64_t*>(resp.data.data()) + 1);       // NOLINT
      74              : 
      75            0 :         if (this_sequence < last_received_sequence[this_id] + 1) {
      76            0 :           TLOG() << "Received sequence ID " << this_sequence << " < expected sequence "
      77            0 :                  << (last_received_sequence[this_id] + 1) << " from sender " << this_id;
      78              :         }
      79            0 :         auto this_latency = static_cast<int64_t>(recvd_ts - this_ts);
      80            0 :         if (first_latency == 0) {
      81            0 :           first_latency = this_latency;
      82              :         }
      83            0 :         TLOG_DEBUG(6) << "Received message " << this_sequence << " from sender " << this_id
      84            0 :                       << ", latency= " << this_latency << " us (diff= " << (this_latency - first_latency) << " us)";
      85            0 :         last_received_sequence[this_id] = this_sequence;
      86            0 :       }
      87            0 :       auto elapsed = std::chrono::steady_clock::now() - start;
      88            0 :       auto nano = std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count();
      89            0 :       auto bw = bytesReceived / static_cast<double>(nano);
      90            0 :       TLOG() << "Received " << bytesReceived << " bytes in " << nano << " ns " << bw << " GB/s";
      91              :       // std::cout << "resp.data=";
      92              :       // for (auto d: resp.data) {
      93              :       //   std::cout << d << std::endl;
      94              :       // }
      95            0 :     }
      96            0 :   } catch (ReceiveTimeoutExpired const& exc) {
      97            0 :     std::cout << "Gave up waiting\n"; // NOLINT(runtime/output_format)
      98            0 :   }
      99            0 : }
        

Generated by: LCOV version 2.0-1