DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
IfaceWrapper.hxx
Go to the documentation of this file.
1#include <time.h>
2#include <rte_arp.h>
3
4namespace dunedaq {
5namespace dpdklibs {
6
7int
8IfaceWrapper::rx_runner(void *arg __rte_unused) {
9
10 // Timespec for opportunistic sleep. Nanoseconds configured in conf.
11 struct timespec sleep_request = { 0, (long)m_lcore_sleep_ns };
12
13 bool once = true; // One shot action variable.
14 uint16_t iface = m_iface_id;
15
16 const uint16_t lid = rte_lcore_id();
17 auto queues = m_rx_core_map[lid];
18
19 if (rte_eth_dev_socket_id(iface) >= 0 && rte_eth_dev_socket_id(iface) != (int)rte_socket_id()) {
20 TLOG() << "WARNING, iface " << iface << " is on remote NUMA node to polling thread! "
21 << "Performance will not be optimal.";
22 }
23
24 TLOG() << "LCore RX runner on CPU[" << lid << "]: Main loop starts for iface " << iface << " !";
25
26 std::map<int, int> nb_rx_map;
27 // While loop of quit atomic member in IfaceWrapper
28 while(!this->m_lcore_quit_signal.load()) {
29
30 // Loop over assigned queues to process
31 uint8_t fb_count(0);
32 for (const auto& q : queues) {
33 auto src_rx_q = q.first;
34 auto* q_bufs = m_bufs[src_rx_q];
35
36 // Get burst from queue
37 const uint16_t nb_rx = rte_eth_rx_burst(iface, src_rx_q, q_bufs, m_burst_size);
38 nb_rx_map[src_rx_q] = nb_rx;
39 }
40
41
42 for (const auto& q : queues) {
43
44 auto src_rx_q = q.first;
45 auto* q_bufs = m_bufs[src_rx_q];
46 const uint16_t nb_rx = nb_rx_map[src_rx_q];
47
48 // We got packets from burst on this queue
49 if (nb_rx != 0) [[likely]] {
50
51 m_max_burst_size[src_rx_q] = std::max(nb_rx, m_max_burst_size[src_rx_q].load());
52 // -------
53 // Iterate on burst packets
54 for (int i_b=0; i_b<nb_rx; ++i_b) {
55
56/*
57 // Check if packet is segmented. Implement support for it if needed.
58 if (q_bufs[i_b]->nb_segs > 1) [[unlikely]] {
59 //TLOG_DEBUG(10) << "It appears a packet is spread across more than one receiving buffer;"
60 // << " there's currently no logic in this program to handle this";
61 }
62
63 // Check packet type, ommit/drop unexpected ones.
64 auto pkt_type = q_bufs[i_b]->packet_type;
66 if (not RTE_ETH_IS_IPV4_HDR(pkt_type)) [[unlikely]] {
67 //TLOG_DEBUG(10) << "Non-Ethernet packet type: " << (unsigned)pkt_type << " original: " << pkt_type;
68 if (pkt_type == RTE_PTYPE_L2_ETHER_ARP) {
69 //TLOG() << "Unexpected: Should handle an ARP request from lcore=" << lid << " rx_q=" << src_rx_q << "! Flow should be steered to dedicated RX Queue.";
70 } else if (pkt_type == RTE_PTYPE_L2_ETHER_LLDP) {
71 //TLOG_DEBUG(10) << "TODO: Handle LLDP packet!";
72 } else {
73 //TLOG_DEBUG(10) << "Unidentified! Dumping...";
74 //rte_pktmbuf_dump(stdout, q_bufs[i_b], m_bufs[src_rx_q][i_b]->pkt_len);
75 }
76 continue;
77 }
78*/
79
80 // Check for UDP frames
81 //if (pkt_type == RTE_PTYPE_L4_UDP) { // RS FIXME: doesn't work. Why? What is the PKT_TYPE in our ETH frames?
82 // Check for JUMBO frames
83 if (q_bufs[i_b]->pkt_len > 7000) [[likely]] { // RS FIXME: do proper check on data length later
84 // Handle them!
85 std::size_t data_len = q_bufs[i_b]->data_len;
86
87 if ( m_lcore_enable_flow.load() ) [[likely]] {
88 char* message = udp::get_udp_payload(q_bufs[i_b]);
89 handle_eth_payload(src_rx_q, message, data_len);
90 }
91 ++m_num_frames_rxq[src_rx_q];
92 m_num_bytes_rxq[src_rx_q] += data_len;
93 }
94 }
95
96 // Bulk free of mbufs
97 rte_pktmbuf_free_bulk(q_bufs, nb_rx);
98
99 // -------
100
101 } // per burst
102
103 // Full burst counter
104 if (nb_rx == m_burst_size) {
105 ++fb_count;
106 ++m_num_full_bursts[src_rx_q];
107 }
108 } // per queue
109
110 // If no full buffers in burst...
111 if (!fb_count) {
112 if (m_lcore_sleep_ns) {
113 // Sleep n nanoseconds... (value from config, timespec initialized in lcore first lines)
114 /*int response =*/ nanosleep(&sleep_request, nullptr);
115 }
116 }
117
118 } // main while(quit) loop
119
120 TLOG() << "LCore RX runner on CPU[" << lid << "] returned.";
121 return 0;
122}
123
124
125int
126IfaceWrapper::arp_response_runner(void *arg __rte_unused) {
127
128 // Timespec for opportunistic sleep. Nanoseconds configured in conf.
129 struct timespec sleep_request = { 0, (long)900000 };
130
131 bool once = true; // One shot action variable.
132 uint16_t iface = m_iface_id;
133
134 const uint16_t lid = rte_lcore_id();
135 unsigned arp_rx_queue = m_arp_rx_queue;
136
137 TLOG() << "LCore ARP responder on CPU[" << lid << "]: Main loop starts for iface " << iface << " rx queue: " << arp_rx_queue;
138
139 // While loop of quit atomic member in IfaceWrapper
140 while(!this->m_lcore_quit_signal.load()) {
141
142 const uint16_t nb_rx = rte_eth_rx_burst(iface, arp_rx_queue, m_arp_bufs[arp_rx_queue], m_burst_size);
143
144 // We got packets from burst on this queue
145 if (nb_rx != 0) {
146 // Iterate on burst packets
147 for (int i_b=0; i_b<nb_rx; ++i_b) {
148
149 // Check packet type, ommit/drop unexpected ones.
150 auto pkt_type = m_arp_bufs[arp_rx_queue][i_b]->packet_type;
152 if (not RTE_ETH_IS_IPV4_HDR(pkt_type)) {
153 //TLOG_DEBUG(10) << "Non-Ethernet packet type: " << (unsigned)pkt_type << " original: " << pkt_type;
154 if (pkt_type == RTE_PTYPE_L2_ETHER_ARP) {
155 TLOG_DEBUG(10) << "Handling ARP request";
156 struct rte_ether_hdr* eth_hdr = rte_pktmbuf_mtod(m_arp_bufs[arp_rx_queue][i_b], struct rte_ether_hdr *);
157 struct rte_arp_hdr* arp_hdr = (struct rte_arp_hdr *)(eth_hdr + 1);
158
159 std::string srcaddr = dunedaq::dpdklibs::udp::get_ipv4_decimal_addr_str(dunedaq::dpdklibs::udp::ip_address_binary_to_dotdecimal(rte_be_to_cpu_32(arp_hdr->arp_data.arp_sip)));
160 TLOG_DEBUG(10) << "SRC IP: " << srcaddr;
161 std::string dstaddr = dunedaq::dpdklibs::udp::get_ipv4_decimal_addr_str(dunedaq::dpdklibs::udp::ip_address_binary_to_dotdecimal(rte_be_to_cpu_32(arp_hdr->arp_data.arp_tip)));
162 TLOG_DEBUG(10) << "DEST IP: " << dstaddr;
163
164 for( const auto& ip_addr_bin : m_ip_addr_bin) {
166 TLOG_DEBUG(10) << "LOCAL IP: " << localaddr;
167 }
168
169
170 if (std::find(m_ip_addr_bin.begin(), m_ip_addr_bin.end(), arp_hdr->arp_data.arp_tip) != m_ip_addr_bin.end()) {
171 arp::pktgen_process_arp(m_arp_bufs[arp_rx_queue][i_b], 0, arp_hdr->arp_data.arp_tip);
172 } else {
173 TLOG_DEBUG(10) << "I'm not the ARP target";
174 }
175 } else if (pkt_type == RTE_PTYPE_L2_ETHER_LLDP) {
176 //TLOG_DEBUG(10) << "TODO: Handle LLDP packet!";
177 } else {
178 //TLOG_DEBUG(10) << "Unidentified! Dumping...";
179 //rte_pktmbuf_dump(stdout, m_arp_bufs[arp_rx_queue][i_b], m_bufs[src_rx_q][i_b]->pkt_len);
180 }
181 continue;
182 }
183 }
184
185 // Bulk free of mbufs
186 rte_pktmbuf_free_bulk(m_arp_bufs[arp_rx_queue], nb_rx);
187
188 } // per burst
189
190 // If no full buffers in burst...
191 if (m_lcore_sleep_ns) {
192 // Sleep n nanoseconds... (value from config, timespec initialized in lcore first lines)
193 /*int response =*/ nanosleep(&sleep_request, nullptr);
194 }
195
196 } // main while(quit) loop
197
198 TLOG() << "LCore ARP responder on CPU[" << lid << "] returned.";
199 return 0;
200}
201
202} // namespace dpdklibs
203} // namespace dunedaq
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
void pktgen_process_arp(struct rte_mbuf *m, uint32_t pid, rte_be32_t binary_ip_address)
Definition ARP.cpp:77
char * get_udp_payload(const rte_mbuf *mbuf)
Definition Utils.cpp:70
std::string get_ipv4_decimal_addr_str(struct ipaddr ipv4_address)
Definition Utils.cpp:56
struct ipaddr ip_address_binary_to_dotdecimal(rte_le32_t binary_ipv4_address)
Definition Utils.cpp:48
Including Qt Headers.