DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
IfaceWrapper.hxx
Go to the documentation of this file.
1#include <time.h>
2#include <rte_arp.h>
3#include <rte_ethdev.h>
5
6namespace dunedaq {
7namespace dpdklibs {
8
9int
10IfaceWrapper::rx_runner(void *arg __rte_unused) {
11
12 // Timespec for opportunistic sleep. Nanoseconds configured in conf.
13 struct timespec sleep_request = { 0, (long)m_lcore_sleep_ns };
14
15 //bool once = true; // One shot action variable.
16 uint16_t iface = m_iface_id;
17
18 const uint16_t lid = rte_lcore_id();
19 auto queues = m_rx_core_map[lid];
20
21 if (rte_eth_dev_socket_id(iface) >= 0 && rte_eth_dev_socket_id(iface) != (int)rte_socket_id()) {
22 TLOG() << "WARNING, iface " << iface << " is on remote NUMA node to polling thread! "
23 << "Performance will not be optimal.";
24 }
25
26 TLOG() << "LCore RX runner on CPU[" << lid << "]: Main loop starts for iface " << iface << " !";
27
28 std::map<int, int> nb_rx_map;
29 // While loop of quit atomic member in IfaceWrapper
30 while(!this->m_lcore_quit_signal.load()) {
31
32 // Loop over assigned queues to process
33 uint8_t fb_count(0);
34 for (const auto& q : queues) {
35 auto src_rx_q = q.first;
36 auto* q_bufs = m_bufs[src_rx_q];
37
38 // Get burst from queue
39 const uint16_t nb_rx = rte_eth_rx_burst(iface, src_rx_q, q_bufs, m_burst_size);
40 nb_rx_map[src_rx_q] = nb_rx;
41 }
42
43
44 for (const auto& q : queues) {
45
46 auto src_rx_q = q.first;
47 auto* q_bufs = m_bufs[src_rx_q];
48 const uint16_t nb_rx = nb_rx_map[src_rx_q];
49
50 // We got packets from burst on this queue
51 if (nb_rx != 0) [[likely]] {
52
53 // Update max burst size counter of this queue
54 m_max_burst_size[src_rx_q] = std::max(nb_rx, m_max_burst_size[src_rx_q].load());
55
56 // -------
57 // Iterate on burst packets
58 for (int i_b=0; i_b<nb_rx; ++i_b) {
59
60 // Check if packet is segmented. Implement support for it if needed.
61 //if (q_bufs[i_b]->nb_segs > 1) [[unlikely]] {
62 // TLOG_DEBUG(10) << "It appears a packet is spread across more than one receiving buffer;"
63 // << " there's currently no logic in this program to handle this";
64 //}
65
66 // Check packet type, decide their fate: ignore unexpected ones, FIXME: monitor occurrences
67 auto pkt_type = q_bufs[i_b]->packet_type;
68 // Handle non IPV4 frames.
69 if (not RTE_ETH_IS_IPV4_HDR(pkt_type)) [[unlikely]] {
70 //TLOG_DEBUG(10) << "Non-Ethernet packet type: " << (unsigned)pkt_type << " original: " << pkt_type;
71 if (pkt_type == RTE_PTYPE_L2_ETHER_ARP) {
72 //TLOG() << "Unexpected: Should handle an ARP request from lcore=" << lid << " rx_q=" << src_rx_q << "! Flow should be steered to dedicated RX Queue.";
73 } else if (pkt_type == RTE_PTYPE_L2_ETHER_LLDP) {
74 //TLOG_DEBUG(10) << "TODO: Handle LLDP packet!";
75 } else {
76 //TLOG_DEBUG(10) << "Unidentified! Dumping...";
77 //rte_pktmbuf_dump(stdout, q_bufs[i_b], m_bufs[src_rx_q][i_b]->pkt_len);
78 }
79 ++m_num_unhandled_non_ipv4[lid];
80 continue;
81 }
82
83 // Check if frame is non UDP: in that case, ignore it.
84 if ((pkt_type & RTE_PTYPE_L4_MASK) != RTE_PTYPE_L4_UDP) [[unlikely]] {
85 ++m_num_unhandled_non_udp[lid];
86 continue; // ommit it
87 }
88
89 // Check for JUMBO frames (bigger than 1500 Bytes)
90 if (q_bufs[i_b]->pkt_len > 1500) [[likely]] { // RS FIXME: do proper check on data length later
91
92 // If flow enabled, handle the payload.
93 if ( m_lcore_enable_flow.load() ) [[likely]] {
94 // Get length of user payload. (Ethernet headers excluded.)
95 struct udp::ipv4_udp_packet_hdr* udp_packet = rte_pktmbuf_mtod(q_bufs[i_b], struct udp::ipv4_udp_packet_hdr*);
96 char* message = udp::get_udp_payload(q_bufs[i_b]);
97 std::size_t udp_payload_len = udp::get_payload_size_udp_hdr(&udp_packet->udp_hdr);
98
99 if ( m_strict_parsing ) { // all sources maintain DAQ protocol
100 parse_udp_payload(src_rx_q, message, udp_payload_len);
101 } else { // avoid size checks and scattering
102 passthrough_udp_payload(src_rx_q, message, udp_payload_len);
103 }
104 }
105
106 // Update metrics of queue: frame and Byte counters
107 ++m_num_frames_rxq[src_rx_q];
108 std::size_t data_len = q_bufs[i_b]->data_len;
109 m_num_bytes_rxq[src_rx_q] += data_len;
110 } else {
111 ++m_num_unhandled_non_jumbo_udp[lid];
112 }
113 }
114
115 // Bulk free of mbufs
116 rte_pktmbuf_free_bulk(q_bufs, nb_rx);
117
118 } // per burst
119 // -----------
120
121 // Full burst counter
122 if (nb_rx == m_burst_size) {
123 ++fb_count;
124 ++m_num_full_bursts[src_rx_q];
125 }
126 } // per queue
127
128 // If no full buffers in burst...
129 if (!fb_count) {
130 if (m_lcore_sleep_ns) {
131 // Sleep n nanoseconds... (value from config, timespec initialized in lcore first lines)
132 /*int response =*/ nanosleep(&sleep_request, nullptr);
133 }
134 }
135
136 } // main while(quit) loop
137
138 TLOG() << "LCore RX runner on CPU[" << lid << "] returned.";
139 return 0;
140}
141
142
143int
144IfaceWrapper::arp_response_runner(void *arg __rte_unused) {
145
146 // Timespec for opportunistic sleep. Nanoseconds configured in conf.
147 struct timespec sleep_request = { 0, (long)900000 };
148
149 //bool once = true; // One shot action variable.
150 uint16_t iface = m_iface_id;
151
152 const uint16_t lid = rte_lcore_id();
153 unsigned arp_rx_queue = m_arp_rx_queue;
154
155 TLOG() << "LCore ARP responder on CPU[" << lid << "]: Main loop starts for iface " << iface << " rx queue: " << arp_rx_queue;
156
157 // While loop of quit atomic member in IfaceWrapper
158 while(!this->m_lcore_quit_signal.load()) {
159
160 const uint16_t nb_rx = rte_eth_rx_burst(iface, arp_rx_queue, m_arp_bufs[arp_rx_queue], m_burst_size);
161
162 // We got packets from burst on this queue
163 if (nb_rx != 0) {
164 // Iterate on burst packets
165 for (int i_b=0; i_b<nb_rx; ++i_b) {
166
167 // Check packet type, ommit/drop unexpected ones.
168 auto pkt_type = m_arp_bufs[arp_rx_queue][i_b]->packet_type;
170 if (not RTE_ETH_IS_IPV4_HDR(pkt_type)) {
171 //TLOG_DEBUG(10) << "Non-Ethernet packet type: " << (unsigned)pkt_type << " original: " << pkt_type;
172 if (pkt_type == RTE_PTYPE_L2_ETHER_ARP) {
173 TLOG_DEBUG(10) << "Handling ARP request";
174 struct rte_ether_hdr* eth_hdr = rte_pktmbuf_mtod(m_arp_bufs[arp_rx_queue][i_b], struct rte_ether_hdr *);
175 struct rte_arp_hdr* arp_hdr = (struct rte_arp_hdr *)(eth_hdr + 1);
176
177 std::string srcaddr = dunedaq::dpdklibs::udp::get_ipv4_decimal_addr_str(dunedaq::dpdklibs::udp::ip_address_binary_to_dotdecimal(rte_be_to_cpu_32(arp_hdr->arp_data.arp_sip)));
178 TLOG_DEBUG(10) << "SRC IP: " << srcaddr;
179 std::string dstaddr = dunedaq::dpdklibs::udp::get_ipv4_decimal_addr_str(dunedaq::dpdklibs::udp::ip_address_binary_to_dotdecimal(rte_be_to_cpu_32(arp_hdr->arp_data.arp_tip)));
180 TLOG_DEBUG(10) << "DEST IP: " << dstaddr;
181
182 for( const auto& ip_addr_bin : m_ip_addr_bin) {
184 TLOG_DEBUG(10) << "LOCAL IP: " << localaddr;
185 }
186
187
188 if (std::find(m_ip_addr_bin.begin(), m_ip_addr_bin.end(), arp_hdr->arp_data.arp_tip) != m_ip_addr_bin.end()) {
189 arp::pktgen_process_arp(m_arp_bufs[arp_rx_queue][i_b], m_iface_id, arp_hdr->arp_data.arp_tip);
190 } else {
191 TLOG_DEBUG(10) << "I'm not the ARP target";
192 }
193 } else if (pkt_type == RTE_PTYPE_L2_ETHER_LLDP) {
194 //TLOG_DEBUG(10) << "TODO: Handle LLDP packet!";
195 } else {
196 //TLOG_DEBUG(10) << "Unidentified! Dumping...";
197 //rte_pktmbuf_dump(stdout, m_arp_bufs[arp_rx_queue][i_b], m_bufs[src_rx_q][i_b]->pkt_len);
198 }
199 continue;
200 }
201 }
202
203 // Bulk free of mbufs
204 rte_pktmbuf_free_bulk(m_arp_bufs[arp_rx_queue], nb_rx);
205
206 } // per burst
207
208 // If no full buffers in burst...
209 if (m_lcore_sleep_ns) {
210 // Sleep n nanoseconds... (value from config, timespec initialized in lcore first lines)
211 /*int response =*/ nanosleep(&sleep_request, nullptr);
212 }
213
214 } // main while(quit) loop
215
216 TLOG() << "LCore ARP responder on CPU[" << lid << "] returned.";
217 return 0;
218}
219
220} // namespace dpdklibs
221} // namespace dunedaq
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
void pktgen_process_arp(struct rte_mbuf *m, uint32_t pid, rte_be32_t binary_ip_address)
Definition ARP.cpp:78
char * get_udp_payload(const rte_mbuf *mbuf)
Definition Utils.cpp:70
std::string get_ipv4_decimal_addr_str(struct ipaddr ipv4_address)
Definition Utils.cpp:56
struct ipaddr ip_address_binary_to_dotdecimal(rte_le32_t binary_ipv4_address)
Definition Utils.cpp:48
std::uint16_t get_payload_size_udp_hdr(struct rte_udp_hdr *udp_hdr)
Definition Utils.cpp:29
The DUNE-DAQ namespace.