DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
IfaceWrapper.hxx
Go to the documentation of this file.
1#include <time.h>
2#include <rte_arp.h>
3#include <rte_ethdev.h>
4
5namespace dunedaq {
6namespace dpdklibs {
7
8int
9IfaceWrapper::rx_runner(void *arg __rte_unused) {
10
11 // Timespec for opportunistic sleep. Nanoseconds configured in conf.
12 struct timespec sleep_request = { 0, (long)m_lcore_sleep_ns };
13
14 //bool once = true; // One shot action variable.
15 uint16_t iface = m_iface_id;
16
17 const uint16_t lid = rte_lcore_id();
18 auto queues = m_rx_core_map[lid];
19
20 if (rte_eth_dev_socket_id(iface) >= 0 && rte_eth_dev_socket_id(iface) != (int)rte_socket_id()) {
21 TLOG() << "WARNING, iface " << iface << " is on remote NUMA node to polling thread! "
22 << "Performance will not be optimal.";
23 }
24
25 TLOG() << "LCore RX runner on CPU[" << lid << "]: Main loop starts for iface " << iface << " !";
26
27 std::map<int, int> nb_rx_map;
28 // While loop of quit atomic member in IfaceWrapper
29 while(!this->m_lcore_quit_signal.load()) {
30
31 // Loop over assigned queues to process
32 uint8_t fb_count(0);
33 for (const auto& q : queues) {
34 auto src_rx_q = q.first;
35 auto* q_bufs = m_bufs[src_rx_q];
36
37 // Get burst from queue
38 const uint16_t nb_rx = rte_eth_rx_burst(iface, src_rx_q, q_bufs, m_burst_size);
39 nb_rx_map[src_rx_q] = nb_rx;
40 }
41
42
43 for (const auto& q : queues) {
44
45 auto src_rx_q = q.first;
46 auto* q_bufs = m_bufs[src_rx_q];
47 const uint16_t nb_rx = nb_rx_map[src_rx_q];
48
49 // We got packets from burst on this queue
50 if (nb_rx != 0) [[likely]] {
51
52 // Update max burst size counter of this queue
53 m_max_burst_size[src_rx_q] = std::max(nb_rx, m_max_burst_size[src_rx_q].load());
54
55 // -------
56 // Iterate on burst packets
57 for (int i_b=0; i_b<nb_rx; ++i_b) {
58
59 // Check if packet is segmented. Implement support for it if needed.
60 //if (q_bufs[i_b]->nb_segs > 1) [[unlikely]] {
61 // TLOG_DEBUG(10) << "It appears a packet is spread across more than one receiving buffer;"
62 // << " there's currently no logic in this program to handle this";
63 //}
64
65 // Check packet type, decide their fate: ignore unexpected ones, FIXME: monitor occurrences
66 auto pkt_type = q_bufs[i_b]->packet_type;
67 // Handle non IPV4 frames.
68 if (not RTE_ETH_IS_IPV4_HDR(pkt_type)) [[unlikely]] {
69 //TLOG_DEBUG(10) << "Non-Ethernet packet type: " << (unsigned)pkt_type << " original: " << pkt_type;
70 if (pkt_type == RTE_PTYPE_L2_ETHER_ARP) {
71 //TLOG() << "Unexpected: Should handle an ARP request from lcore=" << lid << " rx_q=" << src_rx_q << "! Flow should be steered to dedicated RX Queue.";
72 } else if (pkt_type == RTE_PTYPE_L2_ETHER_LLDP) {
73 //TLOG_DEBUG(10) << "TODO: Handle LLDP packet!";
74 } else {
75 //TLOG_DEBUG(10) << "Unidentified! Dumping...";
76 //rte_pktmbuf_dump(stdout, q_bufs[i_b], m_bufs[src_rx_q][i_b]->pkt_len);
77 }
78 ++m_num_unhandled_non_ipv4[lid];
79 continue;
80 }
81
82 // Check if frame is non UDP: in that case, ignore it.
83 if ((pkt_type & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_UDP) [[unlikely]] {
84 ++m_num_unhandled_non_udp[lid];
85 continue; // ommit it
86 }
87
88 // Check for JUMBO frames (bigger than 1500 Bytes)
89 if (q_bufs[i_b]->pkt_len > 1500) [[likely]] { // RS FIXME: do proper check on data length later
90
91 // Get length of user payload. (Ethernet headers excluded.)
92 std::size_t data_len = q_bufs[i_b]->data_len;
93
94 // If flow enabled, handle the payload.
95 if ( m_lcore_enable_flow.load() ) [[likely]] {
96 char* message = udp::get_udp_payload(q_bufs[i_b]);
97 handle_eth_payload(src_rx_q, message, data_len);
98 }
99
100 // Update metrics of queue: frame and Byte counters
101 ++m_num_frames_rxq[src_rx_q];
102 m_num_bytes_rxq[src_rx_q] += data_len;
103 } else {
104 ++m_num_unhandled_non_jumbo_udp[lid];
105 }
106 }
107
108 // Bulk free of mbufs
109 rte_pktmbuf_free_bulk(q_bufs, nb_rx);
110
111 } // per burst
112 // -----------
113
114 // Full burst counter
115 if (nb_rx == m_burst_size) {
116 ++fb_count;
117 ++m_num_full_bursts[src_rx_q];
118 }
119 } // per queue
120
121 // If no full buffers in burst...
122 if (!fb_count) {
123 if (m_lcore_sleep_ns) {
124 // Sleep n nanoseconds... (value from config, timespec initialized in lcore first lines)
125 /*int response =*/ nanosleep(&sleep_request, nullptr);
126 }
127 }
128
129 } // main while(quit) loop
130
131 TLOG() << "LCore RX runner on CPU[" << lid << "] returned.";
132 return 0;
133}
134
135
136int
137IfaceWrapper::arp_response_runner(void *arg __rte_unused) {
138
139 // Timespec for opportunistic sleep. Nanoseconds configured in conf.
140 struct timespec sleep_request = { 0, (long)900000 };
141
142 //bool once = true; // One shot action variable.
143 uint16_t iface = m_iface_id;
144
145 const uint16_t lid = rte_lcore_id();
146 unsigned arp_rx_queue = m_arp_rx_queue;
147
148 TLOG() << "LCore ARP responder on CPU[" << lid << "]: Main loop starts for iface " << iface << " rx queue: " << arp_rx_queue;
149
150 // While loop of quit atomic member in IfaceWrapper
151 while(!this->m_lcore_quit_signal.load()) {
152
153 const uint16_t nb_rx = rte_eth_rx_burst(iface, arp_rx_queue, m_arp_bufs[arp_rx_queue], m_burst_size);
154
155 // We got packets from burst on this queue
156 if (nb_rx != 0) {
157 // Iterate on burst packets
158 for (int i_b=0; i_b<nb_rx; ++i_b) {
159
160 // Check packet type, ommit/drop unexpected ones.
161 auto pkt_type = m_arp_bufs[arp_rx_queue][i_b]->packet_type;
163 if (not RTE_ETH_IS_IPV4_HDR(pkt_type)) {
164 //TLOG_DEBUG(10) << "Non-Ethernet packet type: " << (unsigned)pkt_type << " original: " << pkt_type;
165 if (pkt_type == RTE_PTYPE_L2_ETHER_ARP) {
166 TLOG_DEBUG(10) << "Handling ARP request";
167 struct rte_ether_hdr* eth_hdr = rte_pktmbuf_mtod(m_arp_bufs[arp_rx_queue][i_b], struct rte_ether_hdr *);
168 struct rte_arp_hdr* arp_hdr = (struct rte_arp_hdr *)(eth_hdr + 1);
169
170 std::string srcaddr = dunedaq::dpdklibs::udp::get_ipv4_decimal_addr_str(dunedaq::dpdklibs::udp::ip_address_binary_to_dotdecimal(rte_be_to_cpu_32(arp_hdr->arp_data.arp_sip)));
171 TLOG_DEBUG(10) << "SRC IP: " << srcaddr;
172 std::string dstaddr = dunedaq::dpdklibs::udp::get_ipv4_decimal_addr_str(dunedaq::dpdklibs::udp::ip_address_binary_to_dotdecimal(rte_be_to_cpu_32(arp_hdr->arp_data.arp_tip)));
173 TLOG_DEBUG(10) << "DEST IP: " << dstaddr;
174
175 for( const auto& ip_addr_bin : m_ip_addr_bin) {
177 TLOG_DEBUG(10) << "LOCAL IP: " << localaddr;
178 }
179
180
181 if (std::find(m_ip_addr_bin.begin(), m_ip_addr_bin.end(), arp_hdr->arp_data.arp_tip) != m_ip_addr_bin.end()) {
182 arp::pktgen_process_arp(m_arp_bufs[arp_rx_queue][i_b], 0, arp_hdr->arp_data.arp_tip);
183 } else {
184 TLOG_DEBUG(10) << "I'm not the ARP target";
185 }
186 } else if (pkt_type == RTE_PTYPE_L2_ETHER_LLDP) {
187 //TLOG_DEBUG(10) << "TODO: Handle LLDP packet!";
188 } else {
189 //TLOG_DEBUG(10) << "Unidentified! Dumping...";
190 //rte_pktmbuf_dump(stdout, m_arp_bufs[arp_rx_queue][i_b], m_bufs[src_rx_q][i_b]->pkt_len);
191 }
192 continue;
193 }
194 }
195
196 // Bulk free of mbufs
197 rte_pktmbuf_free_bulk(m_arp_bufs[arp_rx_queue], nb_rx);
198
199 } // per burst
200
201 // If no full buffers in burst...
202 if (m_lcore_sleep_ns) {
203 // Sleep n nanoseconds... (value from config, timespec initialized in lcore first lines)
204 /*int response =*/ nanosleep(&sleep_request, nullptr);
205 }
206
207 } // main while(quit) loop
208
209 TLOG() << "LCore ARP responder on CPU[" << lid << "] returned.";
210 return 0;
211}
212
213} // namespace dpdklibs
214} // namespace dunedaq
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
void pktgen_process_arp(struct rte_mbuf *m, uint32_t pid, rte_be32_t binary_ip_address)
Definition ARP.cpp:78
char * get_udp_payload(const rte_mbuf *mbuf)
Definition Utils.cpp:70
std::string get_ipv4_decimal_addr_str(struct ipaddr ipv4_address)
Definition Utils.cpp:56
struct ipaddr ip_address_binary_to_dotdecimal(rte_le32_t binary_ipv4_address)
Definition Utils.cpp:48
The DUNE-DAQ namespace.
Definition DataStore.hpp:57