Line data Source code
1 : /**
2 : * @file zmq_send.cpp ZeroMQ Send Test Application
3 : *
4 : * Used in conjunction with zmq_receive, this test application instantiates a ZmqSender plugin,
5 : * sends the given number of packets, and reports the outgoing data rate.
6 : *
7 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
8 : * Licensing/copyright details are in the COPYING file that you should have
9 : * received with this code.
10 : */
11 :
12 : #include "boost/program_options.hpp"
13 : #include "ipm/Sender.hpp"
14 : #include "ipm/ZmqContext.hpp"
15 : #include "logging/Logging.hpp"
16 :
17 : #include <chrono>
18 : #include <cstdlib>
19 : #include <memory>
20 : #include <string>
21 : #include <vector>
22 :
23 : int
24 0 : main(int argc, char* argv[])
25 : {
26 0 : uint32_t npackets = 1; // NOLINT(build/unsigned)
27 0 : size_t packetSize = 100;
28 0 : size_t interval = 0;
29 0 : std::string conString = "tcp://127.0.0.1:12345";
30 0 : int nthreads = 1;
31 0 : uint32_t id = 0; // NOLINT(build/unsigned)
32 :
33 0 : namespace po = boost::program_options;
34 0 : po::options_description desc("Simple test program for ZmqSender");
35 0 : desc.add_options()(
36 0 : "connection,c", po::value<std::string>(&conString)->default_value(conString), "Connection to listen on")(
37 0 : "threads,t", po::value<int>(&nthreads)->default_value(nthreads), "Number of ZMQ threads")(
38 : "packets,p",
39 0 : po::value<uint32_t>(&npackets)->default_value(npackets), // NOLINT(build/unsigned)
40 0 : "Number of packets to send")(
41 0 : "packetSize,s", po::value<size_t>(&packetSize)->default_value(packetSize), "Number of bytes per packet")(
42 0 : "interval,i", po::value<size_t>(&interval)->default_value(interval), "Microseconds to sleep between messages")(
43 0 : "id", po::value<uint32_t>(&id)->default_value(id), "Identifier for this Sender"); // NOLINT(build/unsigned)
44 0 : try {
45 0 : po::variables_map vm;
46 0 : po::store(po::parse_command_line(argc, argv, desc), vm);
47 0 : po::notify(vm);
48 0 : } catch (std::exception& ex) {
49 0 : std::cerr << "Error parsing command line " << ex.what() << std::endl; // NOLINT(runtime/output_format)
50 0 : std::cerr << desc << std::endl; // NOLINT(runtime/output_format)
51 0 : return 0;
52 0 : }
53 :
54 0 : dunedaq::logging::Logging::setup("ZMQ Test", "zmq_send");
55 0 : if (nthreads > 1) {
56 0 : dunedaq::ipm::ZmqContext::instance().set_context_threads(nthreads);
57 : }
58 :
59 : // Set the minimum packet size to 16 bytes, 8 bytes for sequence number and 8 bytes for current time
60 0 : if (packetSize < 16) {
61 0 : packetSize = 16;
62 : }
63 :
64 0 : std::shared_ptr<dunedaq::ipm::Sender> sender = dunedaq::ipm::make_ipm_sender("ZmqSender");
65 0 : sender->connect_for_sends({ { "connection_string", conString } });
66 :
67 0 : std::vector<char> message(packetSize, 0);
68 0 : *reinterpret_cast<uint32_t*>(message.data()) = id; // NOLINT
69 :
70 0 : auto start = std::chrono::steady_clock::now();
71 0 : for (uint64_t p = 0; p < npackets; p++) { // NOLINT(build/unsigned)
72 0 : TLOG_DEBUG(3) << "Preparing Message " << p;
73 : // Last arg is send timeout
74 0 : int attempt = 0;
75 0 : bool success = false;
76 :
77 0 : *(reinterpret_cast<uint32_t*>(message.data()) + 1) = p; // NOLINT
78 0 : uint64_t time_since_epoch = // NOLINT(build/unsigned)
79 0 : std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch())
80 0 : .count();
81 0 : *(reinterpret_cast<uint64_t*>(message.data()) + 1) = time_since_epoch; // NOLINT
82 :
83 0 : TLOG_DEBUG(4) << "Sending Message " << p;
84 0 : do {
85 0 : success = sender->send(static_cast<void*>(message.data()), packetSize, std::chrono::milliseconds(2), "", true);
86 0 : attempt++;
87 0 : if (success == false && attempt == 1)
88 0 : TLOG() << "bad omen";
89 0 : } while (success == false);
90 :
91 0 : TLOG_DEBUG(5) << "Message sent " << p;
92 0 : if (interval > 0) {
93 0 : usleep(interval);
94 : }
95 : }
96 :
97 0 : auto elapsed = std::chrono::steady_clock::now() - start;
98 0 : auto nano = std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count();
99 0 : auto bw = (packetSize * npackets) / static_cast<double>(nano);
100 0 : TLOG() << "Sent " << packetSize * npackets << " bytes in " << nano << " ns " << bw << " GB/s";
101 0 : }
|