Line data Source code
1 :
2 : /* Application will run until quit or killed. */
3 :
4 : #include <inttypes.h>
5 : #include <rte_cycles.h>
6 : #include <rte_eal.h>
7 : #include <rte_ethdev.h>
8 : #include <rte_lcore.h>
9 : #include <rte_mbuf.h>
10 : #include <stdint.h>
11 : #include <tuple>
12 :
#include <array>
#include <atomic>
#include <chrono>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
18 :
19 : #include "CLI/App.hpp"
20 : #include "CLI/Config.hpp"
21 : #include "CLI/Formatter.hpp"
22 :
23 : #include "detdataformats/DAQEthHeader.hpp"
24 : #include "dpdklibs/EALSetup.hpp"
25 : #include "dpdklibs/udp/PacketCtor.hpp"
26 : #include "dpdklibs/udp/Utils.hpp"
27 : #include "logging/Logging.hpp"
28 :
29 : #include "dpdklibs/udp/PacketCtor.hpp"
30 : #include "dpdklibs/udp/Utils.hpp"
31 : #include "dpdklibs/arp/ARP.hpp"
32 : #include "dpdklibs/ipv4_addr.hpp"
33 :
34 : #include <fmt/core.h>
35 : #include <fmt/ranges.h>
36 :
37 : #include <regex>
38 :
// Descriptor ring sizes.
// NOTE(review): not referenced in the visible part of this file (main() uses
// its own local rx_ring_size/tx_ring_size) - confirm whether still needed.
#define RX_RING_SIZE 1024
#define TX_RING_SIZE 1024

// Mbuf pool sizing constants (also not referenced in the visible code).
#define NUM_MBUFS 8191
#define MBUF_CACHE_SIZE 250

// NOTE(review): presumably the maximum number of streams; unused in the visible code.
#define NSTREAM 128

