9#ifndef DPDKLIBS_SRC_IFACEWRAPPER_HPP_
10#define DPDKLIBS_SRC_IFACEWRAPPER_HPP_
28#include <nlohmann/json.hpp>
38#include <folly/ProducerConsumerQueue.h>
44 "Field " << field <<
" was not reported",
50 "Unexpected stream ID " << src_id <<
" in UDP payoad. Total counter: " << counter,
51 ((
int)src_id)((
size_t)counter)
// Alias for an ordered map from an int key to a shared_ptr<SourceConcept>.
// NOTE(review): "sid" presumably stands for source id -- confirm against the users of this map.
59 using sid_to_source_map_t = std::map<int, std::shared_ptr<SourceConcept>>;
62 const std::vector<const appmodel::NWDetDataSender*>& senders,
63 const std::vector<const confmodel::DetectorStream*>& active_streams,
64 sid_to_source_map_t& sources, std::atomic<bool>&
run_marker);
// IfaceWrapper is neither copyable nor movable: the copy constructor,
// copy assignment, move constructor and move assignment are all deleted.
67 IfaceWrapper(
const IfaceWrapper&) =
delete;
68 IfaceWrapper& operator=(
const IfaceWrapper&) =
delete;
69 IfaceWrapper(IfaceWrapper&&) =
delete;
70 IfaceWrapper& operator=(IfaceWrapper&&) =
delete;
// Overrides a virtual function of a base class that is not visible in this
// excerpt. Takes no arguments and returns nothing; defined out of line.
76 void generate_opmon_data()
override;
// Setup entry points, declared here and defined out of line.
// NOTE(review): by their names these allocate the mbuf storage, configure the
// interface and install flow-steering rules -- confirm the required call order
// against the implementation.
78 void allocate_mbufs();
79 void setup_interface();
80 void setup_flow_steering();
83 void enable_flow() { m_lcore_enable_flow.store(
true);}
84 void disable_flow() { m_lcore_enable_flow.store(
false);}
86 const std::vector<uint16_t>& get_rte_cores()
const {
return m_rte_cores; }
// Interface identity and addressing. IP addresses are kept both as strings
// and as rte_be32_t values.
// NOTE(review): presumably the two vectors are index-aligned -- confirm where they are filled.
91 std::string m_iface_id_str;
96 std::vector<std::string> m_ip_addr;
97 std::vector<rte_be32_t> m_ip_addr_bin;
98 std::string m_mac_addr;
// Sizing / tuning parameters. None of these has a default member initializer,
// so they must be assigned before first use (the constructor is not visible here).
101 unsigned m_max_block_words;
102 uint16_t m_rx_ring_size;
103 uint16_t m_tx_ring_size;
106 uint32_t m_lcore_sleep_ns;
107 int m_mbuf_cache_size;
110 int m_num_ip_sources;
// Sets of IP strings and of int queue ids, plus the core list exposed by get_rte_cores().
112 std::set<std::string> m_ips;
113 std::set<int> m_rx_qs;
114 std::set<int> m_tx_qs;
115 std::vector<uint16_t> m_rte_cores;
// Two-level map int -> int -> string.
// NOTE(review): presumably lcore -> rx queue -> source IP; verify against setup code.
118 std::map<int, std::map<int, std::string>> m_rx_core_map;
// Defaults to 0.
119 unsigned m_arp_rx_queue = 0;
// Atomic flag, initially false.
// NOTE(review): presumably polled by the lcore loops to terminate -- confirm in the runners.
122 std::atomic<bool> m_lcore_quit_signal{
false };
// Atomic flag, initially false; set by enable_flow() and cleared by disable_flow().
124 std::atomic<bool> m_lcore_enable_flow{
false };
// Mempools keyed by int, owned through unique_ptr with the default deleter.
// NOTE(review): the default deleter calls `delete`; DPDK mempools are normally
// released with rte_mempool_free() -- confirm how these pointers are created and
// whether a custom deleter is needed.
127 std::map<int, std::unique_ptr<rte_mempool>> m_mbuf_pools;
// Raw rte_mbuf* arrays keyed by int; ownership is not expressed in the type.
128 std::map<int, struct rte_mbuf **> m_bufs;
// Atomic counters stored in maps keyed by int.
// NOTE(review): the key is presumably the RX queue id (suffix "rxq") -- confirm.
// std::atomic is neither copyable nor movable, so entries have to be created
// in place (e.g. via operator[] or try_emplace).
131 std::map<int, std::atomic<std::size_t>> m_num_frames_rxq;
132 std::map<int, std::atomic<std::size_t>> m_num_bytes_rxq;
133 std::map<int, std::atomic<std::size_t>> m_num_full_bursts;
134 std::map<int, std::atomic<uint16_t>> m_max_burst_size;
// Counters for packets that were not handled, split by reason as named.
137 std::map<int, std::atomic<std::size_t>> m_num_unhandled_non_ipv4;
138 std::map<int, std::atomic<std::size_t>> m_num_unhandled_non_udp;
139 std::map<int, std::atomic<std::size_t>> m_num_unhandled_non_jumbo_udp;
// NOTE(review): "unexid" presumably means unexpected stream id -- confirm.
142 std::map<int, std::atomic<std::size_t>> m_num_unexid_frames;
// Two-level map int -> (uint -> uint).
// NOTE(review): presumably rx queue -> stream id -> source id; verify.
150 std::map<int, std::map<uint, uint>> m_stream_id_to_source_id;
// Reference to an externally owned source map; it must outlive this object.
151 sid_to_source_map_t& m_sources;
// Defaults to true.
152 bool m_strict_parsing {
true};
// Reference to an externally owned atomic flag; it must outlive this object.
155 std::atomic<bool>& m_run_marker;
// GARP state: mempool, per-key mbuf arrays, worker thread and a sent counter starting at 0.
// NOTE(review): unique_ptr's default deleter calls `delete`; DPDK mempools are
// normally released with rte_mempool_free() -- confirm.
// NOTE(review): std::thread terminates the process if destroyed while joinable;
// confirm the thread is joined before destruction.
158 std::unique_ptr<rte_mempool> m_garp_mbuf_pool;
159 std::map<int, struct rte_mbuf **> m_garp_bufs;
160 std::thread m_garp_thread;
162 std::atomic<uint64_t> m_garps_sent{0};
// ARP state: same layout as the GARP members above, and the same review notes apply.
165 std::unique_ptr<rte_mempool> m_arp_mbuf_pool;
166 std::map<int, struct rte_mbuf **> m_arp_bufs;
167 std::thread m_arp_thread;
169 std::atomic<uint64_t> m_arps_sent{0};
// Runner functions returning int and taking an unused void* argument
// (marked __rte_unused).
// NOTE(review): the signature matches DPDK's lcore function type, so these are
// presumably launched on lcores -- confirm at the launch site.
172 int rx_runner(
void *arg __rte_unused);
173 int arp_response_runner(
void *arg __rte_unused);
// Handles one UDP payload.
//   src_rx_q - int identifying the originating RX queue (per the parameter name)
//   payload  - pointer to the payload bytes (mutable char*)
//   size     - payload length in bytes
// Defined out of line.
// NOTE(review): presumably the path used when m_strict_parsing is true -- confirm in the runner.
176 void parse_udp_payload(
int src_rx_q,
char* payload, std::size_t
size);
// Handles one UDP payload; takes the same parameters as parse_udp_payload.
//   src_rx_q - int identifying the originating RX queue (per the parameter name)
//   payload  - pointer to the payload bytes (mutable char*)
//   size     - payload length in bytes
// Defined out of line.
// NOTE(review): presumably forwards the payload without stream-id checks -- confirm.
179 void passthrough_udp_payload(
int src_rx_q,
char* payload, std::size_t
size);
#define ERS_DECLARE_ISSUE(namespace_name, class_name, message, attributes)
std::atomic< bool > run_marker
Global atomic for process lifetime.
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size