LCOV - code coverage report
Current view: top level - dpdklibs/test/apps - test_transmit_and_receive.cxx (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 146 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 17 0

            Line data    Source code
       1              : /**
       2              :  *
       3              :  * @file test_transmit_and_receive.cxx
       4              :  *
       5              :  * This is a standalone program which will transmit ethernet packets
       6              :  * out on one dpdk-enabled network interface port and receive them on
       7              :  * another. Obviously you actually need two such ports for this to
       8              :  * work.
       9              :  *
      10              :  *
      11              :  * This is part of the DUNE DAQ Application Framework, copyright 2020.
      12              :  * Licensing/copyright details are in the COPYING file that you should have
      13              :  * received with this code.
      14              :  */
      15              : 
      16              : #include "dpdklibs/EALSetup.hpp"
      17              : #include "dpdklibs/udp/PacketCtor.hpp"
      18              : #include "dpdklibs/udp/Utils.hpp"
      19              : 
      20              : #include "detdataformats/DAQEthHeader.hpp"
      21              : #include "logging/Logging.hpp"
      22              : 
      23              : #include "rte_common.h"
      24              : #include "rte_eal.h"
      25              : #include "rte_ethdev.h"
      26              : 
      27              : #include "CLI/App.hpp"
      28              : #include "CLI/Config.hpp"
      29              : #include "CLI/Formatter.hpp"
      30              : 
      31              : #include <limits>
      32              : #include <mutex>
      33              : 
      34              : using namespace dunedaq::dpdklibs;
      35              : using dunedaq::detdataformats::DAQEthHeader;
      36              : 
      37              : namespace {
      38              : 
      39              :   constexpr int burst_size = 256;
      40              : 
      41              :   std::mutex lcore_thread_mutex;
      42              : 
      43              :   int payload_bytes = 9000;
      44              : 
      45              :   std::atomic<long> num_sent_bursts = 0;
      46              :   std::atomic<long> num_sent_packets = 0;
      47              :   std::atomic<long> num_received_packets = 0;
      48              :   std::atomic<long> num_sent_bytes = 0;
      49              :   std::atomic<long> num_received_bytes = 0;
      50              :   std::atomic<long> num_bad_packets = 0;
      51              : 
      52              : }; // namespace
      53              : 
      54              : struct lcore_thread_arg {
      55              :   rte_mempool* pool;
      56              :   int iface_id;
      57              :   bool is_receiver;
      58              : };
      59              : 
      60            0 : inline bool operator!=(const DAQEthHeader& header1, const DAQEthHeader& header2) {
      61            0 :   return (header1.version != header2.version ||
      62            0 :           header1.det_id != header2.det_id ||
      63            0 :           header1.crate_id != header2.crate_id ||
      64            0 :           header1.slot_id != header2.slot_id ||
      65            0 :           header1.stream_id != header2.stream_id ||
      66            0 :           header1.seq_id != header2.seq_id ||
      67            0 :           header1.block_length != header2.block_length ||
      68            0 :           header1.timestamp != header2.timestamp);
      69              : }
      70              : 
      71              : 
      72            0 : int lcore_main(void* arg) {
      73              :   
      74            0 :   uint16_t lid = rte_lcore_id();
      75              :   
      76              :   // 0: main thread (doesn't use this function), 1: receiver thread, 2: transmitter thread
      77            0 :   if (lid == 0 || lid >= 3) {
      78              :     return 0;
      79              :   }
      80              : 
      81            0 :   TLOG() << "From lcore_main, lid == " << lid << ", rte_socket_id() == " << rte_socket_id();
      82              :   
      83            0 :   lcore_thread_arg info;
      84            0 :   const auto ptr_to_vector_of_thread_configs = static_cast<std::vector<lcore_thread_arg>*>(arg);
      85              :   
      86            0 :   {
      87            0 :     std::lock_guard<std::mutex> guard(lcore_thread_mutex);
      88            0 :     info = ptr_to_vector_of_thread_configs->at(lid - 1);
      89            0 :   }
      90            0 :   TLOG() << "In lcore_main, thread #" << lid << ", iface_id == " << info.iface_id << ", is_receiver == " << info.is_receiver;
      91              :   
      92            0 :   if (rte_eth_dev_socket_id(info.iface_id) >= 0 && rte_eth_dev_socket_id(info.iface_id) != static_cast<int>(rte_socket_id())) {
      93            0 :     TLOG(TLVL_WARNING) << "WARNING, dpdk interface " << info.iface_id << " is on remote NUMA node to polling thread.\nrte_eth_dev_socket_id(" << info.iface_id << ") == " << rte_eth_dev_socket_id(info.iface_id) << ", rte_socket_id() == " << static_cast<int>(rte_socket_id()) << "\nPerformance will not be optimal.\n";
      94              :   }
      95              :   
      96            0 :   auto bufs = static_cast<rte_mbuf**>( malloc(sizeof(struct rte_mbuf*) * burst_size) );
      97              :   
      98            0 :   if (bufs == NULL) {
      99            0 :     TLOG(TLVL_ERROR) << "Unable to allocate memory for buffers; returning from thread...";
     100            0 :     return 1;
     101              :   }
     102              : 
     103            0 :   int retval = 0;
     104            0 :   retval = rte_pktmbuf_alloc_bulk(info.pool, bufs, burst_size);
     105              : 
     106            0 :   if (retval != 0) {
     107            0 :     std::stringstream errstr;
     108            0 :     errstr << "A failure occurred calling rte_pktmbuf_alloc_bulk (" << strerror(abs(retval)) << "); returning from thread...";
     109            0 :     return 2;
     110            0 :   }
     111              : 
     112            0 :   if (info.is_receiver) {
     113              : 
     114            0 :     uint16_t nb_rx = 0;
     115            0 :     int bytes_in_burst = 0;
     116            0 :     int bad_packets_in_burst = 0;
     117              :     
     118              :     // Here, I'm taking advantage of the fact that the transmitting
     119              :     // thread calls construct_packets_for_burst, which itself fills
     120              :     // the DAQEthHeader structure inside the packet it sends using
     121              :     // "set_daqethheader_test_values". I'll compare the received
     122              :     // packet's DAQEthHeader to this one and make sure they're equal
     123              :     
     124            0 :     DAQEthHeader daqethheader_reference;
     125            0 :     udp::set_daqethheader_test_values(daqethheader_reference);
     126            0 :     DAQEthHeader* daqethheader_ptr = nullptr;
     127              :     
     128            0 :     while (true) {
     129              : 
     130            0 :       bytes_in_burst = 0; // Quicker to add up the bytes in the packets in a local variables, then attach the total to the atomic variable used in stats reporting
     131            0 :       bad_packets_in_burst = 0; // Same thinking
     132              :       
     133            0 :       nb_rx = rte_eth_rx_burst(info.iface_id, 0, bufs, burst_size);
     134            0 :       num_received_packets += nb_rx;
     135              : 
     136            0 :       for (int i_p = 0; i_p < nb_rx; ++i_p) {
     137            0 :         bytes_in_burst += bufs[i_p]->pkt_len;
     138              : 
     139              :         // DAQEthHeader (should be) the first thing after the ethernet + IPv4 + UDP headers
     140            0 :         daqethheader_ptr = rte_pktmbuf_mtod_offset(bufs[0], DAQEthHeader*, sizeof(udp::ipv4_udp_packet_hdr));
     141            0 :         if (*daqethheader_ptr != daqethheader_reference) {
     142            0 :           bad_packets_in_burst++;
     143              :         } 
     144              :       }
     145              : 
     146            0 :       num_received_bytes += bytes_in_burst;
     147            0 :       num_bad_packets += bad_packets_in_burst;
     148              : 
     149              :       // Free any unsent packets
     150            0 :       if (unlikely(nb_rx < burst_size)) {
     151            0 :         rte_pktmbuf_free_bulk(bufs, burst_size - nb_rx);
     152              :       }
     153              :     }
     154              :     
     155              :   } else {
     156              : 
     157            0 :     rte_ether_addr dst_mac_addr_struct;
     158              :     
     159              :     // Notice that in the snippet of code below, the assumption is that
     160              :     // the first entry in the vector of thread configurations
     161              :     // corresponds to the receiving thread
     162            0 :     uint16_t receiver_iface = std::numeric_limits<uint16_t>::max();
     163            0 :     {
     164            0 :       std::lock_guard<std::mutex> guard(lcore_thread_mutex);
     165            0 :       receiver_iface = ptr_to_vector_of_thread_configs->at(0).iface_id;
     166            0 :     }
     167              : 
     168            0 :     retval = rte_eth_macaddr_get(receiver_iface, &dst_mac_addr_struct);
     169              :     
     170            0 :     if (retval == 0) {
     171            0 :       TLOG() << "Will send to ethernet interface with MAC address " << ealutils::get_mac_addr_str(dst_mac_addr_struct);
     172              :     } else {
     173            0 :       TLOG() << "Problem trying to obtain the MAC address of the destination port; returning...";
     174            0 :       return 3;
     175              :     }
     176              :   
     177            0 :     const std::string dst_mac_addr = ealutils::get_mac_addr_str(dst_mac_addr_struct); 
     178            0 :     udp::construct_packets_for_burst(info.iface_id, dst_mac_addr, payload_bytes, burst_size, bufs);
     179              : 
     180            0 :     TLOG() << "Dump of the first packet header: ";
     181            0 :     TLOG() << udp::get_udp_header_str(bufs[0]);
     182              : 
     183            0 :     TLOG() << "Dump of the first packet rte_mbuf object: ";
     184            0 :     TLOG() << udp::get_rte_mbuf_str(bufs[0]);
     185              :     
     186            0 :     rte_mbuf_sanity_check(bufs[0], 1);
     187              :     
     188            0 :     uint16_t nb_tx = 0;
     189              : 
     190            0 :     while (true) {
     191              : 
     192            0 :       int queue_id = 0; 
     193              :       //TLOG() << "Just about to call rte_eth_tx_burst with arguments info.iface_id == " << info.iface_id << ", second argument " << queue_id << ", bufs at " << bufs << " and burst size " << burst_size;
     194            0 :       nb_tx = rte_eth_tx_burst(info.iface_id, queue_id, bufs, burst_size);
     195            0 :       num_sent_packets += nb_tx;
     196            0 :       num_sent_bytes += nb_tx * bufs[0]->pkt_len;
     197            0 :       num_sent_bursts++;
     198              :       //TLOG() << "Sent " << nb_tx;
     199              :       
     200              :       // Free any unsent packets
     201            0 :       if (unlikely(nb_tx < burst_size)) {
     202            0 :         rte_pktmbuf_free_bulk(bufs, burst_size - nb_tx);
     203              :       }
     204              :     }
     205            0 :   }
     206              :   return 0;
     207              : }
     208              : 
     209            0 : int main(int argc, char** argv) {
     210              : 
     211            0 :   CLI::App app{"test transmit and receive"};
     212              : 
     213            0 :   std::stringstream payload_desc;
     214            0 :   payload_desc << "Bytes of payload past the DAQEthHeader header (default " << payload_bytes << " bytes)";
     215            0 :   app.add_option("--payload", payload_bytes, payload_desc.str());
     216              : 
     217            0 :   CLI11_PARSE(app, argc, argv);
     218              :     
     219            0 :   argc = 1; // So rte_eal_init doesn't interpret what gets passed to the command line
     220              : 
     221            0 :   int retval = rte_eal_init(argc, argv);
     222              : 
     223            0 :   if (retval < 0) {
     224            0 :     rte_exit(EXIT_FAILURE, "ERROR: EAL initialization failed, info: %s\n", strerror(abs(retval)));
     225              :   }
     226              : 
     227            0 :   int nb_ports = rte_eth_dev_count_avail();
     228            0 :   TLOG() << "There are " << nb_ports << " ethernet ports available out of a total of " << rte_eth_dev_count_total();
     229              :   
     230            0 :   if (nb_ports < 2) {
     231            0 :     rte_exit(EXIT_FAILURE, "ERROR: fewer than 2 ethernet ports are available.\nYou need one port for transmitting packets and one port for receiving them\n");
     232              :   }
     233              : 
     234            0 :   auto stats = std::thread([&]() {
     235            0 :                              long sleep_time = 1;
     236            0 :                              while (true) {
     237            0 :                                TLOG() << "\nRates: \n" << num_sent_bursts / static_cast<double>(sleep_time) << " sent bursts/s\n" << num_sent_packets / static_cast<double>(sleep_time) << " sent packets/s\n" << num_received_packets / static_cast<double>(sleep_time) << " received packets/s\n" << num_bad_packets / static_cast<double>(sleep_time) << " bad packets/s\n"<< num_sent_bytes / static_cast<double>(sleep_time) << " sent bytes/s\n" << num_received_bytes / static_cast<double>(sleep_time) << " received bytes/s\n";
     238            0 :                                num_sent_packets.exchange(0);
     239            0 :                                num_sent_bytes.exchange(0);
     240            0 :                                num_sent_bursts.exchange(0);
     241            0 :                                num_received_packets.exchange(0);
     242            0 :                                num_received_bytes.exchange(0);
     243            0 :                                num_bad_packets.exchange(0);
     244            0 :                                std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
     245            0 :                              }
     246            0 :                            });
     247              : 
     248              : 
     249            0 :   uint16_t portid = std::numeric_limits<uint16_t>::max();
     250            0 :   int n_rx_qs = 0;
     251            0 :   int n_tx_qs = 0;
     252            0 :   uint16_t rx_ring_size = 1024;
     253            0 :   uint16_t tx_ring_size = 1024;
     254            0 :   int port_cntr = -1;
     255            0 :   std::map<int, std::unique_ptr<rte_mempool>> mbuf_pools;
     256            0 :   std::vector<lcore_thread_arg> lcore_thread_args;
     257              :   
     258            0 :   RTE_ETH_FOREACH_DEV(portid) { // RTE_ETH_FOREACH_DEV since no guarantee dpdk counts port IDs as 0, 1, 2 ...
     259              : 
     260            0 :     port_cntr++; // Initialize at -1, start at 0
     261              : 
     262              :     // Right now (May-2-2023) iface_init's implementation explicitly
     263              :     // assumes that the pools corresponding to <N> receiver queues
     264              :     // correspond to keys 0...N-1 in a map whose values are
     265              :     // unique_ptrs to pools
     266              :     
     267            0 :     std::stringstream poolname;
     268            0 :     poolname << "MBP-" << portid;
     269            0 :     mbuf_pools[port_cntr] = ealutils::get_mempool(poolname.str());
     270              : 
     271            0 :     bool is_receiver = false;
     272              :     
     273            0 :     if (port_cntr == 0) {
     274              :       n_tx_qs = 0;
     275              :       n_rx_qs = 1;
     276              :       is_receiver = true;
     277            0 :     } else if (port_cntr == 1) {
     278            0 :       n_tx_qs = 1;
     279            0 :       n_rx_qs = 0;
     280            0 :       is_receiver = false;
     281              :     }
     282              :     
     283            0 :     retval = ealutils::iface_init(portid, n_rx_qs, n_tx_qs, rx_ring_size, tx_ring_size, mbuf_pools);
     284              :     
     285            0 :     if (retval != 0) {
     286            0 :       rte_eal_cleanup();
     287              :       
     288            0 :       std::stringstream errstr;
     289            0 :       errstr << "A failure occurred initializing ethernet interface #" << portid << " (" << strerror(abs(retval)) << "); exiting...";
     290            0 :       rte_exit(EXIT_FAILURE, errstr.str().c_str());
     291            0 :     }
     292              : 
     293            0 :     auto pool = mbuf_pools[port_cntr].release();
     294            0 :     mbuf_pools.erase( mbuf_pools.find(port_cntr) );
     295              :     
     296            0 :     lcore_thread_args.emplace_back( lcore_thread_arg { pool, portid, is_receiver } );
     297            0 :   }
     298              : 
     299            0 :   rte_eal_mp_remote_launch(lcore_main, &lcore_thread_args, SKIP_MAIN);
     300              : 
     301            0 :   rte_eal_mp_wait_lcore();
     302            0 :   rte_eal_cleanup();
     303              :   
     304            0 :   int mynum = 55;
     305            0 :   TLOG() << "Things are still OK at this line, and here's a number: " << mynum;
     306              : 
     307            0 :   return 0;
     308            0 : }
        

Generated by: LCOV version 2.0-1