// Jumbo frame length: 9600 bytes plus the Ethernet CRC and header lengths.
#define PG_JUMBO_FRAME_LEN (9600 + RTE_ETHER_CRC_LEN + RTE_ETHER_HDR_LEN)
// Fallback definition, used only when the included headers do not already
// provide RTE_JUMBO_ETHER_MTU; evaluates to the 9600 bytes above.
#ifndef RTE_JUMBO_ETHER_MTU
#define RTE_JUMBO_ETHER_MTU (PG_JUMBO_FRAME_LEN - RTE_ETHER_HDR_LEN - RTE_ETHER_CRC_LEN) /*< Ethernet MTU. */
#endif
51 :
52 : using namespace dunedaq;
53 : using namespace dpdklibs;
54 : using namespace udp;
55 :
56 :
57 : namespace {
58 :
59 : std::ostream & operator <<(std::ostream &out, const StreamUID &obj) {
60 : return out << static_cast<std::string>(obj);
61 : }
62 :
63 : struct StreamStats {
64 : std::atomic<uint64_t> total_packets = 0;
65 : std::atomic<uint64_t> num_packets = 0;
66 : std::atomic<uint64_t> num_bytes = 0;
67 : std::atomic<uint64_t> num_bad_timestamp = 0;
68 : std::atomic<uint64_t> max_timestamp_skip = 0;
69 : std::atomic<uint64_t> num_bad_seq_id = 0;
70 : std::atomic<uint64_t> max_seq_id_skip = 0;
71 : std::atomic<uint64_t> num_bad_payload_size = 0;
72 : std::atomic<uint64_t> min_payload_size = 0;
73 : std::atomic<uint64_t> max_payload_size = 0;
74 : std::atomic<uint64_t> prev_seq_id = 0;
75 : std::atomic<uint64_t> prev_timestamp = 0;
76 :
77 0 : void reset() {
78 0 : num_packets.exchange(0);
79 0 : num_bytes.exchange(0);
80 0 : num_bad_timestamp.exchange(0);
81 0 : max_timestamp_skip.exchange(0);
82 0 : num_bad_seq_id.exchange(0);
83 0 : max_seq_id_skip.exchange(0);
84 0 : num_bad_payload_size.exchange(0);
85 0 : min_payload_size.exchange(0);
86 0 : max_payload_size.exchange(0);
87 0 : }
88 : };
89 :
90 : std::map<StreamUID, StreamStats> stream_stats;
91 :
92 : // Apparently only 8 and above works for "burst_size"
93 :
94 : // From the dpdk documentation, describing the rte_eth_rx_burst
95 : // function (and keeping in mind that their "nb_pkts" variable is the
96 : // same as our "burst size" variable below):
97 : // "Some drivers using vector instructions require that nb_pkts is
98 : // divisible by 4 or 8, depending on the driver implementation."
99 :
100 : bool check_timestamp = false;
101 : bool per_stream_reports = false;
102 :
103 : constexpr int burst_size = 256;
104 :
105 : constexpr uint32_t expected_packet_type = 0x291;
106 :
107 : constexpr int default_mbuf_size = 9000; // As opposed to RTE_MBUF_DEFAULT_BUF_SIZE
108 :
109 : constexpr int max_packets_to_dump = 10;
110 : int dumped_packet_count = 0;
111 :
112 : uint16_t expected_packet_size = 0; //7243; // i.e., every packet that isn't the initial one
113 :
114 : std::atomic<uint64_t> num_packets = 0;
115 : std::atomic<uint64_t> num_bytes = 0;
116 : std::atomic<uint64_t> total_packets = 0;
117 : std::atomic<uint64_t> non_ipv4_packets = 0;
118 : std::atomic<uint64_t> total_bad_timestamp = 0;
119 : std::atomic<uint64_t> num_bad_timestamp = 0;
120 : std::atomic<uint64_t> max_timestamp_skip = 0;
121 : std::atomic<uint64_t> total_bad_seq_id = 0;
122 : std::atomic<uint64_t> num_bad_seq_id = 0;
123 : std::atomic<uint64_t> max_seq_id_skip = 0;
124 : std::atomic<uint64_t> total_bad_payload_size = 0;
125 : std::atomic<uint64_t> num_bad_payload_size = 0;
126 : std::atomic<uint64_t> min_payload_size = 0;
127 : std::atomic<uint64_t> max_payload_size = 0;
128 : std::atomic<uint64_t> udp_pkt_counter = 0;
129 :
130 : std::atomic<int64_t> garps_sent = 0;
131 :
132 : // std::ofstream datafile;
133 : // const std::string output_data_filename = "dpdklibs_test_frame_receiver.dat";
134 :
135 :
136 : } // namespace
137 :
/**
 * @brief Build a C-style argv view over a vector of strings.
 *
 * @param std_argv Argument strings; must outlive the returned vector and must
 *                 not be modified while the returned pointers are in use.
 * @return One `char*` per input string, in order, each pointing at that
 *         string's own (null-terminated) buffer. No copies are made.
 */
