Line data Source code
1 : /**
2 : *
3 : * @file test_transmit_and_receive.cxx
4 : *
5 : * This is a standalone program which will transmit ethernet packets
6 : * out on one dpdk-enabled network interface port and receive them on
7 : * another. Obviously you actually need two such ports for this to
8 : * work.
9 : *
10 : *
11 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
12 : * Licensing/copyright details are in the COPYING file that you should have
13 : * received with this code.
14 : */
15 :
#include "dpdklibs/EALSetup.hpp"
#include "dpdklibs/udp/PacketCtor.hpp"
#include "dpdklibs/udp/Utils.hpp"

#include "detdataformats/DAQEthHeader.hpp"
#include "logging/Logging.hpp"

#include "rte_common.h"
#include "rte_eal.h"
#include "rte_errno.h"
#include "rte_ethdev.h"

#include "CLI/App.hpp"
#include "CLI/Config.hpp"
#include "CLI/Formatter.hpp"

#include <array>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
33 :
34 : using namespace dunedaq::dpdklibs;
35 : using dunedaq::detdataformats::DAQEthHeader;
36 :
namespace {

// Number of packets handed to (or requested from) the NIC per burst call.
constexpr int burst_size{ 256 };

// Bytes of payload following the DAQEthHeader; overridable with --payload.
int payload_bytes{ 9000 };

// Serializes access to the shared vector of per-lcore thread configurations.
std::mutex lcore_thread_mutex;

// Counters updated by the lcore threads and periodically read and
// reset by the statistics-reporting thread.

// Transmit side
std::atomic<long> num_sent_bursts{ 0 };
std::atomic<long> num_sent_packets{ 0 };
std::atomic<long> num_sent_bytes{ 0 };

// Receive side
std::atomic<long> num_received_packets{ 0 };
std::atomic<long> num_received_bytes{ 0 };
std::atomic<long> num_bad_packets{ 0 };

} // namespace
53 :
54 : struct lcore_thread_arg {
55 : rte_mempool* pool;
56 : int iface_id;
57 : bool is_receiver;
58 : };
59 :
60 0 : inline bool operator!=(const DAQEthHeader& header1, const DAQEthHeader& header2) {
61 0 : return (header1.version != header2.version ||
62 0 : header1.det_id != header2.det_id ||
63 0 : header1.crate_id != header2.crate_id ||
64 0 : header1.slot_id != header2.slot_id ||
65 0 : header1.stream_id != header2.stream_id ||
66 0 : header1.seq_id != header2.seq_id ||
67 0 : header1.block_length != header2.block_length ||
68 0 : header1.timestamp != header2.timestamp);
69 : }
70 :
71 :
72 0 : int lcore_main(void* arg) {
73 :
74 0 : uint16_t lid = rte_lcore_id();
75 :
76 : // 0: main thread (doesn't use this function), 1: receiver thread, 2: transmitter thread
77 0 : if (lid == 0 || lid >= 3) {
78 : return 0;
79 : }
80 :
81 0 : TLOG() << "From lcore_main, lid == " << lid << ", rte_socket_id() == " << rte_socket_id();
82 :
83 0 : lcore_thread_arg info;
84 0 : const auto ptr_to_vector_of_thread_configs = static_cast<std::vector<lcore_thread_arg>*>(arg);
85 :
86 0 : {
87 0 : std::lock_guard<std::mutex> guard(lcore_thread_mutex);
88 0 : info = ptr_to_vector_of_thread_configs->at(lid - 1);
89 0 : }
90 0 : TLOG() << "In lcore_main, thread #" << lid << ", iface_id == " << info.iface_id << ", is_receiver == " << info.is_receiver;
91 :
92 0 : if (rte_eth_dev_socket_id(info.iface_id) >= 0 && rte_eth_dev_socket_id(info.iface_id) != static_cast<int>(rte_socket_id())) {
93 0 : TLOG(TLVL_WARNING) << "WARNING, dpdk interface " << info.iface_id << " is on remote NUMA node to polling thread.\nrte_eth_dev_socket_id(" << info.iface_id << ") == " << rte_eth_dev_socket_id(info.iface_id) << ", rte_socket_id() == " << static_cast<int>(rte_socket_id()) << "\nPerformance will not be optimal.\n";
94 : }
95 :
96 0 : auto bufs = static_cast<rte_mbuf**>( malloc(sizeof(struct rte_mbuf*) * burst_size) );
97 :
98 0 : if (bufs == NULL) {
99 0 : TLOG(TLVL_ERROR) << "Unable to allocate memory for buffers; returning from thread...";
100 0 : return 1;
101 : }
102 :
103 0 : int retval = 0;
104 0 : retval = rte_pktmbuf_alloc_bulk(info.pool, bufs, burst_size);
105 :
106 0 : if (retval != 0) {
107 0 : std::stringstream errstr;
108 0 : errstr << "A failure occurred calling rte_pktmbuf_alloc_bulk (" << strerror(abs(retval)) << "); returning from thread...";
109 0 : return 2;
110 0 : }
111 :
112 0 : if (info.is_receiver) {
113 :
114 0 : uint16_t nb_rx = 0;
115 0 : int bytes_in_burst = 0;
116 0 : int bad_packets_in_burst = 0;
117 :
118 : // Here, I'm taking advantage of the fact that the transmitting
119 : // thread calls construct_packets_for_burst, which itself fills
120 : // the DAQEthHeader structure inside the packet it sends using
121 : // "set_daqethheader_test_values". I'll compare the received
122 : // packet's DAQEthHeader to this one and make sure they're equal
123 :
124 0 : DAQEthHeader daqethheader_reference;
125 0 : udp::set_daqethheader_test_values(daqethheader_reference);
126 0 : DAQEthHeader* daqethheader_ptr = nullptr;
127 :
128 0 : while (true) {
129 :
130 0 : bytes_in_burst = 0; // Quicker to add up the bytes in the packets in a local variables, then attach the total to the atomic variable used in stats reporting
131 0 : bad_packets_in_burst = 0; // Same thinking
132 :
133 0 : nb_rx = rte_eth_rx_burst(info.iface_id, 0, bufs, burst_size);
134 0 : num_received_packets += nb_rx;
135 :
136 0 : for (int i_p = 0; i_p < nb_rx; ++i_p) {
137 0 : bytes_in_burst += bufs[i_p]->pkt_len;
138 :
139 : // DAQEthHeader (should be) the first thing after the ethernet + IPv4 + UDP headers
140 0 : daqethheader_ptr = rte_pktmbuf_mtod_offset(bufs[0], DAQEthHeader*, sizeof(udp::ipv4_udp_packet_hdr));
141 0 : if (*daqethheader_ptr != daqethheader_reference) {
142 0 : bad_packets_in_burst++;
143 : }
144 : }
145 :
146 0 : num_received_bytes += bytes_in_burst;
147 0 : num_bad_packets += bad_packets_in_burst;
148 :
149 : // Free any unsent packets
150 0 : if (unlikely(nb_rx < burst_size)) {
151 0 : rte_pktmbuf_free_bulk(bufs, burst_size - nb_rx);
152 : }
153 : }
154 :
155 : } else {
156 :
157 0 : rte_ether_addr dst_mac_addr_struct;
158 :
159 : // Notice that in the snippet of code below, the assumption is that
160 : // the first entry in the vector of thread configurations
161 : // corresponds to the receiving thread
162 0 : uint16_t receiver_iface = std::numeric_limits<uint16_t>::max();
163 0 : {
164 0 : std::lock_guard<std::mutex> guard(lcore_thread_mutex);
165 0 : receiver_iface = ptr_to_vector_of_thread_configs->at(0).iface_id;
166 0 : }
167 :
168 0 : retval = rte_eth_macaddr_get(receiver_iface, &dst_mac_addr_struct);
169 :
170 0 : if (retval == 0) {
171 0 : TLOG() << "Will send to ethernet interface with MAC address " << ealutils::get_mac_addr_str(dst_mac_addr_struct);
172 : } else {
173 0 : TLOG() << "Problem trying to obtain the MAC address of the destination port; returning...";
174 0 : return 3;
175 : }
176 :
177 0 : const std::string dst_mac_addr = ealutils::get_mac_addr_str(dst_mac_addr_struct);
178 0 : udp::construct_packets_for_burst(info.iface_id, dst_mac_addr, payload_bytes, burst_size, bufs);
179 :
180 0 : TLOG() << "Dump of the first packet header: ";
181 0 : TLOG() << udp::get_udp_header_str(bufs[0]);
182 :
183 0 : TLOG() << "Dump of the first packet rte_mbuf object: ";
184 0 : TLOG() << udp::get_rte_mbuf_str(bufs[0]);
185 :
186 0 : rte_mbuf_sanity_check(bufs[0], 1);
187 :
188 0 : uint16_t nb_tx = 0;
189 :
190 0 : while (true) {
191 :
192 0 : int queue_id = 0;
193 : //TLOG() << "Just about to call rte_eth_tx_burst with arguments info.iface_id == " << info.iface_id << ", second argument " << queue_id << ", bufs at " << bufs << " and burst size " << burst_size;
194 0 : nb_tx = rte_eth_tx_burst(info.iface_id, queue_id, bufs, burst_size);
195 0 : num_sent_packets += nb_tx;
196 0 : num_sent_bytes += nb_tx * bufs[0]->pkt_len;
197 0 : num_sent_bursts++;
198 : //TLOG() << "Sent " << nb_tx;
199 :
200 : // Free any unsent packets
201 0 : if (unlikely(nb_tx < burst_size)) {
202 0 : rte_pktmbuf_free_bulk(bufs, burst_size - nb_tx);
203 : }
204 : }
205 0 : }
206 : return 0;
207 : }
208 :
209 0 : int main(int argc, char** argv) {
210 :
211 0 : CLI::App app{"test transmit and receive"};
212 :
213 0 : std::stringstream payload_desc;
214 0 : payload_desc << "Bytes of payload past the DAQEthHeader header (default " << payload_bytes << " bytes)";
215 0 : app.add_option("--payload", payload_bytes, payload_desc.str());
216 :
217 0 : CLI11_PARSE(app, argc, argv);
218 :
219 0 : argc = 1; // So rte_eal_init doesn't interpret what gets passed to the command line
220 :
221 0 : int retval = rte_eal_init(argc, argv);
222 :
223 0 : if (retval < 0) {
224 0 : rte_exit(EXIT_FAILURE, "ERROR: EAL initialization failed, info: %s\n", strerror(abs(retval)));
225 : }
226 :
227 0 : int nb_ports = rte_eth_dev_count_avail();
228 0 : TLOG() << "There are " << nb_ports << " ethernet ports available out of a total of " << rte_eth_dev_count_total();
229 :
230 0 : if (nb_ports < 2) {
231 0 : rte_exit(EXIT_FAILURE, "ERROR: fewer than 2 ethernet ports are available.\nYou need one port for transmitting packets and one port for receiving them\n");
232 : }
233 :
234 0 : auto stats = std::thread([&]() {
235 0 : long sleep_time = 1;
236 0 : while (true) {
237 0 : TLOG() << "\nRates: \n" << num_sent_bursts / static_cast<double>(sleep_time) << " sent bursts/s\n" << num_sent_packets / static_cast<double>(sleep_time) << " sent packets/s\n" << num_received_packets / static_cast<double>(sleep_time) << " received packets/s\n" << num_bad_packets / static_cast<double>(sleep_time) << " bad packets/s\n"<< num_sent_bytes / static_cast<double>(sleep_time) << " sent bytes/s\n" << num_received_bytes / static_cast<double>(sleep_time) << " received bytes/s\n";
238 0 : num_sent_packets.exchange(0);
239 0 : num_sent_bytes.exchange(0);
240 0 : num_sent_bursts.exchange(0);
241 0 : num_received_packets.exchange(0);
242 0 : num_received_bytes.exchange(0);
243 0 : num_bad_packets.exchange(0);
244 0 : std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
245 0 : }
246 0 : });
247 :
248 :
249 0 : uint16_t portid = std::numeric_limits<uint16_t>::max();
250 0 : int n_rx_qs = 0;
251 0 : int n_tx_qs = 0;
252 0 : uint16_t rx_ring_size = 1024;
253 0 : uint16_t tx_ring_size = 1024;
254 0 : int port_cntr = -1;
255 0 : std::map<int, std::unique_ptr<rte_mempool>> mbuf_pools;
256 0 : std::vector<lcore_thread_arg> lcore_thread_args;
257 :
258 0 : RTE_ETH_FOREACH_DEV(portid) { // RTE_ETH_FOREACH_DEV since no guarantee dpdk counts port IDs as 0, 1, 2 ...
259 :
260 0 : port_cntr++; // Initialize at -1, start at 0
261 :
262 : // Right now (May-2-2023) iface_init's implementation explicitly
263 : // assumes that the pools corresponding to <N> receiver queues
264 : // correspond to keys 0...N-1 in a map whose values are
265 : // unique_ptrs to pools
266 :
267 0 : std::stringstream poolname;
268 0 : poolname << "MBP-" << portid;
269 0 : mbuf_pools[port_cntr] = ealutils::get_mempool(poolname.str());
270 :
271 0 : bool is_receiver = false;
272 :
273 0 : if (port_cntr == 0) {
274 : n_tx_qs = 0;
275 : n_rx_qs = 1;
276 : is_receiver = true;
277 0 : } else if (port_cntr == 1) {
278 0 : n_tx_qs = 1;
279 0 : n_rx_qs = 0;
280 0 : is_receiver = false;
281 : }
282 :
283 0 : retval = ealutils::iface_init(portid, n_rx_qs, n_tx_qs, rx_ring_size, tx_ring_size, mbuf_pools);
284 :
285 0 : if (retval != 0) {
286 0 : rte_eal_cleanup();
287 :
288 0 : std::stringstream errstr;
289 0 : errstr << "A failure occurred initializing ethernet interface #" << portid << " (" << strerror(abs(retval)) << "); exiting...";
290 0 : rte_exit(EXIT_FAILURE, errstr.str().c_str());
291 0 : }
292 :
293 0 : auto pool = mbuf_pools[port_cntr].release();
294 0 : mbuf_pools.erase( mbuf_pools.find(port_cntr) );
295 :
296 0 : lcore_thread_args.emplace_back( lcore_thread_arg { pool, portid, is_receiver } );
297 0 : }
298 :
299 0 : rte_eal_mp_remote_launch(lcore_main, &lcore_thread_args, SKIP_MAIN);
300 :
301 0 : rte_eal_mp_wait_lcore();
302 0 : rte_eal_cleanup();
303 :
304 0 : int mynum = 55;
305 0 : TLOG() << "Things are still OK at this line, and here's a number: " << mynum;
306 :
307 0 : return 0;
308 0 : }
|