IfaceWrapper::rx_runner(void *arg __rte_unused) {
  // Sleep request for the optional pause between polling iterations (length configured in ns)
  struct timespec sleep_request = { 0, (long)m_lcore_sleep_ns };

  uint16_t iface = m_iface_id;

  const uint16_t lid = rte_lcore_id();
  auto queues = m_rx_core_map[lid];

  if (rte_eth_dev_socket_id(iface) >= 0 && rte_eth_dev_socket_id(iface) != (int)rte_socket_id()) {
    TLOG() << "WARNING, iface " << iface << " is on remote NUMA node to polling thread! "
           << "Performance will not be optimal.";
  }

  TLOG() << "LCore RX runner on CPU[" << lid << "]: Main loop starts for iface " << iface << " !";

  // Number of packets received in the latest burst, per RX queue
  std::map<int, int> nb_rx_map;
  while(!this->m_lcore_quit_signal.load()) {

    // First pass: drain a burst from every RX queue assigned to this lcore
    for (const auto& q : queues) {
      auto src_rx_q = q.first;
      auto* q_bufs = m_bufs[src_rx_q];

      const uint16_t nb_rx = rte_eth_rx_burst(iface, src_rx_q, q_bufs, m_burst_size);
      nb_rx_map[src_rx_q] = nb_rx;
    }
    // Second pass: process the packets received from each queue
    for (const auto& q : queues) {
      auto src_rx_q = q.first;
      auto* q_bufs = m_bufs[src_rx_q];
      const uint16_t nb_rx = nb_rx_map[src_rx_q];

      if (nb_rx != 0) [[likely]] {

        // Track the largest burst seen on this queue
        m_max_burst_size[src_rx_q] = std::max(nb_rx, m_max_burst_size[src_rx_q].load());

        for (int i_b=0; i_b<nb_rx; ++i_b) {

          auto pkt_type = q_bufs[i_b]->packet_type;

          // Non-IPv4 packets (ARP, LLDP, or anything else) are not processed by this runner
          if (not RTE_ETH_IS_IPV4_HDR(pkt_type)) [[unlikely]] {
            if (pkt_type == RTE_PTYPE_L2_ETHER_ARP) {
              // ... (ARP handling elided; ARP requests are served by arp_response_runner) ...
            } else if (pkt_type == RTE_PTYPE_L2_ETHER_LLDP) {
              // ... (LLDP handling elided) ...
            }
            ++m_num_unhandled_non_ipv4[lid];
            continue;
          }

          // IPv4 but not UDP: count and skip
          if ((pkt_type & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_UDP) [[unlikely]] {
            ++m_num_unhandled_non_udp[lid];
            continue;
          }
          // Jumbo-sized frames carry the payloads of interest; smaller UDP packets are only counted
          if (q_bufs[i_b]->pkt_len > 1500) [[likely]] {

            if ( m_lcore_enable_flow.load() ) [[likely]] {
              struct udp::ipv4_udp_packet_hdr* udp_packet =
                rte_pktmbuf_mtod(q_bufs[i_b], struct udp::ipv4_udp_packet_hdr*);
              // ... (derivation of the message pointer and udp_payload_len from udp_packet elided here) ...
              if ( m_strict_parsing ) {
                parse_udp_payload(src_rx_q, message, udp_payload_len);
              } else {
                passthrough_udp_payload(src_rx_q, message, udp_payload_len);
              }
            }

            // Per-queue frame and byte counters
            ++m_num_frames_rxq[src_rx_q];
            std::size_t data_len = q_bufs[i_b]->data_len;
            m_num_bytes_rxq[src_rx_q] += data_len;
          } else {
            ++m_num_unhandled_non_jumbo_udp[lid];
          }
        } // for each packet in the burst

        // Return the burst's mbufs to their mempool
        rte_pktmbuf_free_bulk(q_bufs, nb_rx);

        // Count bursts that completely filled the requested burst size
        if (nb_rx == m_burst_size) {
          ++m_num_full_bursts[src_rx_q];
        }
      } // if (nb_rx != 0)
    } // for each assigned queue

    // Optional pause between polling iterations to reduce CPU load
    if (m_lcore_sleep_ns) {
      nanosleep(&sleep_request, nullptr);
    }
  } // while not signalled to quit

  TLOG() << "LCore RX runner on CPU[" << lid << "] returned.";
  return 0;
}
IfaceWrapper::arp_response_runner(void *arg __rte_unused) {
  // Fixed 0.9 ms sleep request between ARP polls
  struct timespec sleep_request = { 0, (long)900000 };

  uint16_t iface = m_iface_id;

  const uint16_t lid = rte_lcore_id();
  unsigned arp_rx_queue = m_arp_rx_queue;

  TLOG() << "LCore ARP responder on CPU[" << lid << "]: Main loop starts for iface " << iface
         << " rx queue: " << arp_rx_queue;
  while(!this->m_lcore_quit_signal.load()) {
    // Poll the dedicated, flow-steered ARP RX queue
    const uint16_t nb_rx = rte_eth_rx_burst(iface, arp_rx_queue, m_arp_bufs[arp_rx_queue], m_burst_size);

    for (int i_b=0; i_b<nb_rx; ++i_b) {

      auto pkt_type = m_arp_bufs[arp_rx_queue][i_b]->packet_type;

      if (not RTE_ETH_IS_IPV4_HDR(pkt_type)) {

        if (pkt_type == RTE_PTYPE_L2_ETHER_ARP) {
          struct rte_ether_hdr* eth_hdr =
            rte_pktmbuf_mtod(m_arp_bufs[arp_rx_queue][i_b], struct rte_ether_hdr *);
          struct rte_arp_hdr* arp_hdr = (struct rte_arp_hdr *)(eth_hdr + 1);

          for (const auto& ip_addr_bin : m_ip_addr_bin) {
            // ... (debug logging of the candidate ip_addr_bin addresses elided) ...
          }

          // Reply only if the ARP target IP is one of the addresses assigned to this iface
          if (std::find(m_ip_addr_bin.begin(), m_ip_addr_bin.end(), arp_hdr->arp_data.arp_tip) != m_ip_addr_bin.end()) {
            // ... (ARP reply generation elided; see pktgen_process_arp below) ...
          }

        } else if (pkt_type == RTE_PTYPE_L2_ETHER_LLDP) {
          // ... (LLDP handling elided) ...
        }
      }
    }

    // Return the burst's mbufs to their mempool
    rte_pktmbuf_free_bulk(m_arp_bufs[arp_rx_queue], nb_rx);
    // Pause between ARP polls
    if (m_lcore_sleep_ns) {
      nanosleep(&sleep_request, nullptr);
    }
  } // while not signalled to quit

  TLOG() << "LCore ARP responder on CPU[" << lid << "] returned.";
  return 0;
}
// Related helpers referenced by this component (signatures as declared elsewhere in the package):
#define TLOG_DEBUG(lvl,...) /* debug-level logging macro; expansion omitted in this excerpt */
void pktgen_process_arp(struct rte_mbuf *m, uint32_t pid, rte_be32_t binary_ip_address);
char * get_udp_payload(const rte_mbuf *mbuf);
std::string get_ipv4_decimal_addr_str(struct ipaddr ipv4_address);
struct ipaddr ip_address_binary_to_dotdecimal(rte_le32_t binary_ipv4_address);
std::uint16_t get_payload_size_udp_hdr(struct rte_udp_hdr *udp_hdr);
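// --- Illustrative sketch (not part of the original source) ---
// A plausible shape for two of the helpers declared above, assuming plain
// (non-VLAN) Ethernet + IPv4 framing with no IP options. The `_sketch` names mark
// them as hypothetical; they are not the package's actual definitions.
// Requires <rte_byteorder.h>, <rte_ether.h>, <rte_ip.h>, <rte_udp.h>, <rte_mbuf.h>.
std::uint16_t get_payload_size_udp_hdr_sketch(struct rte_udp_hdr *udp_hdr)
{
  // dgram_len is big-endian and includes the 8-byte UDP header itself
  return static_cast<std::uint16_t>(rte_be_to_cpu_16(udp_hdr->dgram_len) - sizeof(struct rte_udp_hdr));
}

char * get_udp_payload_sketch(const rte_mbuf *mbuf)
{
  // Skip the Ethernet, IPv4 and UDP headers to reach the application payload
  constexpr std::size_t hdr_offset =
    sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr) + sizeof(struct rte_udp_hdr);
  return rte_pktmbuf_mtod_offset(mbuf, char *, hdr_offset);
}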