int
IfaceWrapper::rx_runner(void *arg __rte_unused) {
  // Backoff between polling iterations, configurable via m_lcore_sleep_ns
  struct timespec sleep_request = { 0, (long)m_lcore_sleep_ns };

  uint16_t iface = m_iface_id;

  const uint16_t lid = rte_lcore_id();
  auto queues = m_rx_core_map[lid];
  if (rte_eth_dev_socket_id(iface) >= 0 && rte_eth_dev_socket_id(iface) != (int)rte_socket_id()) {
    TLOG() << "WARNING, iface " << iface << " is on remote NUMA node to polling thread! "
           << "Performance will not be optimal.";
  }
  TLOG() << "LCore RX runner on CPU[" << lid << "]: Main loop starts for iface " << iface << " !";

  std::map<int, int> nb_rx_map;
  while (!this->m_lcore_quit_signal.load()) {

    // Phase 1: read a burst from every RX queue assigned to this lcore
    for (const auto& q : queues) {
      auto src_rx_q = q.first;
      auto* q_bufs = m_bufs[src_rx_q];

      // Receive a burst from this queue and record its size
      const uint16_t nb_rx = rte_eth_rx_burst(iface, src_rx_q, q_bufs, m_burst_size);
      nb_rx_map[src_rx_q] = nb_rx;
    }
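    // Draining all queues before processing any payloads keeps the NIC RX
    // rings serviced promptly, which presumably makes descriptor exhaustion
    // (and hence packet drops) less likely under load.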
    // Phase 2: process the bursts recorded above
    for (const auto& q : queues) {
      auto src_rx_q = q.first;
      auto* q_bufs = m_bufs[src_rx_q];
      const uint16_t nb_rx = nb_rx_map[src_rx_q];

      // We got packets from the burst on this queue
      if (nb_rx != 0) [[likely]] {

        // Update the maximum observed burst size for this queue
        m_max_burst_size[src_rx_q] = std::max(nb_rx, m_max_burst_size[src_rx_q].load());

        // Iterate over the packets of the burst
        for (int i_b = 0; i_b < nb_rx; ++i_b) {
          auto pkt_type = q_bufs[i_b]->packet_type;
          // Classify non-IPv4 packets; these are unexpected on data queues
          if (not RTE_ETH_IS_IPV4_HDR(pkt_type)) [[unlikely]] {
            //TLOG_DEBUG(10) << "Non-Ethernet packet type: " << (unsigned)pkt_type << " original: " << pkt_type;
            if (pkt_type == RTE_PTYPE_L2_ETHER_ARP) {
              //TLOG() << "Unexpected: Should handle an ARP request from lcore=" << lid << " rx_q=" << src_rx_q << "! Flow should be steered to dedicated RX Queue.";
            } else if (pkt_type == RTE_PTYPE_L2_ETHER_LLDP) {
              //TLOG_DEBUG(10) << "TODO: Handle LLDP packet!";
            } else {
              //TLOG_DEBUG(10) << "Unidentified! Dumping...";
              //rte_pktmbuf_dump(stdout, q_bufs[i_b], m_bufs[src_rx_q][i_b]->pkt_len);
            }
          }
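          // Note: mbuf->packet_type relies on the PMD's packet-type parsing;
          // on NICs without ptype offload, a software fallback such as
          // rte_net_get_ptype() would be needed for this classification.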
          // Handle the payload; expected frames are jumbo-sized (> 7000 B)
          if (q_bufs[i_b]->pkt_len > 7000) [[likely]] {
            std::size_t data_len = q_bufs[i_b]->data_len;
            char* message = get_udp_payload(q_bufs[i_b]);  // helper declared at end of listing
            if (m_lcore_enable_flow.load()) [[likely]] {
              handle_eth_payload(src_rx_q, message, data_len);
            }
            ++m_num_frames_rxq[src_rx_q];
            m_num_bytes_rxq[src_rx_q] += data_len;
          }
        }
        // Return all mbufs of the burst to the pool in one call
        rte_pktmbuf_free_bulk(q_bufs, nb_rx);
        // Full bursts indicate the poll loop may be saturating
        if (nb_rx == m_burst_size) {
          ++m_num_full_bursts[src_rx_q];
        }
      }
    }
    // Optional backoff between polling iterations
    if (m_lcore_sleep_ns) {
      nanosleep(&sleep_request, nullptr);
    }
  }

  TLOG() << "LCore RX runner on CPU[" << lid << "] returned.";
  return 0;
}
int
IfaceWrapper::arp_response_runner(void *arg __rte_unused) {
  // Fixed 0.9 ms backoff between polls of the ARP queue
  struct timespec sleep_request = { 0, (long)900000 };

  uint16_t iface = m_iface_id;

  const uint16_t lid = rte_lcore_id();
  unsigned arp_rx_queue = m_arp_rx_queue;
  TLOG() << "LCore ARP responder on CPU[" << lid << "]: Main loop starts for iface " << iface
         << " rx queue: " << arp_rx_queue;
  while (!this->m_lcore_quit_signal.load()) {

    // Receive a burst from the dedicated ARP RX queue
    const uint16_t nb_rx = rte_eth_rx_burst(iface, arp_rx_queue, m_arp_bufs[arp_rx_queue], m_burst_size);

    for (int i_b = 0; i_b < nb_rx; ++i_b) {
      auto pkt_type = m_arp_bufs[arp_rx_queue][i_b]->packet_type;
      if (not RTE_ETH_IS_IPV4_HDR(pkt_type)) {
        if (pkt_type == RTE_PTYPE_L2_ETHER_ARP) {
          struct rte_ether_hdr* eth_hdr = rte_pktmbuf_mtod(m_arp_bufs[arp_rx_queue][i_b], struct rte_ether_hdr *);
          struct rte_arp_hdr* arp_hdr = (struct rte_arp_hdr *)(eth_hdr + 1);
          for (const auto& ip_addr_bin : m_ip_addr_bin) {
            // ... (per-address bookkeeping not shown in this listing)
          }
          // Reply only if the ARP target IP is one of ours; the call below is an
          // assumed use of the pktgen_process_arp() helper declared further down.
          if (std::find(m_ip_addr_bin.begin(), m_ip_addr_bin.end(), arp_hdr->arp_data.arp_tip) != m_ip_addr_bin.end()) {
            pktgen_process_arp(m_arp_bufs[arp_rx_queue][i_b], iface, arp_hdr->arp_data.arp_tip);
          }
        } else if (pkt_type == RTE_PTYPE_L2_ETHER_LLDP) {
          // ... (LLDP handling not shown in this listing)
        }
      }
    }

    // Return all mbufs of the burst to the pool
    rte_pktmbuf_free_bulk(m_arp_bufs[arp_rx_queue], nb_rx);
    // Optional backoff between polling iterations
    if (m_lcore_sleep_ns) {
      nanosleep(&sleep_request, nullptr);
    }
  }

  TLOG() << "LCore ARP responder on CPU[" << lid << "] returned.";
  return 0;
}
// Helper declarations used by, or related to, the runners above:
#define TLOG_DEBUG(lvl,...)

void pktgen_process_arp(struct rte_mbuf *m, uint32_t pid, rte_be32_t binary_ip_address);
char * get_udp_payload(const rte_mbuf *mbuf);
std::string get_ipv4_decimal_addr_str(struct ipaddr ipv4_address);
struct ipaddr ip_address_binary_to_dotdecimal(rte_le32_t binary_ipv4_address);
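
// The layout of `struct ipaddr` used by the two address helpers above is not
// shown in this listing, so the standalone sketch below converts a
// network-byte-order IPv4 address directly to a dotted-decimal std::string
// instead. Illustrative only, not the codebase's implementation.
#include <cstdint>
#include <string>

static std::string ipv4_be_to_string(uint32_t ip_be)  // ip_be: network byte order
{
  // In network byte order the most significant byte comes first in memory.
  const uint8_t* b = reinterpret_cast<const uint8_t*>(&ip_be);
  return std::to_string(b[0]) + "." + std::to_string(b[1]) + "."
       + std::to_string(b[2]) + "." + std::to_string(b[3]);
}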