Line data Source code
1 : /**
2 : * @file test_frame_transmitter.cxx Construct UDP packets to be sent over the wire using DPDK
3 : *
4 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "dpdklibs/EALSetup.hpp"
10 : #include "dpdklibs/udp/PacketCtor.hpp"
11 : #include "dpdklibs/udp/Utils.hpp"
12 :
13 : #include "detdataformats/DAQEthHeader.hpp"
14 : #include "logging/Logging.hpp"
15 :
16 : #include "rte_cycles.h"
17 : #include "rte_dev.h"
18 : #include "rte_eal.h"
19 : #include "rte_ethdev.h"
20 : #include "rte_lcore.h"
21 : #include "rte_mbuf.h"
22 :
23 : #include "CLI/App.hpp"
24 : #include "CLI/Config.hpp"
25 : #include "CLI/Formatter.hpp"
26 :
27 : #include <iostream>
28 : #include <chrono>
29 : #include <thread>
30 :
31 :
32 : namespace {
33 :
34 : // Only 8 and above works
35 : constexpr int burst_size = 256;
36 :
37 : constexpr int buffer_size = 9800; // Same number as in EALSetup.hpp
38 :
39 : std::string dst_mac_addr = "";
40 : int payload_bytes = 0; // payload past the DAQEthHeader
41 :
42 : std::atomic<long> num_bursts = 0;
43 : std::atomic<long> num_packets = 0;
44 : std::atomic<long> num_bytes = 0;
45 : }
46 :
47 : using namespace dunedaq;
48 : using namespace dpdklibs;
49 : using namespace udp;
50 :
51 0 : void lcore_main(void* arg) {
52 :
53 0 : uint16_t iface = *( static_cast<uint16_t*>(arg) );
54 :
55 0 : uint16_t lid = rte_lcore_id();
56 :
57 0 : if (lid != 1) {
58 0 : return;
59 : }
60 :
61 0 : if (rte_eth_dev_socket_id(iface) >= 0 && rte_eth_dev_socket_id(iface) != static_cast<int>(rte_socket_id())) {
62 0 : TLOG(TLVL_WARNING) << "WARNING, dpdk interface " << iface << " is on remote NUMA node to polling thread.\nrte_eth_dev_socket_id(" << iface << ") == " << rte_eth_dev_socket_id(iface) << ", rte_socket_id() == " << static_cast<int>(rte_socket_id()) << "\nPerformance will not be optimal.\n";
63 : }
64 :
65 0 : struct rte_mempool *mbuf_pool = rte_pktmbuf_pool_create((std::string("MBUF_POOL") + std::to_string(lid)).c_str(), NUM_MBUFS * rte_eth_dev_count_avail(),
66 0 : MBUF_CACHE_SIZE, 0, buffer_size, rte_socket_id());
67 :
68 :
69 0 : if (mbuf_pool == NULL) {
70 0 : rte_exit(EXIT_FAILURE, "ERROR: call to rte_pktmbuf_pool_create failed, info: %s\n", rte_strerror(rte_errno));
71 : }
72 :
73 0 : struct rte_mbuf** bufs = (rte_mbuf**) malloc(sizeof(struct rte_mbuf*) * burst_size);
74 0 : if (bufs == NULL) {
75 0 : TLOG(TLVL_ERROR) << "Failure trying to acquire memory for transmission buffers on thread #" << lid << "; exiting...";
76 0 : std::exit(1);
77 : }
78 :
79 0 : int retval = rte_pktmbuf_alloc_bulk(mbuf_pool, bufs, burst_size);
80 :
81 0 : if (retval != 0) {
82 0 : rte_exit(EXIT_FAILURE, "ERROR: call to rte_pktmbuf_alloc_bulk failed, info: %s\n", strerror(abs(retval)));
83 : }
84 :
85 0 : TLOG() << "In thread, we have iface == " << iface;
86 0 : construct_packets_for_burst(iface, dst_mac_addr, payload_bytes, burst_size, bufs);
87 :
88 0 : TLOG() << "Dump of the first packet header: ";
89 0 : TLOG() << get_udp_header_str(bufs[0]);
90 :
91 0 : TLOG() << "Dump of the first packet rte_mbuf object: ";
92 0 : TLOG() << get_rte_mbuf_str(bufs[0]);
93 :
94 0 : rte_mbuf_sanity_check(bufs[0], 1);
95 :
96 0 : TLOG() << "\n\nCore " << rte_lcore_id() << " transmitting packets. [Ctrl+C to quit]\n\n";
97 :
98 0 : uint16_t nb_tx = 0;
99 0 : int cntr = 0;
100 0 : while (cntr < std::numeric_limits<int>::max()) {
101 0 : cntr++;
102 :
103 0 : nb_tx = rte_eth_tx_burst(iface, lid-1, bufs, burst_size);
104 0 : num_packets += nb_tx;
105 0 : num_bytes += nb_tx * bufs[0]->pkt_len; // n.b. Assumption in this line is that each packet is the same size
106 0 : num_bursts++;
107 :
108 : // Free any unsent packets
109 0 : if (unlikely(nb_tx < burst_size)) {
110 0 : rte_pktmbuf_free_bulk(bufs, burst_size - nb_tx);
111 : }
112 : }
113 : }
114 :
115 0 : int main(int argc, char* argv[]) {
116 :
117 0 : CLI::App app{"test frame transmitter"};
118 :
119 0 : const std::string default_mac_address = "6c:fe:54:47:98:20";
120 :
121 0 : app.add_option("--dst-mac", dst_mac_addr, "Destination MAC address (default " + default_mac_address + ")");
122 0 : app.add_option("--payload", payload_bytes, "Bytes of payload past the DAQEthHeader header");
123 :
124 0 : if (dst_mac_addr == "") {
125 0 : dst_mac_addr = default_mac_address;
126 : }
127 :
128 0 : CLI11_PARSE(app, argc, argv);
129 :
130 0 : argc = 1; // Set to 1 so rte_eal_init ignores all the CLI-parsed arguments
131 :
132 0 : int retval = rte_eal_init(argc, argv);
133 0 : if (retval < 0) {
134 0 : rte_exit(EXIT_FAILURE, "ERROR: EAL initialization failed, info: %s\n", strerror(abs(retval)));
135 : }
136 :
137 : // Check that there is an even number of ports to send/receive on
138 0 : auto nb_ports = rte_eth_dev_count_avail();
139 0 : TLOG() << "There are " << nb_ports << " ethernet ports available out of a total of " << rte_eth_dev_count_total();
140 0 : if (nb_ports == 0) {
141 0 : rte_exit(EXIT_FAILURE, "ERROR: 0 ethernet ports are available. This can be caused either by someone else currently\nusing dpdk-based code or the necessary drivers not being bound to the NICs\n(see https://github.com/DUNE-DAQ/dpdklibs#readme for more)\n");
142 : }
143 :
144 0 : const uint16_t n_tx_qs = 1;
145 0 : const uint16_t n_rx_qs = 0;
146 0 : const uint16_t rx_ring_size = 1024;
147 0 : const uint16_t tx_ring_size = 1024;
148 :
149 0 : uint16_t portid = 0;
150 0 : std::map<int, std::unique_ptr<rte_mempool>> dummyarg;
151 0 : retval = ealutils::iface_init(portid, n_rx_qs, n_tx_qs, rx_ring_size, tx_ring_size, dummyarg);
152 :
153 0 : if (retval != 0) {
154 0 : TLOG(TLVL_ERROR) << "A failure occurred initializing ethernet interface #" << portid << "; exiting...";
155 0 : rte_eal_cleanup();
156 0 : std::exit(2);
157 : }
158 :
159 0 : struct rte_eth_dev_info dev_info;
160 0 : retval = rte_eth_dev_info_get(portid, &dev_info);
161 :
162 0 : if (retval != 0) {
163 0 : TLOG(TLVL_ERROR) << "A failure occured trying to get info on ethernet interface #" << portid << "; exiting...";
164 0 : rte_eal_cleanup();
165 0 : std::exit(4);
166 : }
167 :
168 0 : TLOG() << "Name of the interface is " << rte_dev_name(dev_info.device);
169 :
170 0 : auto stats = std::thread([&]() {
171 0 : long sleep_time = 1;
172 0 : while (true) {
173 0 : TLOG() << "Rates: " << num_packets / static_cast<double>(sleep_time) << " packets/s, " << num_bytes / static_cast<double>(sleep_time) << " bytes/s, " << num_bursts / static_cast<double>(sleep_time) << " bursts/s" << "\n";
174 0 : num_packets.exchange(0);
175 0 : num_bytes.exchange(0);
176 0 : num_bursts.exchange(0);
177 0 : std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
178 0 : }
179 0 : });
180 :
181 0 : rte_eal_mp_remote_launch((lcore_function_t *) lcore_main, &portid, SKIP_MAIN);
182 :
183 0 : rte_eal_mp_wait_lcore();
184 0 : rte_eal_cleanup();
185 :
186 0 : return 0;
187 0 : }
|