/* Application will run until quit or killed. */


#undef RTE_ETH_DEV_FLOW_OPS_THREAD_SAFE
#define RTE_ETH_DEV_FLOW_OPS_THREAD_SAFE 0

#include <fmt/core.h>
#include <inttypes.h>
#include <rte_cycles.h>
#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_lcore.h>
#include <rte_mbuf.h>
#include <stdint.h>
#include <tuple>

#include <atomic>
#include <chrono>
#include <csignal>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <limits>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include "CLI/App.hpp"
#include "CLI/Config.hpp"
#include "CLI/Formatter.hpp"


#undef RTE_ETH_DEV_FLOW_OPS_THREAD_SAFE
#define RTE_ETH_DEV_FLOW_OPS_THREAD_SAFE 0

#include "detdataformats/DAQEthHeader.hpp"
#include "dpdklibs/udp/PacketCtor.hpp"
#include "dpdklibs/udp/Utils.hpp"
#include "logging/Logging.hpp"
#include "dpdklibs/EALSetup.hpp"
#include "dpdklibs/FlowControl.hpp"

#define RX_RING_SIZE 1024
#define TX_RING_SIZE 1024

#define NUM_MBUFS 8191
#define MBUF_CACHE_SIZE 250

#define NSTREAM 128

#define PG_JUMBO_FRAME_LEN (9600 + RTE_ETHER_CRC_LEN + RTE_ETHER_HDR_LEN)
#ifndef RTE_JUMBO_ETHER_MTU
#define RTE_JUMBO_ETHER_MTU (PG_JUMBO_FRAME_LEN - RTE_ETHER_HDR_LEN - RTE_ETHER_CRC_LEN) /*< Ethernet MTU. */
#endif
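// PG_JUMBO_FRAME_LEN is the on-wire jumbo frame length: 9600 bytes of payload
// plus RTE_ETHER_HDR_LEN (14) and RTE_ETHER_CRC_LEN (4), i.e. 9618 bytes.
// RTE_JUMBO_ETHER_MTU strips the header and CRC back off, giving 9600.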


using namespace dunedaq;
using namespace dpdklibs;
using namespace udp;


namespace {

std::ostream& operator<<(std::ostream& out, const StreamUID& obj) {
  return out << static_cast<std::string>(obj);
}

struct StreamStats {
  std::atomic<uint64_t> total_packets = 0;
  std::atomic<uint64_t> num_packets = 0;
  std::atomic<uint64_t> num_bytes = 0;
  std::atomic<uint64_t> num_bad_timestamp = 0;
  std::atomic<uint64_t> max_timestamp_skip = 0;
  std::atomic<uint64_t> num_bad_seq_id = 0;
  std::atomic<uint64_t> max_seq_id_skip = 0;
  std::atomic<uint64_t> num_bad_payload_size = 0;
  // Start the minimum at the largest possible value so the first packet sets it;
  // with an initial value of 0 the minimum payload size would never be updated.
  std::atomic<uint64_t> min_payload_size = std::numeric_limits<uint64_t>::max();
  std::atomic<uint64_t> max_payload_size = 0;
  std::atomic<uint64_t> prev_seq_id = 0;
  std::atomic<uint64_t> prev_timestamp = 0;

  void reset() {
    num_packets.exchange(0);
    num_bytes.exchange(0);
    num_bad_timestamp.exchange(0);
    max_timestamp_skip.exchange(0);
    num_bad_seq_id.exchange(0);
    max_seq_id_skip.exchange(0);
    num_bad_payload_size.exchange(0);
    min_payload_size.exchange(std::numeric_limits<uint64_t>::max());
    max_payload_size.exchange(0);
  }
};

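// One StreamStats entry per unique stream, keyed by the StreamUID built from the
// DAQEthHeader's detector/crate/slot/stream fields; entries are created on the
// first packet seen for a stream.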
std::map<StreamUID, StreamStats> stream_stats;

// Apparently only 8 and above works for "burst_size"

// From the dpdk documentation, describing the rte_eth_rx_burst
// function (and keeping in mind that their "nb_pkts" variable is the
// same as our "burst size" variable below):
// "Some drivers using vector instructions require that nb_pkts is
// divisible by 4 or 8, depending on the driver implementation."

bool check_timestamp = false;
bool per_stream_reports = false;

constexpr int burst_size = 256;

constexpr uint32_t expected_packet_type = 0x291;

constexpr int default_mbuf_size = 9000; // As opposed to RTE_MBUF_DEFAULT_BUF_SIZE

constexpr int max_packets_to_dump = 10;
int dumped_packet_count = 0;

uint16_t expected_packet_size = 0; // e.g. 7243, the size of every packet after the initial one; 0 disables the size check

std::atomic<uint64_t> num_packets = 0;
std::atomic<uint64_t> num_bytes = 0;
std::atomic<uint64_t> total_packets = 0;
std::atomic<uint64_t> non_ipv4_packets = 0;
std::atomic<uint64_t> total_bad_timestamp = 0;
std::atomic<uint64_t> num_bad_timestamp = 0;
std::atomic<uint64_t> max_timestamp_skip = 0;
std::atomic<uint64_t> total_bad_seq_id = 0;
std::atomic<uint64_t> num_bad_seq_id = 0;
std::atomic<uint64_t> max_seq_id_skip = 0;
std::atomic<uint64_t> total_bad_payload_size = 0;
std::atomic<uint64_t> num_bad_payload_size = 0;
std::atomic<uint64_t> min_payload_size = std::numeric_limits<uint64_t>::max(); // see note on StreamStats::min_payload_size
std::atomic<uint64_t> max_payload_size = 0;
std::atomic<uint64_t> udp_pkt_counter = 0;

std::ofstream datafile;
const std::string output_data_filename = "dpdklibs_test_frame_receiver.dat";


} // namespace

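// Build the argc/argv pair that rte_eal_init() expects from a vector of
// std::string. The returned char* pointers refer to the strings' own buffers,
// so the input vector must stay alive (and unmodified) while they are in use.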
std::vector<char*> construct_argv(std::vector<std::string>& std_argv) {
  std::vector<char*> vec_argv;
  vec_argv.reserve(std_argv.size());
  for (size_t i = 0; i < std_argv.size(); i++) {
    vec_argv.push_back(std_argv[i].data());
  }
  return vec_argv;
}

static inline int check_against_previous_stream(const detdataformats::DAQEthHeader* daq_hdr, uint64_t exp_ts_diff) {
  // uint64_t unique_str_id = (daq_hdr->det_id<<22) + (daq_hdr->crate_id<<12) + (daq_hdr->slot_id<<8) + daq_hdr->stream_id;
  StreamUID unique_str_id(*daq_hdr);
  uint64_t stream_ts = daq_hdr->timestamp;
  uint64_t seq_id = daq_hdr->seq_id;
  int ret_val = 0;

  if (check_timestamp) {
    if (stream_stats[unique_str_id].prev_timestamp == 0) {
      stream_stats[unique_str_id].prev_timestamp = stream_ts;
    } else {
      uint64_t expected_ts = stream_stats[unique_str_id].prev_timestamp + exp_ts_diff;
      if (stream_ts != expected_ts) {
        uint64_t ts_difference = stream_ts - stream_stats[unique_str_id].prev_timestamp;
        ret_val = 1;
        ++num_bad_timestamp;
        ++stream_stats[unique_str_id].num_bad_timestamp;
        ++total_bad_timestamp;
        if (ts_difference > max_timestamp_skip) { max_timestamp_skip = ts_difference; }
        if (ts_difference > stream_stats[unique_str_id].max_timestamp_skip) { stream_stats[unique_str_id].max_timestamp_skip = ts_difference; }
      }
      stream_stats[unique_str_id].prev_timestamp = stream_ts;
    }
  }

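  // The check below assumes seq_id wraps from 4095 back to 0 (a 12-bit counter),
  // so gaps are computed modulo 4096. Worked example: with prev_seq_id == 4095 the
  // next packet should carry seq_id 0; if seq_id 3 arrives instead, adj_expected_seq_id
  // is 4096, adj_seq_id is 4096 + 3 = 4099, and the recorded jump is 3.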
  uint64_t expected_seq_id = (stream_stats[unique_str_id].prev_seq_id == 4095) ? 0 : stream_stats[unique_str_id].prev_seq_id + 1;
  if (seq_id != expected_seq_id) {
    uint64_t adj_expected_seq_id = (expected_seq_id == 0) ? 4096 : expected_seq_id;
    uint64_t adj_seq_id = (seq_id < adj_expected_seq_id) ? (4096 + seq_id) : seq_id;
    uint64_t seq_id_difference = adj_seq_id - adj_expected_seq_id;
    ret_val += 2;
    ++num_bad_seq_id;
    ++total_bad_seq_id;
    ++stream_stats[unique_str_id].num_bad_seq_id;
    if (seq_id_difference > max_seq_id_skip) { max_seq_id_skip = seq_id_difference; }
    if (seq_id_difference > stream_stats[unique_str_id].max_seq_id_skip) { stream_stats[unique_str_id].max_seq_id_skip = seq_id_difference; }
  }

  return ret_val;
}

static inline int check_packet_size(struct rte_mbuf* mbuf, StreamUID unique_str_id) {
  std::size_t packet_size = mbuf->data_len;

  if (packet_size > max_payload_size) { max_payload_size = packet_size; }
  if (packet_size < min_payload_size) { min_payload_size = packet_size; }
  if (packet_size > stream_stats[unique_str_id].max_payload_size) { stream_stats[unique_str_id].max_payload_size = packet_size; }
  if (packet_size < stream_stats[unique_str_id].min_payload_size) { stream_stats[unique_str_id].min_payload_size = packet_size; }

  if (expected_packet_size and (packet_size != expected_packet_size)) {
    ++num_bad_payload_size;
    ++total_bad_payload_size;
    ++stream_stats[unique_str_id].num_bad_payload_size;
    return 1;
  }
  return 0;
}

static int lcore_main(struct rte_mempool* mbuf_pool, uint16_t iface, uint64_t time_per_report) {
  /*
   * Check that the iface is on the same NUMA node as the polling thread
   * for best performance.
   */

  if (rte_eth_dev_socket_id(iface) >= 0 && rte_eth_dev_socket_id(iface) != static_cast<int>(rte_socket_id())) {
    fmt::print(
      "WARNING, iface {} is on remote NUMA node to polling thread.\n\tPerformance will not be optimal.\n", iface
    );
  }

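  // Reporting thread: every time_per_report seconds it prints the rates and
  // error counters accumulated since the previous report and then zeroes the
  // per-interval counters (the total_* counters keep growing). It is never
  // joined; the receive loop below runs until the process is killed.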
  auto stats_thread = std::thread([&]() {
    while (true) {
      uint64_t packets_per_second = num_packets / time_per_report;
      uint64_t bytes_per_second = num_bytes / time_per_report;
      fmt::print(
        "Since the last report {} seconds ago:\n"
        "Packets/s: {} Bytes/s: {} Total packets: {} Non-IPV4 packets: {} Total UDP packets: {}\n"
        "Packets with wrong sequence id: {}, Max wrong seq_id jump {}, Total Packets with Wrong seq_id {}\n"
        "Max udp payload: {}, Min udp payload: {}\n",
        time_per_report,
        packets_per_second, bytes_per_second, total_packets, non_ipv4_packets, udp_pkt_counter,
        num_bad_seq_id, max_seq_id_skip, total_bad_seq_id,
        max_payload_size, min_payload_size
      );

      if (expected_packet_size) {
        fmt::print(
          "Packets with wrong payload size: {}, Total Packets with Wrong size {}\n",
          num_bad_payload_size, total_bad_payload_size
        );
      }

      if (check_timestamp) {
        fmt::print(
          "Wrong Timestamp difference Packets: {}, Max wrong Timestamp difference {}, Total Packets with Wrong Timestamp {}\n",
          num_bad_timestamp, max_timestamp_skip, total_bad_timestamp
        );
      }

      fmt::print("\n");
      // for (auto stream = stream_stats.begin(); stream != stream_stats.end(); stream++) {
      for (auto& [suid, stats] : stream_stats) {
        fmt::print("Stream {:15}: n.pkts {} (tot. {})", (std::string)suid, stats.num_packets, stats.total_packets);
        if (per_stream_reports) {
          float stream_bytes_per_second = (float)stats.num_bytes / (1024.*1024.) / (float)time_per_report;
          fmt::print(
            " {:8.3f} MB/s, seq. jumps: {}",
            stream_bytes_per_second, stats.num_bad_seq_id
          );
        }
        stats.reset();
        fmt::print("\n");
      }

      fmt::print("\n");
      num_packets.exchange(0);
      num_bytes.exchange(0);
      num_bad_timestamp.exchange(0);
      num_bad_seq_id.exchange(0);
      num_bad_payload_size.exchange(0);
      max_payload_size.exchange(0);
      min_payload_size.exchange(std::numeric_limits<uint64_t>::max());
      max_seq_id_skip.exchange(0);
      max_timestamp_skip.exchange(0);
      std::this_thread::sleep_for(std::chrono::seconds(time_per_report));
    }
  });

  struct rte_mbuf** bufs = (rte_mbuf**) malloc(sizeof(struct rte_mbuf*) * burst_size);
  // Note: rte_eth_rx_burst() fills this array with mbufs taken from the port's
  // mempool, so pre-allocating mbufs into it is not required; the pointers are
  // simply overwritten on the first burst.
  rte_pktmbuf_alloc_bulk(mbuf_pool, bufs, burst_size);

  datafile.open(output_data_filename, std::ios::out | std::ios::binary);
  if ((datafile.rdstate() & std::ofstream::failbit) != 0) {
    fmt::print("WARNING: Unable to open output file \"{}\"\n", output_data_filename);
  }

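  // Receive loop: poll RX queue 0 of the interface, up to burst_size packets at
  // a time. Every packet is counted; IPv4/UDP packets are additionally checked
  // for sequence-id gaps, timestamp gaps (expected increment: 2048 ticks) and
  // payload size, and the first few anomalous packets are dumped to stdout.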
  while (true) {
    /* Get a burst of RX packets from queue 0 of the interface. */
    const uint16_t nb_rx = rte_eth_rx_burst(iface, 0, bufs, burst_size);

    num_packets += nb_rx;
    total_packets += nb_rx;

    for (int i_b = 0; i_b < nb_rx; ++i_b) {
      num_bytes += bufs[i_b]->pkt_len;

      bool dump_packet = false;
      if (not RTE_ETH_IS_IPV4_HDR(bufs[i_b]->packet_type)) {
        non_ipv4_packets++;
        // Non-IPv4 packets are only counted; they are skipped before the dump logic below.
        continue;
      }
      ++udp_pkt_counter;

      char* udp_payload = udp::get_udp_payload(bufs[i_b]);
      const detdataformats::DAQEthHeader* daq_hdr = reinterpret_cast<const detdataformats::DAQEthHeader*>(udp_payload);

      // uint64_t unique_str_id = (daq_hdr->det_id<<22) + (daq_hdr->crate_id<<12) + (daq_hdr->slot_id<<8) + daq_hdr->stream_id;
      StreamUID unique_str_id(*daq_hdr);

      // if ((udp_pkt_counter % 1000000) == 0 ) {
      //   std::cout << "\nDAQ HEADER:\n" << *daq_hdr << "\n";
      // }
      if (stream_stats.find(unique_str_id) == stream_stats.end()) {
        // First packet of a new stream: seed prev_seq_id so the sequence check
        // doesn't flag it as a jump.
        stream_stats[unique_str_id].prev_seq_id = daq_hdr->seq_id - 1;
      }
      stream_stats[unique_str_id].num_bytes += bufs[i_b]->pkt_len;

      if (check_against_previous_stream(daq_hdr, 2048) != 0) {
        dump_packet = true;
      }
      if (check_packet_size(bufs[i_b], unique_str_id) != 0) {
        dump_packet = true;
      }

      ++stream_stats[unique_str_id].total_packets;
      ++stream_stats[unique_str_id].num_packets;
      stream_stats[unique_str_id].prev_seq_id = daq_hdr->seq_id;

      if (dump_packet && dumped_packet_count < max_packets_to_dump) {
        dumped_packet_count++;

        rte_pktmbuf_dump(stdout, bufs[i_b], bufs[i_b]->pkt_len);
      }
    }

    rte_pktmbuf_free_bulk(bufs, nb_rx);
  }

  return 0;
}

// Define the function to be called when ctrl-c (SIGINT) is sent to process
void signal_callback_handler(int signum) {
  fmt::print("Caught signal {}\n", signum);
  if (datafile.is_open()) {
    datafile.close();
  }

  // proc type on exit:
  auto proc_type = rte_eal_process_type();
  fmt::print("Proc type on exit is: {}\n", proc_type);

  // Terminate program
  std::exit(signum);
}

int main(int argc, char** argv) {
  uint64_t time_per_report = 1;
  uint16_t iface = 0;
  std::string allow_dev = "";

  CLI::App app{"test frame receiver"};
  app.add_option("-s", expected_packet_size, "Expected frame size");
  app.add_option("-i", iface, "Interface to init");
  app.add_option("-t", time_per_report, "Time Per Report");
  app.add_option("-a", allow_dev, "device to allow");

  app.add_flag("--check-time", check_timestamp, "Report back differences in timestamp");
  app.add_flag("-p", per_stream_reports, "Detailed per stream reports");
  CLI11_PARSE(app, argc, argv);
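  // Example invocation (the PCI address below is only illustrative; use the
  // address of a NIC bound to a DPDK-compatible driver on your host):
  //   dpdklibs_test_frame_receiver -i 0 -a 0000:41:00.0 -s 7243 -t 5 --check-time -p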

  // define function to be called when ctrl+c is called.
  std::signal(SIGINT, signal_callback_handler);

  std::vector<std::string> eal_args;
  eal_args.push_back("dpdklibs_test_frame_receiver");
  eal_args.push_back("--proc-type=primary");

  if (!allow_dev.empty()) {
    eal_args.push_back(fmt::format("--allow={}", allow_dev));
    // Use the device id as the EAL file prefix so this process doesn't clash
    // with other DPDK processes using the default hugepage file prefix.
    eal_args.push_back(fmt::format("--file-prefix={}", allow_dev));
  }

  TLOG() << "Status of RTE_ETH_DEV_FLOW_OPS_THREAD_SAFE=" << RTE_ETH_DEV_FLOW_OPS_THREAD_SAFE;

  // initialise eal with constructed argc and argv
  std::vector<char*> vec_argv = construct_argv(eal_args);
  char** constructed_argv = vec_argv.data();
  int constructed_argc = eal_args.size();

  // Check if primary process is alive
  auto alive = rte_eal_primary_proc_alive(nullptr);
  fmt::print("Is primary process running: {}\n", alive);

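  // rte_eal_init() consumes the EAL arguments assembled above; it returns the
  // number of parsed arguments on success and a negative value on failure.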
  int ret = 0;
  //if (!alive) {
  ret = rte_eal_init(constructed_argc, constructed_argv);
  if (ret < 0) { rte_exit(EXIT_FAILURE, "Error with EAL initialization\n"); }
  //}

  auto n_ifaces = rte_eth_dev_count_avail();
  fmt::print("# of available ifaces: {}\n", n_ifaces);
  if (n_ifaces == 0) {
    std::cout << "WARNING: no available ifaces. exiting...\n";
    rte_eal_cleanup();
    return 1;
  }

  auto proc_type = rte_eal_process_type();
  fmt::print("Proc type is: {}\n", proc_type);

  //std::this_thread::sleep_for(std::chrono::seconds(10));

  //return 0;

  // Allocate pools and mbufs per queue
  std::map<int, std::unique_ptr<rte_mempool>> mbuf_pools;
  std::map<int, struct rte_mbuf**> bufs;
  uint16_t n_rx_qs = 1;
  const uint16_t rx_ring_size = 1024;
  const uint16_t tx_ring_size = 1024;

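  // One mempool per RX queue (a single queue here). ealutils::get_mempool()
  // returns a pool with the given name, so the name encodes the interface and
  // queue index to keep it unique. The bufs[] arrays allocated below are not
  // used afterwards: lcore_main() allocates its own burst array.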
  std::cout << "Allocating pools and mbufs.\n";
  for (size_t i = 0; i < n_rx_qs; ++i) {
    std::stringstream ss;
    ss << "MBP-" << iface << "-" << i;
    fmt::print("Pool acquire: {}\n", ss.str());
    mbuf_pools[i] = ealutils::get_mempool(ss.str());
    bufs[i] = (rte_mbuf**) malloc(sizeof(struct rte_mbuf*) * burst_size);
    rte_pktmbuf_alloc_bulk(mbuf_pools[i].get(), bufs[i], burst_size);
  }

  // Setting up only one iface
  fmt::print("Initialize only iface {}!\n", iface);
  ealutils::iface_init(iface, n_rx_qs, 0, rx_ring_size, tx_ring_size, mbuf_pools); // just init iface, no TX queues

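  // Steer matching traffic to RX queue 0 with an rte_flow rule. The loop below
  // installs the same rule (source IP 192.168.0.1 with a full mask) ten times,
  // pausing two seconds between attempts; generate_ipv4_flow() is the dpdklibs
  // helper, presumably wrapping rte_flow_create() in the style of DPDK's
  // flow_filtering example.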
  for (unsigned i = 0; i < 10; ++i) {
    std::string rnd_ip("192.168.0.1");
    TLOG() << "Creating flow rule for src_ip=" << rnd_ip << " assigned to rxq=0";
    size_t ind = 0, current_ind = 0;
    std::vector<uint8_t> v;
    for (int oct = 0; oct < 4; ++oct) {
      v.push_back(std::stoi(rnd_ip.substr(current_ind, rnd_ip.size() - current_ind), &ind));
      current_ind += ind + 1;
    }
    struct rte_flow_error error = {};
    auto* flow = generate_ipv4_flow(iface, 0,
        RTE_IPV4(v[0], v[1], v[2], v[3]), 0xffffffff, 0, 0, &error);
    if (flow == nullptr) {
      fmt::print("Flow rule creation failed: {}\n", error.message ? error.message : "(no error message)");
    }

    std::this_thread::sleep_for(std::chrono::seconds(2));
  }

  std::this_thread::sleep_for(std::chrono::seconds(100));

  lcore_main(mbuf_pools[0].get(), iface, time_per_report);

  rte_eal_cleanup();

  return 0;
}