LCOV - code coverage report
Current view: top level - dpdklibs/src/detail - IfaceWrapper.hxx (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 90 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 12 0

            Line data    Source code
       1              : #include <time.h>
       2              : #include <rte_arp.h>
       3              : #include <rte_ethdev.h>
       4              : #include "dpdklibs/udp/IPV4UDPPacket.hpp"
       5              : 
       6              : namespace dunedaq {
       7              : namespace dpdklibs {
       8              : 
       9              : int 
      10            0 : IfaceWrapper::rx_runner(void *arg __rte_unused) {
      11              :   // RX polling loop: until m_lcore_quit_signal is set, bursts every queue mapped to this lcore, then filters, dispatches and bulk-frees the received mbufs. Always returns 0.
      12              :   // Timespec for the opportunistic sleep; its nanosecond field is taken from m_lcore_sleep_ns.
      13            0 :         struct timespec sleep_request = { 0, (long)m_lcore_sleep_ns };
      14              : 
      15              :   //bool once = true; // One shot action variable.
      16            0 :   uint16_t iface = m_iface_id;
      17              : 
      18            0 :   const uint16_t lid = rte_lcore_id();
      19            0 :   auto queues = m_rx_core_map[lid];
      20              : 
      21            0 :   if (rte_eth_dev_socket_id(iface) >= 0 && rte_eth_dev_socket_id(iface) != (int)rte_socket_id()) {
      22            0 :     TLOG() << "WARNING, iface " << iface << " is on remote NUMA node to polling thread! "
      23            0 :            << "Performance will not be optimal.";
      24              :   }
      25              : 
      26            0 :   TLOG() << "LCore RX runner on CPU[" << lid << "]: Main loop starts for iface " << iface << " !";
      27              :   // Packet count of the most recent burst, keyed by RX queue id (filled in pass 1, read in pass 2).
      28            0 :   std::map<int, int> nb_rx_map;
      29              :   // Run until the atomic quit flag m_lcore_quit_signal is set.
      30            0 :   while(!this->m_lcore_quit_signal.load()) {
      31              : 
      32              :     // Pass 1: fetch one burst from every assigned queue and remember how many packets it returned.
      33            0 :     uint8_t fb_count(0); // number of queues whose burst was full in this iteration
      34            0 :     for (const auto& q : queues) {
      35            0 :       auto src_rx_q = q.first;
      36            0 :       auto* q_bufs = m_bufs[src_rx_q];
      37              : 
      38              :       // Get burst from queue
      39            0 :       const uint16_t nb_rx = rte_eth_rx_burst(iface, src_rx_q, q_bufs, m_burst_size);
      40            0 :       nb_rx_map[src_rx_q] = nb_rx;
      41              :     }
      42              : 
      43              :     // Pass 2: process the packets collected above, queue by queue.
      44            0 :     for (const auto& q : queues) {
      45              : 
      46            0 :       auto src_rx_q = q.first;
      47            0 :       auto* q_bufs = m_bufs[src_rx_q];
      48            0 :       const uint16_t nb_rx = nb_rx_map[src_rx_q];
      49              :   
      50              :       // We got packets from burst on this queue
      51            0 :       if (nb_rx != 0) [[likely]] {
      52              : 
      53              :         // Update max burst size counter of this queue
      54            0 :         m_max_burst_size[src_rx_q] = std::max(nb_rx, m_max_burst_size[src_rx_q].load());
      55              : 
      56              :         // -------
      57              :               // Iterate on burst packets
      58            0 :         for (int i_b=0; i_b<nb_rx; ++i_b) {
      59              : 
      60              :           // Check if packet is segmented. Implement support for it if needed.
      61              :           //if (q_bufs[i_b]->nb_segs > 1) [[unlikely]] {
      62              :           //  TLOG_DEBUG(10) << "It appears a packet is spread across more than one receiving buffer;" 
      63              :           //                 << " there's currently no logic in this program to handle this";
      64              :           //}
      65              : 
      66              :           // Check packet type, decide their fate: ignore unexpected ones, FIXME: monitor occurrences
      67            0 :           auto pkt_type = q_bufs[i_b]->packet_type;
      68              :           // Handle non IPV4 frames: they are only counted per lcore and skipped (all branches below are empty).
      69            0 :           if (not RTE_ETH_IS_IPV4_HDR(pkt_type)) [[unlikely]] {
      70              :             //TLOG_DEBUG(10) << "Non-Ethernet packet type: " << (unsigned)pkt_type << " original: " << pkt_type;
      71            0 :             if (pkt_type == RTE_PTYPE_L2_ETHER_ARP) {
      72              :               //TLOG() << "Unexpected: Should handle an ARP request from lcore=" << lid << " rx_q=" << src_rx_q << "! Flow should be steered to dedicated RX Queue.";
      73            0 :             } else if (pkt_type == RTE_PTYPE_L2_ETHER_LLDP) {
      74              :               //TLOG_DEBUG(10) << "TODO: Handle LLDP packet!";
      75              :             } else {
      76              :               //TLOG_DEBUG(10) << "Unidentified! Dumping...";
      77              :               //rte_pktmbuf_dump(stdout, q_bufs[i_b], m_bufs[src_rx_q][i_b]->pkt_len);
      78              :             }
      79            0 :             ++m_num_unhandled_non_ipv4[lid];
      80            0 :             continue;
      81              :           }
      82              : 
      83              :           // Check if frame is non UDP: in that case, count it and ignore it.
      84            0 :           if ((pkt_type & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_UDP) [[unlikely]] {
      85            0 :             ++m_num_unhandled_non_udp[lid];
      86            0 :             continue; // omit it
      87              :           }
      88              : 
      89              :           // Check for JUMBO frames (pkt_len bigger than 1500 Bytes); smaller UDP frames are only counted.
      90            0 :           if (q_bufs[i_b]->pkt_len > 1500) [[likely]] { // RS FIXME: do proper check on data length later
      91              : 
      92              :             // If flow enabled, handle the payload.
      93            0 :             if ( m_lcore_enable_flow.load() ) [[likely]] {
      94              :               // Locate the UDP payload and read its length from the UDP header.
      95            0 :               struct udp::ipv4_udp_packet_hdr* udp_packet = rte_pktmbuf_mtod(q_bufs[i_b], struct udp::ipv4_udp_packet_hdr*);
      96            0 :               char* message = udp::get_udp_payload(q_bufs[i_b]);
      97            0 :               std::size_t udp_payload_len = udp::get_payload_size_udp_hdr(&udp_packet->udp_hdr);
      98              : 
      99            0 :               if ( m_strict_parsing ) { // all sources maintain DAQ protocol
     100            0 :                 parse_udp_payload(src_rx_q, message, udp_payload_len);
     101              :               } else { // avoid size checks and scattering
     102            0 :                 passthrough_udp_payload(src_rx_q, message, udp_payload_len);
     103              :               }
     104              :             }
     105              :             // NOTE(review): the byte counter below adds data_len while the jumbo check above tests pkt_len - confirm this is intended.
     106              :             // Update metrics of queue: frame and Byte counters (updated even when flow is disabled)
     107            0 :             ++m_num_frames_rxq[src_rx_q];
     108            0 :             std::size_t data_len = q_bufs[i_b]->data_len;
     109            0 :             m_num_bytes_rxq[src_rx_q] += data_len;
     110              :           } else {
     111            0 :             ++m_num_unhandled_non_jumbo_udp[lid];
     112              :           }
     113              :         }
     114              : 
     115              :         // Bulk free of all mbufs of this burst, handled or not
     116            0 :         rte_pktmbuf_free_bulk(q_bufs, nb_rx);
     117              : 
     118              :       } // per burst
     119              :       // -----------
     120              : 
     121              :       // Full burst counter
     122            0 :       if (nb_rx == m_burst_size) {
     123            0 :         ++fb_count;
     124            0 :         ++m_num_full_bursts[src_rx_q];
     125              :       }
     126              :     } // per queue
     127              : 
     128              :     // If no queue returned a full burst in this iteration...
     129            0 :     if (!fb_count) {
     130            0 :       if (m_lcore_sleep_ns) {
     131              :         // Sleep n nanoseconds... (timespec initialized in the first lines of this function; return value ignored)
     132            0 :         /*int response =*/ nanosleep(&sleep_request, nullptr);
     133              :       }
     134              :     }
     135              : 
     136              :   } // main while(quit) loop
     137              :  
     138            0 :   TLOG() << "LCore RX runner on CPU[" << lid << "] returned.";
     139            0 :   return 0;
     140            0 : }
     141              : 
     142              : 
     143              : int 
     144            0 : IfaceWrapper::arp_response_runner(void *arg __rte_unused) {
     145              :   // ARP responder loop: until m_lcore_quit_signal is set, bursts m_arp_rx_queue and passes ARP packets whose target IP is in m_ip_addr_bin to arp::pktgen_process_arp. Always returns 0.
     146              :   // Timespec for opportunistic sleep. The interval is hard-coded to 900000 ns here; it is not read from m_lcore_sleep_ns.
     147            0 :   struct timespec sleep_request = { 0, (long)900000 };
     148              : 
     149              :   //bool once = true; // One shot action variable.
     150            0 :   uint16_t iface = m_iface_id;
     151              : 
     152            0 :   const uint16_t lid = rte_lcore_id();
     153            0 :   unsigned arp_rx_queue = m_arp_rx_queue;
     154              : 
     155            0 :   TLOG() << "LCore ARP responder on CPU[" << lid << "]: Main loop starts for iface " << iface << " rx queue: " << arp_rx_queue;
     156              : 
     157              :   // Run until the atomic quit flag m_lcore_quit_signal is set.
     158            0 :   while(!this->m_lcore_quit_signal.load()) {
     159              : 
     160            0 :     const uint16_t nb_rx = rte_eth_rx_burst(iface, arp_rx_queue, m_arp_bufs[arp_rx_queue], m_burst_size);
     161              : 
     162              :     // We got packets from burst on this queue
     163            0 :     if (nb_rx != 0) {
     164              :       // Iterate on burst packets
     165            0 :       for (int i_b=0; i_b<nb_rx; ++i_b) {
     166              : 
     167              :         // Check packet type, omit/drop unexpected ones.
     168            0 :         auto pkt_type = m_arp_bufs[arp_rx_queue][i_b]->packet_type;
     169              :         //// Handle non IPV4 packets (IPv4 packets fall through untouched and are freed with the burst)
     170            0 :         if (not RTE_ETH_IS_IPV4_HDR(pkt_type)) {
     171              :           //TLOG_DEBUG(10) << "Non-Ethernet packet type: " << (unsigned)pkt_type << " original: " << pkt_type;
     172            0 :           if (pkt_type == RTE_PTYPE_L2_ETHER_ARP) {
     173            0 :             TLOG_DEBUG(10) << "Handling ARP request";
     174            0 :             struct rte_ether_hdr* eth_hdr = rte_pktmbuf_mtod(m_arp_bufs[arp_rx_queue][i_b], struct rte_ether_hdr *);
     175            0 :             struct rte_arp_hdr* arp_hdr = (struct rte_arp_hdr *)(eth_hdr + 1); // ARP header is read directly after the Ethernet header
     176              :             // Sender and target IP strings are built for debug logging only.
     177            0 :             std::string srcaddr = dunedaq::dpdklibs::udp::get_ipv4_decimal_addr_str(dunedaq::dpdklibs::udp::ip_address_binary_to_dotdecimal(rte_be_to_cpu_32(arp_hdr->arp_data.arp_sip)));
     178            0 :             TLOG_DEBUG(10) << "SRC IP: " << srcaddr;
     179            0 :             std::string dstaddr = dunedaq::dpdklibs::udp::get_ipv4_decimal_addr_str(dunedaq::dpdklibs::udp::ip_address_binary_to_dotdecimal(rte_be_to_cpu_32(arp_hdr->arp_data.arp_tip)));
     180            0 :             TLOG_DEBUG(10) << "DEST IP: " << dstaddr;
     181              :             // NOTE(review): these address strings are constructed for every ARP packet, regardless of whether TLOG_DEBUG(10) output is enabled.
     182            0 :             for( const auto& ip_addr_bin : m_ip_addr_bin) {
     183            0 :               std::string localaddr = dunedaq::dpdklibs::udp::get_ipv4_decimal_addr_str(dunedaq::dpdklibs::udp::ip_address_binary_to_dotdecimal(rte_be_to_cpu_32(ip_addr_bin)));
     184            0 :               TLOG_DEBUG(10) << "LOCAL IP: " << localaddr;
     185            0 :             }
     186              : 
     187              :             // Process the request only when its target IP (arp_tip) matches one of the entries of m_ip_addr_bin.
     188            0 :             if (std::find(m_ip_addr_bin.begin(), m_ip_addr_bin.end(), arp_hdr->arp_data.arp_tip) != m_ip_addr_bin.end()) {
     189            0 :               arp::pktgen_process_arp(m_arp_bufs[arp_rx_queue][i_b], m_iface_id, arp_hdr->arp_data.arp_tip);
     190              :             } else {
     191            0 :               TLOG_DEBUG(10) << "I'm not the ARP target";
     192              :             }
     193            0 :           } else if (pkt_type == RTE_PTYPE_L2_ETHER_LLDP) {
     194              :             //TLOG_DEBUG(10) << "TODO: Handle LLDP packet!";
     195              :           } else {
     196              :             //TLOG_DEBUG(10) << "Unidentified! Dumping...";
     197              :             //rte_pktmbuf_dump(stdout, m_arp_bufs[arp_rx_queue][i_b], m_bufs[src_rx_q][i_b]->pkt_len);
     198              :           }
     199            0 :           continue;
     200            0 :         }
     201              :       }
     202              : 
     203              :       // Bulk free of all mbufs of this burst
     204            0 :       rte_pktmbuf_free_bulk(m_arp_bufs[arp_rx_queue], nb_rx);
     205              :       
     206              :     } // per burst
     207              : 
     208              :     // Sleep after every poll whenever m_lcore_sleep_ns is non-zero (no full-burst check is made in this loop).
     209            0 :     if (m_lcore_sleep_ns) {
     210              :       // Sleep for the fixed interval held in sleep_request (initialized in the first lines of this function; return value ignored)
     211            0 :       /*int response =*/ nanosleep(&sleep_request, nullptr);
     212              :     }
     213              : 
     214              :   } // main while(quit) loop
     215              :  
     216            0 :   TLOG() << "LCore ARP responder on CPU[" << lid << "] returned.";
     217            0 :   return 0;
     218              : }
     219              : 
     220              : } // namespace dpdklibs
     221              : } // namespace dunedaq
        

Generated by: LCOV version 2.0-1