Line data Source code
1 : /**
2 : * @file zmq_recv.cpp ZeroMQ Receive Test Application
3 : *
4 : * Used in conjunction with zmq_send, this test application instantiates a ZmqReceiver plugin and reports the incoming
5 : * data rate.
6 : *
7 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
8 : * Licensing/copyright details are in the COPYING file that you should have
9 : * received with this code.
10 : */
11 :
12 : #include "ipm/Receiver.hpp"
13 : #include "ipm/ZmqContext.hpp"
14 :
15 : #include "boost/program_options.hpp"
16 :
17 : #include <chrono>
18 : #include <map>
19 : #include <memory>
20 : #include <string>
21 :
22 : using namespace dunedaq::ipm;
23 :
24 : int
25 0 : main(int argc, char* argv[])
26 : {
27 0 : std::string conString = "tcp://127.0.0.1:12345";
28 0 : size_t npackets = 1;
29 0 : int nthreads = 1;
30 0 : int timeout = 10;
31 :
32 0 : namespace po = boost::program_options;
33 0 : po::options_description desc("Simple test program for ZmqReceiver");
34 0 : desc.add_options()(
35 0 : "connection,c", po::value<std::string>(&conString)->default_value(conString), "Connection to listen on")(
36 0 : "threads,t", po::value<int>(&nthreads)->default_value(nthreads), "Number of ZMQ threads")(
37 0 : "packets,p", po::value<size_t>(&npackets)->default_value(npackets), "Number of packets per group for reporting")(
38 0 : "timeout,o", po::value<int>(&timeout)->default_value(timeout), "Timeout, in seconds");
39 0 : try {
40 0 : po::variables_map vm;
41 0 : po::store(po::parse_command_line(argc, argv, desc), vm);
42 0 : po::notify(vm);
43 0 : } catch (std::exception& ex) {
44 0 : std::cerr << "Error parsing command line " << ex.what() << std::endl; // NOLINT(runtime/output_format)
45 0 : std::cerr << desc << std::endl; // NOLINT(runtime/output_format)
46 0 : return 0;
47 0 : }
48 :
49 0 : dunedaq::logging::Logging::setup("ZMQ Test", "zmq_recv");
50 0 : if (nthreads > 1) {
51 0 : dunedaq::ipm::ZmqContext::instance().set_context_threads(nthreads);
52 : }
53 :
54 : // Receiver side
55 0 : std::shared_ptr<Receiver> receiver = make_ipm_receiver("ZmqReceiver");
56 0 : receiver->connect_for_receives({ { "connection_string", conString } });
57 :
58 0 : std::map<uint32_t, uint32_t> last_received_sequence; // NOLINT(build/unsigned)
59 0 : int64_t first_latency = 0;
60 0 : try {
61 0 : while (true) {
62 : // Last arg is receive timeout
63 0 : auto start = std::chrono::steady_clock::now();
64 0 : double bytesReceived = 0;
65 0 : for (size_t p = 0; p < npackets; p++) {
66 0 : Receiver::Response resp = receiver->receive(std::chrono::seconds(timeout));
67 0 : int64_t recvd_ts =
68 0 : std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch())
69 0 : .count();
70 0 : bytesReceived += resp.data.size();
71 0 : auto this_id = *(reinterpret_cast<uint32_t*>(resp.data.data())); // NOLINT
72 0 : auto this_sequence = *(reinterpret_cast<uint32_t*>(resp.data.data()) + 1); // NOLINT
73 0 : auto this_ts = *(reinterpret_cast<uint64_t*>(resp.data.data()) + 1); // NOLINT
74 :
75 0 : if (this_sequence < last_received_sequence[this_id] + 1) {
76 0 : TLOG() << "Received sequence ID " << this_sequence << " < expected sequence "
77 0 : << (last_received_sequence[this_id] + 1) << " from sender " << this_id;
78 : }
79 0 : auto this_latency = static_cast<int64_t>(recvd_ts - this_ts);
80 0 : if (first_latency == 0) {
81 0 : first_latency = this_latency;
82 : }
83 0 : TLOG_DEBUG(6) << "Received message " << this_sequence << " from sender " << this_id
84 0 : << ", latency= " << this_latency << " us (diff= " << (this_latency - first_latency) << " us)";
85 0 : last_received_sequence[this_id] = this_sequence;
86 0 : }
87 0 : auto elapsed = std::chrono::steady_clock::now() - start;
88 0 : auto nano = std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count();
89 0 : auto bw = bytesReceived / static_cast<double>(nano);
90 0 : TLOG() << "Received " << bytesReceived << " bytes in " << nano << " ns " << bw << " GB/s";
91 : // std::cout << "resp.data=";
92 : // for (auto d: resp.data) {
93 : // std::cout << d << std::endl;
94 : // }
95 0 : }
96 0 : } catch (ReceiveTimeoutExpired const& exc) {
97 0 : std::cout << "Gave up waiting\n"; // NOLINT(runtime/output_format)
98 0 : }
99 0 : }
|