9#ifndef DPDKLIBS_SRC_IFACEWRAPPER_HPP_
10#define DPDKLIBS_SRC_IFACEWRAPPER_HPP_
28#include <nlohmann/json.hpp>
38#include <folly/ProducerConsumerQueue.h>
44 "Field " << field <<
" was not reported",
50 "Unexpected stream ID " << src_id <<
" in UDP payoad. Total counter: " << counter,
51 ((
int)src_id)((
size_t)counter)
59 using sid_to_source_map_t = std::map<int, std::shared_ptr<SourceConcept>>;
62 const std::vector<const appmodel::NWDetDataSender*>& senders,
63 sid_to_source_map_t& sources, std::atomic<bool>&
run_marker);
66 IfaceWrapper(
const IfaceWrapper&) =
delete;
67 IfaceWrapper& operator=(
const IfaceWrapper&) =
delete;
68 IfaceWrapper(IfaceWrapper&&) =
delete;
69 IfaceWrapper& operator=(IfaceWrapper&&) =
delete;
75 void generate_opmon_data()
override;
77 void allocate_mbufs();
78 void setup_interface();
79 void setup_flow_steering();
82 void enable_flow() { m_lcore_enable_flow.store(
true);}
83 void disable_flow() { m_lcore_enable_flow.store(
false);}
85 const std::vector<uint16_t>& get_rte_cores()
const {
return m_rte_cores; }
90 std::string m_iface_id_str;
95 std::vector<std::string> m_ip_addr;
96 std::vector<rte_be32_t> m_ip_addr_bin;
97 std::string m_mac_addr;
100 unsigned m_max_block_words;
101 uint16_t m_rx_ring_size;
102 uint16_t m_tx_ring_size;
105 uint32_t m_lcore_sleep_ns;
106 int m_mbuf_cache_size;
109 int m_num_ip_sources;
111 std::set<std::string> m_ips;
112 std::set<int> m_rx_qs;
113 std::set<int> m_tx_qs;
114 std::vector<uint16_t> m_rte_cores;
117 std::map<int, std::map<int, std::string>> m_rx_core_map;
118 unsigned m_arp_rx_queue = 0;
121 std::atomic<bool> m_lcore_quit_signal{
false };
123 std::atomic<bool> m_lcore_enable_flow{
false };
126 std::map<int, std::unique_ptr<rte_mempool>> m_mbuf_pools;
127 std::map<int, struct rte_mbuf **> m_bufs;
130 std::map<int, std::atomic<std::size_t>> m_num_frames_rxq;
131 std::map<int, std::atomic<std::size_t>> m_num_bytes_rxq;
132 std::map<int, std::atomic<std::size_t>> m_num_full_bursts;
133 std::map<int, std::atomic<uint16_t>> m_max_burst_size;
136 std::map<int, std::atomic<std::size_t>> m_num_unhandled_non_ipv4;
137 std::map<int, std::atomic<std::size_t>> m_num_unhandled_non_udp;
138 std::map<int, std::atomic<std::size_t>> m_num_unhandled_non_jumbo_udp;
141 std::map<int, std::atomic<std::size_t>> m_num_unexid_frames;
149 std::map<int, std::map<uint, uint>> m_stream_id_to_source_id;
150 sid_to_source_map_t& m_sources;
151 bool m_strict_parsing {
true};
154 std::atomic<bool>& m_run_marker;
157 std::unique_ptr<rte_mempool> m_garp_mbuf_pool;
158 std::map<int, struct rte_mbuf **> m_garp_bufs;
159 std::thread m_garp_thread;
161 std::atomic<uint64_t> m_garps_sent{0};
164 std::unique_ptr<rte_mempool> m_arp_mbuf_pool;
165 std::map<int, struct rte_mbuf **> m_arp_bufs;
166 std::thread m_arp_thread;
168 std::atomic<uint64_t> m_arps_sent{0};
171 int rx_runner(
void *arg __rte_unused);
172 int arp_response_runner(
void *arg __rte_unused);
175 void parse_udp_payload(
int src_rx_q,
char* payload, std::size_t
size);
178 void passthrough_udp_payload(
int src_rx_q,
char* payload, std::size_t
size);
#define ERS_DECLARE_ISSUE(namespace_name, class_name, message, attributes)
std::atomic< bool > run_marker
Global atomic for process lifetime.
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size