Line data Source code
1 : #include <time.h>
2 : #include <rte_arp.h>
3 : #include <rte_ethdev.h>
4 : #include "dpdklibs/udp/IPV4UDPPacket.hpp"
5 :
6 : namespace dunedaq {
7 : namespace dpdklibs {
8 :
9 : int
10 0 : IfaceWrapper::rx_runner(void *arg __rte_unused) {
11 :
12 : // Timespec for opportunistic sleep. Nanoseconds configured in conf.
13 0 : struct timespec sleep_request = { 0, (long)m_lcore_sleep_ns };
14 :
15 : //bool once = true; // One shot action variable.
16 0 : uint16_t iface = m_iface_id;
17 :
18 0 : const uint16_t lid = rte_lcore_id();
19 0 : auto queues = m_rx_core_map[lid];
20 :
21 0 : if (rte_eth_dev_socket_id(iface) >= 0 && rte_eth_dev_socket_id(iface) != (int)rte_socket_id()) {
22 0 : TLOG() << "WARNING, iface " << iface << " is on remote NUMA node to polling thread! "
23 0 : << "Performance will not be optimal.";
24 : }
25 :
26 0 : TLOG() << "LCore RX runner on CPU[" << lid << "]: Main loop starts for iface " << iface << " !";
27 :
28 0 : std::map<int, int> nb_rx_map;
29 : // While loop of quit atomic member in IfaceWrapper
30 0 : while(!this->m_lcore_quit_signal.load()) {
31 :
32 : // Loop over assigned queues to process
33 0 : uint8_t fb_count(0);
34 0 : for (const auto& q : queues) {
35 0 : auto src_rx_q = q.first;
36 0 : auto* q_bufs = m_bufs[src_rx_q];
37 :
38 : // Get burst from queue
39 0 : const uint16_t nb_rx = rte_eth_rx_burst(iface, src_rx_q, q_bufs, m_burst_size);
40 0 : nb_rx_map[src_rx_q] = nb_rx;
41 : }
42 :
43 :
44 0 : for (const auto& q : queues) {
45 :
46 0 : auto src_rx_q = q.first;
47 0 : auto* q_bufs = m_bufs[src_rx_q];
48 0 : const uint16_t nb_rx = nb_rx_map[src_rx_q];
49 :
50 : // We got packets from burst on this queue
51 0 : if (nb_rx != 0) [[likely]] {
52 :
53 : // Update max burst size counter of this queue
54 0 : m_max_burst_size[src_rx_q] = std::max(nb_rx, m_max_burst_size[src_rx_q].load());
55 :
56 : // -------
57 : // Iterate on burst packets
58 0 : for (int i_b=0; i_b<nb_rx; ++i_b) {
59 :
60 : // Check if packet is segmented. Implement support for it if needed.
61 : //if (q_bufs[i_b]->nb_segs > 1) [[unlikely]] {
62 : // TLOG_DEBUG(10) << "It appears a packet is spread across more than one receiving buffer;"
63 : // << " there's currently no logic in this program to handle this";
64 : //}
65 :
66 : // Check packet type, decide their fate: ignore unexpected ones, FIXME: monitor occurrences
67 0 : auto pkt_type = q_bufs[i_b]->packet_type;
68 : // Handle non IPV4 frames.
69 0 : if (not RTE_ETH_IS_IPV4_HDR(pkt_type)) [[unlikely]] {
70 : //TLOG_DEBUG(10) << "Non-Ethernet packet type: " << (unsigned)pkt_type << " original: " << pkt_type;
71 0 : if (pkt_type == RTE_PTYPE_L2_ETHER_ARP) {
72 : //TLOG() << "Unexpected: Should handle an ARP request from lcore=" << lid << " rx_q=" << src_rx_q << "! Flow should be steered to dedicated RX Queue.";
73 0 : } else if (pkt_type == RTE_PTYPE_L2_ETHER_LLDP) {
74 : //TLOG_DEBUG(10) << "TODO: Handle LLDP packet!";
75 : } else {
76 : //TLOG_DEBUG(10) << "Unidentified! Dumping...";
77 : //rte_pktmbuf_dump(stdout, q_bufs[i_b], m_bufs[src_rx_q][i_b]->pkt_len);
78 : }
79 0 : ++m_num_unhandled_non_ipv4[lid];
80 0 : continue;
81 : }
82 :
83 : // Check if frame is non UDP: in that case, ignore it.
84 0 : if ((pkt_type & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_UDP) [[unlikely]] {
85 0 : ++m_num_unhandled_non_udp[lid];
86 0 : continue; // ommit it
87 : }
88 :
89 : // Check for JUMBO frames (bigger than 1500 Bytes)
90 0 : if (q_bufs[i_b]->pkt_len > 1500) [[likely]] { // RS FIXME: do proper check on data length later
91 :
92 : // If flow enabled, handle the payload.
93 0 : if ( m_lcore_enable_flow.load() ) [[likely]] {
94 : // Get length of user payload. (Ethernet headers excluded.)
95 0 : struct udp::ipv4_udp_packet_hdr* udp_packet = rte_pktmbuf_mtod(q_bufs[i_b], struct udp::ipv4_udp_packet_hdr*);
96 0 : char* message = udp::get_udp_payload(q_bufs[i_b]);
97 0 : std::size_t udp_payload_len = udp::get_payload_size_udp_hdr(&udp_packet->udp_hdr);
98 :
99 0 : if ( m_strict_parsing ) { // all sources maintain DAQ protocol
100 0 : parse_udp_payload(src_rx_q, message, udp_payload_len);
101 : } else { // avoid size checks and scattering
102 0 : passthrough_udp_payload(src_rx_q, message, udp_payload_len);
103 : }
104 : }
105 :
106 : // Update metrics of queue: frame and Byte counters
107 0 : ++m_num_frames_rxq[src_rx_q];
108 0 : std::size_t data_len = q_bufs[i_b]->data_len;
109 0 : m_num_bytes_rxq[src_rx_q] += data_len;
110 : } else {
111 0 : ++m_num_unhandled_non_jumbo_udp[lid];
112 : }
113 : }
114 :
115 : // Bulk free of mbufs
116 0 : rte_pktmbuf_free_bulk(q_bufs, nb_rx);
117 :
118 : } // per burst
119 : // -----------
120 :
121 : // Full burst counter
122 0 : if (nb_rx == m_burst_size) {
123 0 : ++fb_count;
124 0 : ++m_num_full_bursts[src_rx_q];
125 : }
126 : } // per queue
127 :
128 : // If no full buffers in burst...
129 0 : if (!fb_count) {
130 0 : if (m_lcore_sleep_ns) {
131 : // Sleep n nanoseconds... (value from config, timespec initialized in lcore first lines)
132 0 : /*int response =*/ nanosleep(&sleep_request, nullptr);
133 : }
134 : }
135 :
136 : } // main while(quit) loop
137 :
138 0 : TLOG() << "LCore RX runner on CPU[" << lid << "] returned.";
139 0 : return 0;
140 0 : }
141 :
142 :
143 : int
144 0 : IfaceWrapper::arp_response_runner(void *arg __rte_unused) {
145 :
146 : // Timespec for opportunistic sleep. Nanoseconds configured in conf.
147 0 : struct timespec sleep_request = { 0, (long)900000 };
148 :
149 : //bool once = true; // One shot action variable.
150 0 : uint16_t iface = m_iface_id;
151 :
152 0 : const uint16_t lid = rte_lcore_id();
153 0 : unsigned arp_rx_queue = m_arp_rx_queue;
154 :
155 0 : TLOG() << "LCore ARP responder on CPU[" << lid << "]: Main loop starts for iface " << iface << " rx queue: " << arp_rx_queue;
156 :
157 : // While loop of quit atomic member in IfaceWrapper
158 0 : while(!this->m_lcore_quit_signal.load()) {
159 :
160 0 : const uint16_t nb_rx = rte_eth_rx_burst(iface, arp_rx_queue, m_arp_bufs[arp_rx_queue], m_burst_size);
161 :
162 : // We got packets from burst on this queue
163 0 : if (nb_rx != 0) {
164 : // Iterate on burst packets
165 0 : for (int i_b=0; i_b<nb_rx; ++i_b) {
166 :
167 : // Check packet type, ommit/drop unexpected ones.
168 0 : auto pkt_type = m_arp_bufs[arp_rx_queue][i_b]->packet_type;
169 : //// Handle non IPV4 packets
170 0 : if (not RTE_ETH_IS_IPV4_HDR(pkt_type)) {
171 : //TLOG_DEBUG(10) << "Non-Ethernet packet type: " << (unsigned)pkt_type << " original: " << pkt_type;
172 0 : if (pkt_type == RTE_PTYPE_L2_ETHER_ARP) {
173 0 : TLOG_DEBUG(10) << "Handling ARP request";
174 0 : struct rte_ether_hdr* eth_hdr = rte_pktmbuf_mtod(m_arp_bufs[arp_rx_queue][i_b], struct rte_ether_hdr *);
175 0 : struct rte_arp_hdr* arp_hdr = (struct rte_arp_hdr *)(eth_hdr + 1);
176 :
177 0 : std::string srcaddr = dunedaq::dpdklibs::udp::get_ipv4_decimal_addr_str(dunedaq::dpdklibs::udp::ip_address_binary_to_dotdecimal(rte_be_to_cpu_32(arp_hdr->arp_data.arp_sip)));
178 0 : TLOG_DEBUG(10) << "SRC IP: " << srcaddr;
179 0 : std::string dstaddr = dunedaq::dpdklibs::udp::get_ipv4_decimal_addr_str(dunedaq::dpdklibs::udp::ip_address_binary_to_dotdecimal(rte_be_to_cpu_32(arp_hdr->arp_data.arp_tip)));
180 0 : TLOG_DEBUG(10) << "DEST IP: " << dstaddr;
181 :
182 0 : for( const auto& ip_addr_bin : m_ip_addr_bin) {
183 0 : std::string localaddr = dunedaq::dpdklibs::udp::get_ipv4_decimal_addr_str(dunedaq::dpdklibs::udp::ip_address_binary_to_dotdecimal(rte_be_to_cpu_32(ip_addr_bin)));
184 0 : TLOG_DEBUG(10) << "LOCAL IP: " << localaddr;
185 0 : }
186 :
187 :
188 0 : if (std::find(m_ip_addr_bin.begin(), m_ip_addr_bin.end(), arp_hdr->arp_data.arp_tip) != m_ip_addr_bin.end()) {
189 0 : arp::pktgen_process_arp(m_arp_bufs[arp_rx_queue][i_b], m_iface_id, arp_hdr->arp_data.arp_tip);
190 : } else {
191 0 : TLOG_DEBUG(10) << "I'm not the ARP target";
192 : }
193 0 : } else if (pkt_type == RTE_PTYPE_L2_ETHER_LLDP) {
194 : //TLOG_DEBUG(10) << "TODO: Handle LLDP packet!";
195 : } else {
196 : //TLOG_DEBUG(10) << "Unidentified! Dumping...";
197 : //rte_pktmbuf_dump(stdout, m_arp_bufs[arp_rx_queue][i_b], m_bufs[src_rx_q][i_b]->pkt_len);
198 : }
199 0 : continue;
200 0 : }
201 : }
202 :
203 : // Bulk free of mbufs
204 0 : rte_pktmbuf_free_bulk(m_arp_bufs[arp_rx_queue], nb_rx);
205 :
206 : } // per burst
207 :
208 : // If no full buffers in burst...
209 0 : if (m_lcore_sleep_ns) {
210 : // Sleep n nanoseconds... (value from config, timespec initialized in lcore first lines)
211 0 : /*int response =*/ nanosleep(&sleep_request, nullptr);
212 : }
213 :
214 : } // main while(quit) loop
215 :
216 0 : TLOG() << "LCore ARP responder on CPU[" << lid << "] returned.";
217 0 : return 0;
218 : }
219 :
220 : } // namespace dpdklibs
221 : } // namespace dunedaq
|