Line data Source code
1 : /**
2 : * @file zmq_send.cpp ZeroMQ Send Test Application
3 : *
4 : * Used in conjunction with zmq_receive, this test application instantiates a ZmqSender plugin,
5 : * sends the given number of packets, and reports the outgoing data rate.
6 : *
7 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
8 : * Licensing/copyright details are in the COPYING file that you should have
9 : * received with this code.
10 : */
11 :
12 : #include "boost/program_options.hpp"
13 : #include "ipm/Sender.hpp"
14 : #include "ipm/ZmqContext.hpp"
15 : #include "logging/Logging.hpp"
16 :
17 : #include <chrono>
18 : #include <cstdlib>
19 : #include <memory>
20 : #include <string>
21 : #include <vector>
22 :
23 : int
24 0 : main(int argc, char* argv[])
25 : {
26 0 : uint32_t npackets = 1; // NOLINT(build/unsigned)
27 0 : size_t packetSize = 100;
28 0 : size_t interval = 0;
29 0 : std::string conString = "tcp://127.0.0.1:12345";
30 0 : int nthreads = 1;
31 0 : uint32_t id = 0; // NOLINT(build/unsigned)
32 :
33 0 : namespace po = boost::program_options;
34 0 : po::options_description desc("Simple test program for ZmqSender");
35 0 : desc.add_options()(
36 0 : "connection,c", po::value<std::string>(&conString)->default_value(conString), "Connection to listen on")(
37 0 : "threads,t", po::value<int>(&nthreads)->default_value(nthreads), "Number of ZMQ threads")(
38 : "packets,p",
39 0 : po::value<uint32_t>(&npackets)->default_value(npackets), // NOLINT(build/unsigned)
40 0 : "Number of packets to send")(
41 0 : "packetSize,s", po::value<size_t>(&packetSize)->default_value(packetSize), "Number of bytes per packet")(
42 0 : "interval,i", po::value<size_t>(&interval)->default_value(interval), "Microseconds to sleep between messages")(
43 0 : "id", po::value<uint32_t>(&id)->default_value(id), "Identifier for this Sender"); // NOLINT(build/unsigned)
44 0 : try {
45 0 : po::variables_map vm;
46 0 : po::store(po::parse_command_line(argc, argv, desc), vm);
47 0 : po::notify(vm);
48 0 : } catch (std::exception& ex) {
49 0 : std::cerr << "Error parsing command line " << ex.what() << std::endl; // NOLINT(runtime/output_format)
50 0 : std::cerr << desc << std::endl; // NOLINT(runtime/output_format)
51 0 : return 0;
52 0 : }
53 :
54 0 : dunedaq::logging::Logging::setup("ZMQ Test", "zmq_send");
55 0 : if (nthreads > 1) {
56 0 : dunedaq::ipm::ZmqContext::instance().set_context_threads(nthreads);
57 : }
58 :
59 : // Set the minimum packet size to 16 bytes, 8 bytes for sequence number and 8 bytes for current time
60 0 : if (packetSize < 16) {
61 0 : packetSize = 16;
62 : }
63 :
64 0 : std::shared_ptr<dunedaq::ipm::Sender> sender = dunedaq::ipm::make_ipm_sender("ZmqSender");
65 0 : dunedaq::ipm::Sender::ConnectionInfo conn_info("zmq_send", conString);
66 0 : sender->connect_for_sends(conn_info);
67 :
68 0 : std::vector<char> message(packetSize, 0);
69 0 : *reinterpret_cast<uint32_t*>(message.data()) = id; // NOLINT
70 :
71 0 : auto start = std::chrono::steady_clock::now();
72 0 : for (uint64_t p = 0; p < npackets; p++) { // NOLINT(build/unsigned)
73 0 : TLOG_DEBUG(3) << "Preparing Message " << p;
74 : // Last arg is send timeout
75 0 : int attempt = 0;
76 0 : bool success = false;
77 :
78 0 : *(reinterpret_cast<uint32_t*>(message.data()) + 1) = p; // NOLINT
79 0 : uint64_t time_since_epoch = // NOLINT(build/unsigned)
80 0 : std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch())
81 0 : .count();
82 0 : *(reinterpret_cast<uint64_t*>(message.data()) + 1) = time_since_epoch; // NOLINT
83 :
84 0 : TLOG_DEBUG(4) << "Sending Message " << p;
85 0 : do {
86 0 : success = sender->send(static_cast<void*>(message.data()), packetSize, std::chrono::milliseconds(2), "", true);
87 0 : attempt++;
88 0 : if (success == false && attempt == 1)
89 0 : TLOG() << "bad omen";
90 0 : } while (success == false);
91 :
92 0 : TLOG_DEBUG(5) << "Message sent " << p;
93 0 : if (interval > 0) {
94 0 : usleep(interval);
95 : }
96 : }
97 :
98 0 : auto elapsed = std::chrono::steady_clock::now() - start;
99 0 : auto nano = std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count();
100 0 : auto bw = (packetSize * npackets) / static_cast<double>(nano);
101 0 : TLOG() << "Sent " << packetSize * npackets << " bytes in " << nano << " ns " << bw << " GB/s";
102 0 : }
|