DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
IfaceWrapper.cpp
Go to the documentation of this file.
1
8#include "logging/Logging.hpp"
10
11#include "opmonlib/Utils.hpp"
12
13#include "dpdklibs/Issues.hpp"
14
16
17#include "dpdklibs/EALSetup.hpp"
21#include "dpdklibs/arp/ARP.hpp"
23#include "IfaceWrapper.hpp"
24
26// #include "confmodel/DROStreamConf.hpp"
27// #include "confmodel/StreamParameters.hpp"
28#include "confmodel/GeoId.hpp"
32// #include "confmodel/NetworkDevice.hpp"
33// #include "appmodel/NICInterfaceConfiguration.hpp"
34// #include "appmodel/NICStatsConf.hpp"
35// #include "appmodel/EthStreamParameters.hpp"
36
38
39#include <chrono>
40#include <memory>
41#include <string>
42#include <regex>
43#include <stdexcept>
44
// Numeric levels declared for this source file; presumably TRACE verbosity
// levels for TLOG_DEBUG — TODO confirm.
// NOTE(review): TLVL_ENTER_EXIT_METHODS is used by the destructor below but is
// not among the enumerators visible here — confirm where it is defined.
48enum
49{
51 TLVL_WORK_STEPS = 10,
52 TLVL_BOOKKEEPING = 15
53};
54
55namespace dunedaq {
56namespace dpdklibs {
57
58
59//-----------------------------------------------------------------------------
60IfaceWrapper::IfaceWrapper(
61 uint iface_id,
62 const appmodel::DPDKReceiver* receiver,
63 const std::vector<const appmodel::NWDetDataSender*>& nw_senders,
64 sid_to_source_map_t& sources,
65 std::atomic<bool>& run_marker
66 )
67 : m_sources(sources)
68 , m_run_marker(run_marker)
69{
70 auto net_device = receiver->get_uses();
71
72 m_iface_id = iface_id;
73 m_mac_addr = net_device->get_mac_address();
74 m_ip_addr = net_device->get_ip_address();
75
76 for( const std::string& ip_addr : m_ip_addr) {
77 IpAddr ip_addr_struct(ip_addr);
78 m_ip_addr_bin.push_back(udp::ip_address_dotdecimal_to_binary(
79 ip_addr_struct.addr_bytes[0],
80 ip_addr_struct.addr_bytes[1],
81 ip_addr_struct.addr_bytes[2],
82 ip_addr_struct.addr_bytes[3]
83 ));
84 }
85
86
87 auto iface_cfg = receiver->get_configuration();
88
89 m_with_flow = iface_cfg->get_flow_control();
90 m_prom_mode = iface_cfg->get_promiscuous_mode();;
91 m_mtu = iface_cfg->get_mtu();
92 m_rx_ring_size = iface_cfg->get_rx_ring_size();
93 m_tx_ring_size = iface_cfg->get_tx_ring_size();
94 m_num_mbufs = iface_cfg->get_num_bufs();
95 m_burst_size = iface_cfg->get_burst_size();
96 m_mbuf_cache_size = iface_cfg->get_mbuf_cache_size();
97
98 m_lcore_sleep_ns = iface_cfg->get_lcore_sleep_us() * 1000;
99 m_socket_id = rte_eth_dev_socket_id(m_iface_id);
100
101 m_iface_id_str = iface_cfg->UID();
102
103
104 // Here is my list of cores
105 for( const auto* proc_res : iface_cfg->get_used_lcores()) {
106 m_rte_cores.insert(m_rte_cores.end(), proc_res->get_cpu_cores().begin(), proc_res->get_cpu_cores().end());
107 }
108 if(std::find(m_rte_cores.begin(), m_rte_cores.end(), rte_get_main_lcore())!=m_rte_cores.end()) {
109 TLOG() << "ERROR! Throw ERS error here that LCore=0 should not be used, as it's a control RTE core!";
110 throw std::runtime_error(std::string("ERROR! Throw ERS here that LCore=0 should not be used, as it's a control RTE core!"));
111 }
112
113 // iterate through active streams
114
115 // Create a map of sender ni (ip) to streams
116 std::map<std::string, std::map<uint, uint>> ip_to_stream_src_groups;
117
118 for( auto nw_sender : nw_senders ) {
119 auto sender_ni = nw_sender->get_uses();
120
121 std::string tx_ip = sender_ni->get_ip_address().at(0);
122
123 for ( auto det_stream : nw_sender->get_streams() ) {
124
125 uint32_t tx_geo_stream_id = det_stream->get_geo_id()->get_stream_id();
126 ip_to_stream_src_groups[tx_ip][tx_geo_stream_id] = det_stream->get_source_id();
127
128 }
129
130 }
131
132// RS FIXME: Is this RX_Q bump is enough??? I don't remember how the RX_Qs are assigned...
133 uint32_t core_idx(0), rx_q(0); // RS FIXME: Ensure that no RX_Q=0 is used for UDP RX, ever.
134
135 m_rx_qs.insert(rx_q);
136 m_arp_rx_queue = rx_q;
137 ++rx_q;
138
139 for( const auto& [tx_ip, strm_src] : ip_to_stream_src_groups) {
140 m_ips.insert(tx_ip);
141 m_rx_qs.insert(rx_q);
142 m_num_frames_rxq[rx_q] = { 0 };
143 m_num_bytes_rxq[rx_q] = { 0 };
144
145 m_rx_core_map[m_rte_cores[core_idx]][rx_q] = tx_ip;
146 m_stream_id_to_source_id[rx_q] = strm_src;
147
148 ++rx_q;
149 if ( ++core_idx == m_rte_cores.size()) {
150 core_idx = 0;
151 }
152 }
153
154 // Log mapping
155 for (auto const& [lcore, rx_qs] : m_rx_core_map) {
156 TLOG() << "Lcore=" << lcore << " handles: ";
157 for (auto const& [rx_q, src_ip] : rx_qs) {
158 TLOG() << " rx_q=" << rx_q << " src_ip=" << src_ip;
159 }
160 }
161
162 // Adding single TX queue for ARP responses
163 TLOG() << "Append TX_Q=0 for ARP responses.";
164 m_tx_qs.insert(0);
165
166}
167
168
169//-----------------------------------------------------------------------------
170IfaceWrapper::~IfaceWrapper()
171{
172 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "IfaceWrapper destructor called. First stop check, then closing iface.";
173
174 struct rte_flow_error error;
175 rte_flow_flush(m_iface_id, &error);
176 //graceful_stop();
177 //close_iface();
178 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "IfaceWrapper destroyed.";
179}
180
181
182//-----------------------------------------------------------------------------
183void
184IfaceWrapper::allocate_mbufs()
185{
186 TLOG() << "Allocating pools and mbufs for UDP, GARP, and ARP.";
187
188 // Pools for UDP RX messages
189 for (size_t i=0; i<m_rx_qs.size(); ++i) {
190 std::stringstream bufss;
191 bufss << "MBP-" << m_iface_id << '-' << i;
192 TLOG() << "Acquire pool with name=" << bufss.str() << " for iface_id=" << m_iface_id << " rxq=" << i;
193 m_mbuf_pools[i] = ealutils::get_mempool(bufss.str(), m_num_mbufs, m_mbuf_cache_size, 16384, m_socket_id);
194 m_bufs[i] = (rte_mbuf**) malloc(sizeof(struct rte_mbuf*) * m_burst_size);
195 // No need to alloc?
196 // rte_pktmbuf_alloc_bulk(m_mbuf_pools[i].get(), m_bufs[i], m_burst_size);
197 }
198
199 // Pools for GARP messages
200 std::stringstream garpss;
201 garpss << "GARPMBP-" << m_iface_id;
202 TLOG() << "Acquire GARP pool with name=" << garpss.str() << " for iface_id=" << m_iface_id;
203 m_garp_mbuf_pool = ealutils::get_mempool(garpss.str());
204 m_garp_bufs[0] = (rte_mbuf**) malloc(sizeof(struct rte_mbuf*) * m_burst_size);
205 rte_pktmbuf_alloc_bulk(m_garp_mbuf_pool.get(), m_garp_bufs[0], m_burst_size);
206
207 // Pools for ARP request/responses
208 std::stringstream arpss;
209 arpss << "ARPMBP-" << m_iface_id;
210 TLOG() << "Acquire ARP pool with name=" << arpss.str() << " for iface_id=" << m_iface_id;
211 m_arp_mbuf_pool = ealutils::get_mempool(arpss.str());
212 m_arp_bufs[0] = (rte_mbuf**) malloc(sizeof(struct rte_mbuf*) * m_burst_size);
213 rte_pktmbuf_alloc_bulk(m_arp_mbuf_pool.get(), m_arp_bufs[0], m_burst_size);
214
215}
216
217
218//-----------------------------------------------------------------------------
219void
220IfaceWrapper::setup_interface()
221{
222 TLOG() << "Initialize interface " << m_iface_id;
223 bool with_reset = true, with_mq_mode = true; // go to config
224 bool check_link_status = false;
225
226 int retval = ealutils::iface_init(m_iface_id, m_rx_qs.size(), m_tx_qs.size(), m_rx_ring_size, m_tx_ring_size, m_mbuf_pools, with_reset, with_mq_mode, check_link_status);
227 if (retval != 0 ) {
228 throw FailedToSetupInterface(ERS_HERE, m_iface_id, retval);
229 }
230 // Promiscuous mode
231 ealutils::iface_promiscuous_mode(m_iface_id, m_prom_mode); // should come from config
232}
233
234
235//-----------------------------------------------------------------------------
236void
237IfaceWrapper::setup_flow_steering()
238{
239 // Flow steering setup
240 TLOG() << "Configuring Flow steering rules for iface=" << m_iface_id;
241 struct rte_flow_error error;
242 struct rte_flow *flow;
243 TLOG() << "Attempt to flush previous flow rules...";
244 rte_flow_flush(m_iface_id, &error);
245#warning RS: FIXME -> Check for flow flush return!
246
247 TLOG() << "Create control flow rules (ARP) assinged to rxq=" << m_arp_rx_queue;
248 flow = generate_arp_flow(m_iface_id, m_arp_rx_queue, &error);
249 if (not flow) { // ers::fatal
250 TLOG() << "ARP flow can't be created for " << m_arp_rx_queue
251 << " Error type: " << (unsigned)error.type
252 << " Message: '" << error.message << "'";
253 ers::fatal(dunedaq::datahandlinglibs::InitializationError(
254 ERS_HERE, "Couldn't create ARP flow API rules!"));
255 rte_exit(EXIT_FAILURE, "error in creating ARP flow");
256 }
257
258 TLOG() << "Create flow rules for UDP RX.";
259 for (auto const& [lcoreid, rxqs] : m_rx_core_map) {
260 for (auto const& [rxqid, srcip] : rxqs) {
261 // Put the IP numbers temporarily in a vector, so they can be converted easily to uint32_t
262 TLOG() << "Creating flow rule for src_ip=" << srcip << " assigned to rxq=" << rxqid;
263 size_t ind = 0, current_ind = 0;
264 std::vector<uint8_t> v;
265 for (int i = 0; i < 4; ++i) {
266 v.push_back(std::stoi(srcip.substr(current_ind, srcip.size() - current_ind), &ind));
267 current_ind += ind + 1;
268 }
269
270 flow = generate_ipv4_flow(m_iface_id, rxqid,
271 RTE_IPV4(v[0], v[1], v[2], v[3]), 0xffffffff, 0, 0, &error);
272
273 if (not flow) { // ers::fatal
274 TLOG() << "Flow can't be created for " << rxqid
275 << " Error type: " << (unsigned)error.type
276 << " Message: '" << error.message << "'";
277 ers::fatal(dunedaq::datahandlinglibs::InitializationError(
278 ERS_HERE, "Couldn't create Flow API rules!"));
279 rte_exit(EXIT_FAILURE, "error in creating flow");
280 }
281 }
282 }
283
284 return;
285}
286
287//-----------------------------------------------------------------------------
288void
289IfaceWrapper::setup_xstats()
290{
291 // Stats setup
292 m_iface_xstats.setup(m_iface_id);
293 m_iface_xstats.reset_counters();
294}
295
296
297//-----------------------------------------------------------------------------
298void
299IfaceWrapper::start()
300{
301 // Reset counters for RX queues
302 for (auto const& [rx_q, _] : m_num_frames_rxq ) {
303 m_num_frames_rxq[rx_q] = { 0 };
304 m_num_bytes_rxq[rx_q] = { 0 };
305 m_num_full_bursts[rx_q] = { 0 };
306 m_max_burst_size[rx_q] = { 0 };
307 }
308
309 // Reset counters for rte_workers
310 for (auto const& [lcore, _] : m_rx_core_map) {
311 m_num_unhandled_non_ipv4[lcore] = { 0 };
312 m_num_unhandled_non_udp[lcore] = { 0 };
313 m_num_unhandled_non_jumbo_udp[lcore] = { 0 };
314 }
315
316 m_lcore_enable_flow.store(false);
317 m_lcore_quit_signal.store(false);
318 TLOG() << "Interface id=" << m_iface_id <<" Launching GARP thread with garp_func...";
319 m_garp_thread = std::thread(&IfaceWrapper::garp_func, this);
320
321 TLOG() << "Interface id=" << m_iface_id << " starting ARP LCore processor:";
322 m_arp_thread = std::thread(&IfaceWrapper::IfaceWrapper::arp_response_runner, this, nullptr);
323
324
325 TLOG() << "Interface id=" << m_iface_id << " starting LCore processors:";
326 for (auto const& [lcoreid, _] : m_rx_core_map) {
327 int ret = rte_eal_remote_launch((int (*)(void*))(&IfaceWrapper::rx_runner), this, lcoreid);
328 TLOG() << " -> LCore[" << lcoreid << "] launched with return code=" << ret << " " << (ret < 0 ? rte_strerror(-ret) : "");
329 }
330}
331
332//-----------------------------------------------------------------------------
333void
334IfaceWrapper::stop()
335{
336 m_lcore_enable_flow.store(false);
337 m_lcore_quit_signal.store(true);
338 // Stop GARP sender thread
339 if (m_garp_thread.joinable()) {
340 m_garp_thread.join();
341 } else {
342 TLOG() << "GARP thread is not joinable!";
343 }
344
345 if (m_arp_thread.joinable()) {
346 m_arp_thread.join();
347 } else {
348 TLOG() << "ARP thread is not joinable!";
349 }
350}
351/*
352void
353IfaceWrapper::scrap()
354{
355 struct rte_flow_error error;
356 rte_flow_flush(m_iface_id, &error);
357}
358*/
359
360
361//-----------------------------------------------------------------------------
362void
363IfaceWrapper::generate_opmon_data() {
364
365 // Poll stats from HW
366 m_iface_xstats.poll();
367
368 opmon::EthStats s;
369 s.set_ipackets( m_iface_xstats.m_eth_stats.ipackets );
370 s.set_opackets( m_iface_xstats.m_eth_stats.opackets );
371 s.set_ibytes( m_iface_xstats.m_eth_stats.ibytes );
372 s.set_obytes( m_iface_xstats.m_eth_stats.obytes );
373 s.set_imissed( m_iface_xstats.m_eth_stats.imissed );
374 s.set_ierrors( m_iface_xstats.m_eth_stats.ierrors );
375 s.set_oerrors( m_iface_xstats.m_eth_stats.oerrors );
376 s.set_rx_nombuf( m_iface_xstats.m_eth_stats.rx_nombuf );
377 publish( std::move(s) );
378
379 if(m_iface_xstats.m_eth_stats.imissed > 0){
380 ers::warning(PacketErrors(ERS_HERE, m_iface_id_str, "missed", m_iface_xstats.m_eth_stats.imissed));
381 }
382 if(m_iface_xstats.m_eth_stats.ierrors > 0){
383 ers::warning(PacketErrors(ERS_HERE, m_iface_id_str, "dropped", m_iface_xstats.m_eth_stats.ierrors));
384 }
385
386 // loop over all the xstats information
387 opmon::EthXStatsInfo xinfos;
388 opmon::EthXStatsErrors xerrs;
389 std::map<std::string, opmon::QueueEthXStats> xq;
390
391 for (int i = 0; i < m_iface_xstats.m_len; ++i) {
392
393 std::string name(m_iface_xstats.m_xstats_names[i].name);
394
395 // first we select the info from the queue
396 static std::regex queue_regex(R"((rx|tx)_q(\d+)_([^_]+))");
397 std::smatch match;
398
399 if ( std::regex_match(name, match, queue_regex) ) {
400 auto queue_name = match[1].str() + '-' + match[2].str();
401 auto & entry = xq[queue_name];
402 try {
403 opmonlib::set_value( entry, match[3], m_iface_xstats.m_xstats_values[i] );
404 } catch ( const ers::Issue & e ) {
405 ers::warning( MetricPublishFailed( ERS_HERE, name, e) );
406 }
407 continue;
408 }
409
410 google::protobuf::Message * metric_p = nullptr;
411 static std::regex err_regex(R"(.+error.*)");
412 if ( std::regex_match( name, err_regex ) ) metric_p = & xerrs;
413 else metric_p = & xinfos;
414
415 try {
416 opmonlib::set_value(*metric_p, name, m_iface_xstats.m_xstats_values[i]);
417 } catch ( const ers::Issue & e ) {
418 ers::warning( MetricPublishFailed( ERS_HERE, name, e) );
419 }
420
421 } // loop over xstats
422
423 // Reset HW counters
424 m_iface_xstats.reset_counters();
425
426 // finally we publish the information
427 publish( std::move(xinfos) );
428 publish( std::move(xerrs) );
429 for ( auto [id, stat] : xq ) {
430 publish( std::move(stat), {{"queue", id}} );
431 }
432
433 for( const auto& [src_rx_q,_] : m_num_frames_rxq) {
434 opmon::QueueInfo i;
435 i.set_packets_received( m_num_frames_rxq[src_rx_q].load() );
436 i.set_bytes_received( m_num_bytes_rxq[src_rx_q].load() );
437 i.set_full_rx_burst( m_num_full_bursts[src_rx_q].load() );
438 i.set_max_burst_size( m_max_burst_size[src_rx_q].exchange(0) );
439
440 publish( std::move(i), {{"queue", std::to_string(src_rx_q)}} );
441 }
442}
443
444//-----------------------------------------------------------------------------
445void
446IfaceWrapper::garp_func()
447{
448 TLOG() << "Launching GARP sender...";
449 while(m_run_marker.load()) {
450 for( const auto& ip_addr_bin : m_ip_addr_bin ) {
451 arp::pktgen_send_garp(m_garp_bufs[0][0], m_iface_id, ip_addr_bin);
452 }
453 ++m_garps_sent;
454 std::this_thread::sleep_for(std::chrono::seconds(1));
455 }
456 TLOG() << "GARP function joins.";
457}
458
459//-----------------------------------------------------------------------------
460void
461IfaceWrapper::handle_eth_payload(int src_rx_q, char* payload, std::size_t size)
462{
463 // Get DAQ Header and its StreamID
464 auto* daq_header = reinterpret_cast<dunedaq::detdataformats::DAQEthHeader*>(payload);
465 auto src_id = m_stream_id_to_source_id[src_rx_q][(unsigned)daq_header->stream_id];
466
467 if ( auto src_it = m_sources.find(src_id); src_it != m_sources.end()) {
468 src_it->second->handle_payload(payload, size);
469 } else {
470 // Really bad -> unexpeced StreamID in UDP Payload.
471 // This check is needed in order to avoid dynamically add thousands
472 // of Sources on the fly, in case the data corruption is extremely severe.
473 if (m_num_unexid_frames.count(src_id) == 0) {
474 m_num_unexid_frames[src_id] = 0;
475 }
476 m_num_unexid_frames[src_id]++;
477 }
478}
479
480} // namespace dpdklibs
481} // namespace dunedaq
482
483//
@ TLVL_ENTER_EXIT_METHODS
#define ERS_HERE
Base class for any user define issue.
Definition Issue.hpp:69
std::atomic< bool > run_marker
Global atomic for process lifetime.
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
dunedaq::conffwk::relationship_t match(T const &, T const &)
void pktgen_send_garp(struct rte_mbuf *m, uint32_t port_id, rte_be32_t binary_ip_address)
Definition ARP.cpp:23
std::unique_ptr< rte_mempool > get_mempool(const std::string &pool_name, int num_mbufs=NUM_MBUFS, int mbuf_cache_size=MBUF_CACHE_SIZE, int data_room_size=9800, int socket_id=0)
Definition EALSetup.cpp:260
int iface_promiscuous_mode(std::uint16_t iface, bool mode=false)
Definition EALSetup.cpp:74
int iface_init(uint16_t iface, uint16_t rx_rings, uint16_t tx_rings, uint16_t rx_ring_size, uint16_t tx_ring_size, std::map< int, std::unique_ptr< rte_mempool > > &mbuf_pool, bool with_reset=false, bool with_mq_rss=false, bool check_link_status=false)
Definition EALSetup.cpp:95
rte_be32_t ip_address_dotdecimal_to_binary(std::uint8_t byte1, std::uint8_t byte2, std::uint8_t byte3, std::uint8_t byte4)
Definition Utils.cpp:41
struct rte_flow * generate_ipv4_flow(uint16_t port_id, uint16_t rx_q, uint32_t src_ip, uint32_t src_mask, uint32_t dest_ip, uint32_t dest_mask, struct rte_flow_error *error)
struct rte_flow * generate_arp_flow(uint16_t port_id, uint16_t rx_q, struct rte_flow_error *error)
void set_value(google::protobuf::Message &m, const std::string &name, T value)
Definition Utils.hxx:17
The DUNE-DAQ namespace.
Definition DataStore.hpp:57
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size
default char v[0]
void warning(const Issue &issue)
Definition ers.hpp:115
void fatal(const Issue &issue)
Definition ers.hpp:88
DAQEthHeader is a versioned and unified structure for every FE electronics.
#define TLVL_ENTER_EXIT_METHODS
Definition Issues.hpp:24
Factory couldn t std::string alg_name Invalid configuration error
Definition Issues.hpp:34