void
IfaceWrapper::rx_runner(void *arg __rte_unused) {
  // Requested sleep time between polling rounds (configured in nanoseconds)
  struct timespec sleep_request = { 0, (long)m_lcore_sleep_ns };

  uint16_t iface = m_iface_id;

  const uint16_t lid = rte_lcore_id();
  auto queues = m_rx_core_map[lid];

  if (rte_eth_dev_socket_id(iface) >= 0 && rte_eth_dev_socket_id(iface) != (int)rte_socket_id()) {
    TLOG() << "WARNING, iface " << iface << " is on remote NUMA node to polling thread! "
           << "Performance will not be optimal.";
  }

  TLOG() << "LCore RX runner on CPU[" << lid << "]: Main loop starts for iface " << iface << " !";

  // Per-queue number of packets received in the latest burst
  std::map<int, int> nb_rx_map;

  while (!this->m_lcore_quit_signal.load()) {

    // First pass: drain a burst from every RX queue assigned to this lcore
    for (const auto& q : queues) {
      auto src_rx_q = q.first;
      auto* q_bufs = m_bufs[src_rx_q];
      const uint16_t nb_rx = rte_eth_rx_burst(iface, src_rx_q, q_bufs, m_burst_size);
      nb_rx_map[src_rx_q] = nb_rx;
    }

    // Second pass: process the bursts queue by queue
    for (const auto& q : queues) {
      auto src_rx_q = q.first;
      auto* q_bufs = m_bufs[src_rx_q];
      const uint16_t nb_rx = nb_rx_map[src_rx_q];

      if (nb_rx != 0) [[likely]] {
        // Keep track of the largest burst seen on this queue
        m_max_burst_size[src_rx_q] = std::max(nb_rx, m_max_burst_size[src_rx_q].load());

        // Iterate over the packets of the burst
        for (int i_b = 0; i_b < nb_rx; ++i_b) {
          auto pkt_type = q_bufs[i_b]->packet_type;

          // Non-IPv4 packets: ARP and LLDP are recognised but not processed here
          if (not RTE_ETH_IS_IPV4_HDR(pkt_type)) [[unlikely]] {
            if (pkt_type == RTE_PTYPE_L2_ETHER_ARP) {
              // ARP is handled by the dedicated ARP responder lcore
            } else if (pkt_type == RTE_PTYPE_L2_ETHER_LLDP) {
              // LLDP frames are ignored
            }
            ++m_num_unhandled_non_ipv4[lid];
            continue;
          }

          // Non-UDP IPv4 packets are counted and skipped
          if ((pkt_type & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_UDP) [[unlikely]] {
            ++m_num_unhandled_non_udp[lid];
            continue;
          }

          // Only jumbo UDP frames carry payload data of interest
          if (q_bufs[i_b]->pkt_len > 1500) [[likely]] {
            std::size_t data_len = q_bufs[i_b]->data_len;
            char* message = get_udp_payload(q_bufs[i_b]);
            if (m_lcore_enable_flow.load()) [[likely]] {
              handle_eth_payload(src_rx_q, message, data_len);
            }
            ++m_num_frames_rxq[src_rx_q];
            m_num_bytes_rxq[src_rx_q] += data_len;
          } else {
            ++m_num_unhandled_non_jumbo_udp[lid];
          }
        }

        // Return the whole burst to the mempool in one call
        rte_pktmbuf_free_bulk(q_bufs, nb_rx);
      }

      // A completely filled burst hints that the queue may be backing up
      if (nb_rx == m_burst_size) {
        ++m_num_full_bursts[src_rx_q];
      }
    }

    // Optionally yield between polling rounds
    if (m_lcore_sleep_ns) {
      nanosleep(&sleep_request, nullptr);
    }
  }

  TLOG() << "LCore RX runner on CPU[" << lid << "] returned.";
}
void
IfaceWrapper::arp_response_runner(void *arg __rte_unused) {
  // Fixed ~0.9 ms sleep between polling rounds
  struct timespec sleep_request = { 0, (long)900000 };

  uint16_t iface = m_iface_id;

  const uint16_t lid = rte_lcore_id();
  unsigned arp_rx_queue = m_arp_rx_queue;

  TLOG() << "LCore ARP responder on CPU[" << lid << "]: Main loop starts for iface " << iface
         << " rx queue: " << arp_rx_queue;

  while (!this->m_lcore_quit_signal.load()) {
    // Drain a burst from the dedicated ARP RX queue
    const uint16_t nb_rx = rte_eth_rx_burst(iface, arp_rx_queue, m_arp_bufs[arp_rx_queue], m_burst_size);

    // Iterate over the packets of the burst
    for (int i_b = 0; i_b < nb_rx; ++i_b) {
      auto pkt_type = m_arp_bufs[arp_rx_queue][i_b]->packet_type;

      if (not RTE_ETH_IS_IPV4_HDR(pkt_type)) {
        if (pkt_type == RTE_PTYPE_L2_ETHER_ARP) {
          // Locate the ARP header right after the Ethernet header
          struct rte_ether_hdr* eth_hdr = rte_pktmbuf_mtod(m_arp_bufs[arp_rx_queue][i_b], struct rte_ether_hdr *);
          struct rte_arp_hdr* arp_hdr = (struct rte_arp_hdr *)(eth_hdr + 1);

          // IP addresses registered on this interface (loop body elided in this excerpt)
          for (const auto& ip_addr_bin : m_ip_addr_bin) {
            (void)ip_addr_bin;
          }

          // Answer only ARP requests that target one of our registered IPs
          if (std::find(m_ip_addr_bin.begin(), m_ip_addr_bin.end(), arp_hdr->arp_data.arp_tip) != m_ip_addr_bin.end()) {
            pktgen_process_arp(m_arp_bufs[arp_rx_queue][i_b], iface, arp_hdr->arp_data.arp_tip);
          }
        } else if (pkt_type == RTE_PTYPE_L2_ETHER_LLDP) {
          // LLDP frames are ignored
        }
      }
    }

    // Return the whole burst to the mempool in one call
    rte_pktmbuf_free_bulk(m_arp_bufs[arp_rx_queue], nb_rx);

    // Yield between polling rounds
    if (m_lcore_sleep_ns) {
      nanosleep(&sleep_request, nullptr);
    }
  }

  TLOG() << "LCore ARP responder on CPU[" << lid << "] returned.";
}
// Related helper declarations:
#define TLOG_DEBUG(lvl, ...)
void pktgen_process_arp(struct rte_mbuf *m, uint32_t pid, rte_be32_t binary_ip_address);
char* get_udp_payload(const rte_mbuf *mbuf);
std::string get_ipv4_decimal_addr_str(struct ipaddr ipv4_address);
struct ipaddr ip_address_binary_to_dotdecimal(rte_le32_t binary_ipv4_address);
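
// A hypothetical sketch of how per-lcore runners such as rx_runner() could be started
// with the DPDK API rte_eal_remote_launch(). The trampoline, the g_wrapper pointer and
// launch_rx_runners_sketch() are illustrative assumptions and not part of the code above
// (they also assume rx_runner() is callable from outside the class).
#include <vector>
#include <rte_launch.h>
#include <rte_lcore.h>

namespace {
  IfaceWrapper* g_wrapper = nullptr; // hypothetical: set before launching the runners

  // rte_eal_remote_launch() expects an int(void*) entry point, so a small trampoline
  // forwards the call to the member function.
  int rx_runner_trampoline(void* arg)
  {
    g_wrapper->rx_runner(arg);
    return 0;
  }
}

void launch_rx_runners_sketch(IfaceWrapper& wrapper, const std::vector<unsigned>& lcore_ids)
{
  g_wrapper = &wrapper;
  for (unsigned lcore : lcore_ids) {
    // Start the RX polling loop on each configured worker lcore.
    rte_eal_remote_launch(rx_runner_trampoline, nullptr, lcore);
  }
}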