std::vector<char*> construct_argv(std::vector<std::string> &std_argv){
  std::vector<char*> vec_argv;
  vec_argv.reserve(std_argv.size());
  for (auto& arg : std_argv) {
    vec_argv.push_back(arg.data());
  }
  return vec_argv;
}
145 :
146 0 : static inline int check_against_previous_stream(const detdataformats::DAQEthHeader* daq_hdr, uint64_t exp_ts_diff){
147 : // uint64_t unique_str_id = (daq_hdr->det_id<<22) + (daq_hdr->crate_id<<12) + (daq_hdr->slot_id<<8) + daq_hdr->stream_id;
148 0 : StreamUID unique_str_id(*daq_hdr);
149 0 : uint64_t stream_ts = daq_hdr->timestamp;
150 0 : uint64_t seq_id = daq_hdr->seq_id;
151 0 : int ret_val = 0;
152 :
153 0 : if (check_timestamp) {
154 0 : if (stream_stats[unique_str_id].prev_timestamp == 0 ) {
155 0 : stream_stats[unique_str_id].prev_timestamp = stream_ts;
156 : }else{
157 0 : uint64_t expected_ts = stream_stats[unique_str_id].prev_timestamp + exp_ts_diff;
158 0 : if (stream_ts != expected_ts) {
159 0 : uint64_t ts_difference = stream_ts - stream_stats[unique_str_id].prev_timestamp;
160 0 : ret_val = 1;
161 0 : ++num_bad_timestamp;
162 0 : ++stream_stats[unique_str_id].num_bad_timestamp;
163 0 : ++total_bad_timestamp;
164 0 : if (ts_difference > max_timestamp_skip) {max_timestamp_skip = ts_difference;}
165 0 : if (ts_difference > stream_stats[unique_str_id].max_timestamp_skip) {stream_stats[unique_str_id].max_timestamp_skip = ts_difference;}
166 : }
167 0 : stream_stats[unique_str_id].prev_timestamp = stream_ts;
168 : }
169 : }
170 :
171 :
172 0 : uint64_t expected_seq_id = (stream_stats[unique_str_id].prev_seq_id == 4095) ? 0 : stream_stats[unique_str_id].prev_seq_id + 1;
173 0 : if (seq_id != expected_seq_id) {
174 0 : uint64_t adj_expected_seq_id = (expected_seq_id == 0) ? 4096 : expected_seq_id;
175 0 : uint64_t adj_seq_id = (seq_id < adj_expected_seq_id) ? (4096 + seq_id) : seq_id;
176 0 : uint64_t seq_id_difference = adj_seq_id - adj_expected_seq_id;
177 0 : ret_val += 2;
178 0 : ++num_bad_seq_id;
179 0 : ++total_bad_seq_id;
180 0 : ++stream_stats[unique_str_id].num_bad_seq_id;
181 0 : if (seq_id_difference > max_seq_id_skip) {max_seq_id_skip = seq_id_difference;}
182 0 : if (seq_id_difference > stream_stats[unique_str_id].max_seq_id_skip) {stream_stats[unique_str_id].max_seq_id_skip = seq_id_difference;}
183 : }
184 :
185 0 : return ret_val;
186 : }
187 :
188 0 : static inline int check_packet_size(struct rte_mbuf* mbuf, StreamUID unique_str_id){
189 0 : std::size_t packet_size = mbuf->data_len;
190 :
191 0 : if (packet_size > max_payload_size) {max_payload_size = packet_size;}
192 0 : if (packet_size < min_payload_size) {min_payload_size = packet_size;}
193 0 : if (packet_size > stream_stats[unique_str_id].max_payload_size) {stream_stats[unique_str_id].max_payload_size = packet_size;}
194 0 : if (packet_size < stream_stats[unique_str_id].min_payload_size) {stream_stats[unique_str_id].min_payload_size = packet_size;}
195 :
196 0 : if (expected_packet_size and (packet_size != expected_packet_size)){
197 0 : ++num_bad_payload_size;
198 0 : ++total_bad_payload_size;
199 0 : ++stream_stats[unique_str_id].num_bad_payload_size;
200 0 : return 1;
201 : }
202 : return 0;
203 : }
204 :
205 0 : static int lcore_main(struct rte_mempool* mbuf_pool, uint16_t iface, uint64_t time_per_report, const std::vector<std::string>& garp_ip_addr_strs){
206 : /*
207 : * Check that the iface is on the same NUMA node as the polling thread
208 : * for best performance.
209 : */
210 :
211 0 : if (rte_eth_dev_socket_id(iface) >= 0 && rte_eth_dev_socket_id(iface) != static_cast<int>(rte_socket_id())) {
212 0 : fmt::print(
213 : "WARNING, iface {} is on remote NUMA node to polling thread.\n\tPerformance will not be optimal.\n", iface
214 : );
215 : }
216 :
217 0 : auto stats = std::thread([&]() {
218 0 : while (true) {
219 0 : uint64_t packets_per_second = num_packets / time_per_report;
220 0 : uint64_t bytes_per_second = num_bytes / time_per_report;
221 0 : fmt::print(
222 : "Since the last report {} seconds ago:\n"
223 : "Packets/s: {} Bytes/s: {} Total packets: {} Non-IPV4 packets: {} Total UDP packets: {}\n"
224 : "Packets with wrong sequence id: {}, Max wrong seq_id jump {}, Total Packets with Wrong seq_id {}\n"
225 : "Max udp payload: {}, Min udp payload: {}\n",
226 : time_per_report,
227 : packets_per_second, bytes_per_second, total_packets, non_ipv4_packets, udp_pkt_counter,
228 : num_bad_seq_id, max_seq_id_skip, total_bad_seq_id,
229 : max_payload_size, min_payload_size
230 : );
231 :
232 0 : if (expected_packet_size){
233 0 : fmt::print(
234 : "Packets with wrong payload size: {}, Total Packets with Wrong size {}\n",
235 : num_bad_payload_size, total_bad_payload_size
236 : );
237 : }
238 :
239 0 : if (check_timestamp) {
240 0 : fmt::print(
241 : "Wrong Timestamp difference Packets: {}, Max wrong Timestamp difference {}, Total Packets with Wrong Timestamp {}\n",
242 : num_bad_timestamp, max_timestamp_skip, total_bad_timestamp
243 : );
244 : }
245 :
246 0 : fmt::print("\n");
247 : // for (auto stream = stream_stats.begin(); stream != stream_stats.end(); stream++) {
248 0 : for ( auto& [suid, stats] : stream_stats) {
249 0 : fmt::print("Stream {:15}: n.pkts {} (tot. {})", (std::string)suid, stats.num_packets, stats.total_packets);
250 0 : if (per_stream_reports){
251 0 : float stream_bytes_per_second = (float)stats.num_bytes / (1024.*1024.) / (float)time_per_report;
252 0 : fmt::print(
253 : " {:8.3f} MB/s, seq. jumps: {}",
254 0 : stream_bytes_per_second, stats.num_bad_seq_id
255 : );
256 : }
257 0 : stats.reset();
258 0 : fmt::print("\n");
259 : }
260 :
261 0 : fmt::print("\n");
262 0 : num_packets.exchange(0);
263 0 : num_bytes.exchange(0);
264 0 : num_bad_timestamp.exchange(0);
265 0 : num_bad_seq_id.exchange(0);
266 0 : num_bad_payload_size.exchange(0);
267 0 : max_payload_size.exchange(0);
268 0 : min_payload_size.exchange(0);
269 0 : max_seq_id_skip.exchange(0);
270 0 : max_timestamp_skip.exchange(0);
271 0 : std::this_thread::sleep_for(std::chrono::seconds(time_per_report));
272 0 : }
273 0 : });
274 :
275 : //
276 : // Prepare and start GARP thread
277 : //
278 0 : std::vector<rte_be32_t> ip_addr_bin_vector;
279 0 : for( const auto& ip_addr_str : garp_ip_addr_strs ) {
280 0 : TLOG() << "IP address for ARP responses: " << ip_addr_str;
281 0 : IpAddr ip_addr(ip_addr_str);
282 0 : rte_be32_t ip_addr_bin = ip_address_dotdecimal_to_binary(
283 0 : ip_addr.addr_bytes[0],
284 0 : ip_addr.addr_bytes[1],
285 0 : ip_addr.addr_bytes[2],
286 0 : ip_addr.addr_bytes[3]
287 0 : );
288 0 : ip_addr_bin_vector.push_back(ip_addr_bin);
289 : }
290 :
291 0 : struct rte_mbuf **tx_bufs = (rte_mbuf**) malloc(sizeof(struct rte_mbuf*) * burst_size);
292 0 : rte_pktmbuf_alloc_bulk(mbuf_pool, tx_bufs, burst_size);
293 :
294 0 : auto garp = std::thread([&]() {
295 0 : while (true) {
296 : // TLOG() << "Packets/s: " << num_packets << " Bytes/s: " << num_bytes << " Total packets: " << total_packets << " Failed packets: " << failed_packets;
297 : // num_packets.exchange(0);
298 : // num_bytes.exchange(0);
299 :
300 0 : for(const auto& ip_addr_bin : ip_addr_bin_vector ) {
301 0 : arp::pktgen_send_garp(tx_bufs[0], iface, ip_addr_bin);
302 0 : ++garps_sent;
303 : }
304 :
305 0 : std::this_thread::sleep_for(std::chrono::seconds(1)); // If we sample for anything other than 1s, the rate calculation will need to change
306 0 : }
307 0 : });
308 :
309 :
310 0 : struct rte_mbuf **rx_bufs = (rte_mbuf**) malloc(sizeof(struct rte_mbuf*) * burst_size);
311 0 : rte_pktmbuf_alloc_bulk(mbuf_pool, rx_bufs, burst_size);
312 :
313 : // datafile.open(output_data_filename, std::ios::out | std::ios::binary);
314 : // if ( (datafile.rdstate() & std::ofstream::failbit ) != 0 ) {
315 : // fmt::print("WARNING: Unable to open output file \"{}\"\n", output_data_filename);
316 : // }
317 :
318 0 : while (true) {
319 : /* Get burst of RX packets, from first iface of pair. */
320 0 : const uint16_t nb_rx = rte_eth_rx_burst(iface, 0, rx_bufs, burst_size);
321 :
322 0 : num_packets += nb_rx;
323 0 : total_packets += nb_rx;
324 :
325 0 : for (int i_b = 0; i_b < nb_rx; ++i_b) {
326 0 : num_bytes += rx_bufs[i_b]->pkt_len;
327 :
328 :
329 0 : bool dump_packet = false;
330 0 : if (not RTE_ETH_IS_IPV4_HDR(rx_bufs[i_b]->packet_type)) {
331 0 : non_ipv4_packets++;
332 0 : dump_packet = true;
333 0 : continue;
334 : }
335 0 : ++udp_pkt_counter;
336 :
337 0 : char* udp_payload = udp::get_udp_payload(rx_bufs[i_b]);
338 0 : const detdataformats::DAQEthHeader* daq_hdr = reinterpret_cast<const detdataformats::DAQEthHeader*>(udp_payload);
339 :
340 : // uint64_t unique_str_id = (daq_hdr->det_id<<22) + (daq_hdr->crate_id<<12) + (daq_hdr->slot_id<<8) + daq_hdr->stream_id;
341 0 : StreamUID unique_str_id(*daq_hdr);
342 :
343 : // if ((udp_pkt_counter % 1000000) == 0 ) {
344 : // std::cout << "\nDAQ HEADER:\n" << *daq_hdr<< "\n";
345 : // }
346 0 : if (stream_stats.find(unique_str_id) == stream_stats.end()) {
347 0 : stream_stats[unique_str_id];
348 0 : stream_stats[unique_str_id].prev_seq_id = daq_hdr->seq_id - 1;
349 : }
350 0 : stream_stats[unique_str_id].num_bytes += rx_bufs[i_b]->pkt_len;
351 :
352 0 : if (check_against_previous_stream(daq_hdr, 2048) != 0){
353 0 : dump_packet = true;
354 : }
355 0 : if (check_packet_size(rx_bufs[i_b], unique_str_id) != 0){
356 0 : dump_packet = true;
357 : }
358 :
359 0 : ++stream_stats[unique_str_id].total_packets;
360 0 : ++stream_stats[unique_str_id].num_packets;
361 0 : stream_stats[unique_str_id].prev_seq_id = daq_hdr->seq_id;
362 :
363 0 : if (dump_packet && dumped_packet_count < max_packets_to_dump) {
364 0 : dumped_packet_count++;
365 :
366 0 : rte_pktmbuf_dump(stdout, rx_bufs[i_b], rx_bufs[i_b]->pkt_len);
367 : }
368 : }
369 :
370 0 : rte_pktmbuf_free_bulk(rx_bufs, nb_rx);
371 : }
372 :
373 : return 0;
374 0 : }
375 :
376 : // Define the function to be called when ctrl-c (SIGINT) is sent to process
377 0 : void signal_callback_handler(int signum){
378 0 : fmt::print("Caught signal {}\n", signum);
379 0 : std::exit(signum);
380 : }
381 :
382 0 : int main(int argc, char** argv){
383 0 : uint64_t time_per_report = 1;
384 0 : uint16_t iface = 0;
385 0 : std::vector<std::string> garp_ip_addresses;
386 0 : std::vector<std::string> pcie_addresses;
387 :
388 0 : CLI::App app{"test frame receiver"};
389 0 : app.add_option("-g,--garp-ip-address", garp_ip_addresses, "IP Addresses");
390 0 : app.add_option("-m,--pcie-mask", pcie_addresses, "PCIE Addresses device mask");
391 0 : app.add_option("-s,--exp-frame-size", expected_packet_size, "Expected frame size");
392 0 : app.add_option("-i,--iface", iface, "Interface to init");
393 0 : app.add_option("-t,--report-interval-time", time_per_report, "Time Per Report");
394 0 : app.add_flag("--check-time", check_timestamp, "Report back differences in timestamp");
395 0 : app.add_flag("-p,--per-stream-reports", per_stream_reports, "Detailed per stream reports");
396 :
397 0 : CLI11_PARSE(app, argc, argv);
398 :
399 : // Validate arguments
400 0 : fmt::print("ips : {}\n", fmt::join(garp_ip_addresses," | "));
401 0 : fmt::print("pcies : {}\n", fmt::join(pcie_addresses," | "));
402 :
403 0 : std::regex re_ipv4("[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}");
404 0 : std::regex re_pcie("^0{0,4}:[a-fA-F0-9]{2}:[a-fA-F0-9]{2}.[0-9]$");
405 :
406 0 : fmt::print("IP addresses\n");
407 0 : bool all_ip_ok = true;
408 0 : for( const auto& ip: garp_ip_addresses) {
409 0 : bool ip_ok = std::regex_match(ip, re_ipv4);
410 0 : fmt::print("- {} {}\n", ip, ip_ok);
411 0 : all_ip_ok &= ip_ok;
412 : }
413 :
414 0 : fmt::print("PCIE addresses\n");
415 0 : bool all_pcie_ok = true;
416 0 : for( const auto& pcie: pcie_addresses) {
417 0 : bool pcie_ok = std::regex_match(pcie, re_pcie);
418 0 : fmt::print("- {} {}\n", pcie, pcie_ok);
419 0 : all_pcie_ok &= pcie_ok;
420 : }
421 :
422 0 : if (!all_ip_ok or !all_pcie_ok) {
423 : return -1;
424 : }
425 :
426 :
427 : // define function to be called when ctrl+c is called.
428 0 : std::signal(SIGINT, signal_callback_handler);
429 :
430 0 : std::vector<std::string> eal_args;
431 0 : eal_args.push_back("dpdklibds_test_garp");
432 0 : for( const auto& pcie: pcie_addresses) {
433 0 : eal_args.push_back("-a");
434 0 : eal_args.push_back(pcie);
435 : }
436 :
437 :
438 0 : dunedaq::dpdklibs::ealutils::init_eal(eal_args);
439 :
440 0 : auto n_ifaces = rte_eth_dev_count_avail();
441 0 : fmt::print("# of available ifaces: {}\n", n_ifaces);
442 0 : if (n_ifaces == 0){
443 0 : std::cout << "WARNING: no available ifaces. exiting...\n";
444 0 : rte_eal_cleanup();
445 0 : return 1;
446 : }
447 :
448 : // Allocate pools and mbufs per queue
449 0 : std::map<int, std::unique_ptr<rte_mempool>> mbuf_pools;
450 0 : std::map<int, struct rte_mbuf **> bufs;
451 0 : const uint16_t n_rx_qs = 1;
452 0 : const uint16_t n_tx_qs = 1;
453 0 : const uint16_t rx_ring_size = 2048;
454 0 : const uint16_t tx_ring_size = 2048;
455 :
456 0 : std::cout << "Allocating pools and mbufs.\n";
457 0 : for (size_t i=0; i<n_rx_qs; ++i) {
458 0 : std::stringstream ss;
459 0 : ss << "MBP-" << i;
460 0 : fmt::print("Pool acquire: {}\n", ss.str());
461 0 : mbuf_pools[i] = ealutils::get_mempool(ss.str());
462 0 : bufs[i] = (rte_mbuf**) malloc(sizeof(struct rte_mbuf*) * burst_size);
463 0 : rte_pktmbuf_alloc_bulk(mbuf_pools[i].get(), bufs[i], burst_size);
464 0 : }
465 :
466 : // Setting up only one iface
467 0 : fmt::print("Initialize only iface {}!\n", iface);
468 0 : ealutils::iface_init(iface, n_rx_qs, n_tx_qs, rx_ring_size, tx_ring_size, mbuf_pools); // just init iface, no TX queues
469 :
470 0 : lcore_main(mbuf_pools[0].get(), iface, time_per_report, garp_ip_addresses);
471 :
472 : rte_eal_cleanup();
473 :
474 : return 0;
475 0 : }
|