DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
IfaceWrapper.hpp
Go to the documentation of this file.
1
9#ifndef DPDKLIBS_SRC_IFACEWRAPPER_HPP_
10#define DPDKLIBS_SRC_IFACEWRAPPER_HPP_
11
12//#include "dpdklibs/nicreader/Structs.hpp"
14
15#include "dpdklibs/EALSetup.hpp"
18#include "dpdklibs/arp/ARP.hpp"
21#include "SourceConcept.hpp"
22
23#include <confmodel/Session.hpp>
24// #include <confmodel/NetworkDevice.hpp>
27
28#include <nlohmann/json.hpp>
29
30#include <ers/ers.hpp>
31#include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG<<issue wont work.
32
33#include <memory>
34#include <sstream>
35#include <string>
36#include <set>
37
38#include <folly/ProducerConsumerQueue.h>
39
40namespace dunedaq {
41
43 MetricPublishFailed,
44 "Field " << field << " was not reported",
45 ((std::string)field)
46 )
47
48namespace dpdklibs {
49
50 class IfaceWrapper : public opmonlib::MonitorableObject
51{
52public:
53 using sid_to_source_map_t = std::map<int, std::shared_ptr<SourceConcept>>;
54
55 IfaceWrapper(uint iface_id, const appmodel::DPDKReceiver* receiver,
56 const std::vector<const appmodel::NWDetDataSender*>& senders,
57 sid_to_source_map_t& sources, std::atomic<bool>& run_marker);
58 ~IfaceWrapper();
59
60 IfaceWrapper(const IfaceWrapper&) = delete;
61 IfaceWrapper& operator=(const IfaceWrapper&) = delete;
62 IfaceWrapper(IfaceWrapper&&) = delete;
63 IfaceWrapper& operator=(IfaceWrapper&&) = delete;
64
65 //void init();
66 void start();
67 void stop();
68
69 void generate_opmon_data() override;
70
71 void allocate_mbufs();
72 void setup_interface();
73 void setup_flow_steering();
74 void setup_xstats();
75
76 void enable_flow() { m_lcore_enable_flow.store(true);}
77 void disable_flow() { m_lcore_enable_flow.store(false);}
78
79 const std::vector<uint16_t>& get_rte_cores() const { return m_rte_cores; }
80
81protected:
82 //iface_conf_t m_cfg;
83 int m_iface_id;
84 std::string m_iface_id_str;
85 bool m_configured;
86
87 bool m_with_flow;
88 bool m_prom_mode;
89 std::vector<std::string> m_ip_addr;
90 std::vector<rte_be32_t> m_ip_addr_bin;
91 std::string m_mac_addr;
92 int m_socket_id;
93 int m_mtu;
94 uint16_t m_rx_ring_size;
95 uint16_t m_tx_ring_size;
96 int m_num_mbufs;
97 int m_burst_size;
98 uint32_t m_lcore_sleep_ns;
99 int m_mbuf_cache_size;
100
101private:
102 int m_num_ip_sources;
103 int m_num_rx_cores;
104 std::set<std::string> m_ips;
105 std::set<int> m_rx_qs;
106 std::set<int> m_tx_qs;
107 std::vector<uint16_t> m_rte_cores;
108
109 // CPU core ID -> [queue -> ip]
110 std::map<int, std::map<int, std::string>> m_rx_core_map;
111 unsigned m_arp_rx_queue = 0; // RS TODO: make it configurable, and queue use conf check for exclusiveness!
112
113 // Lcore stop signal
114 std::atomic<bool> m_lcore_quit_signal{ false };
115
116 std::atomic<bool> m_lcore_enable_flow{ false };
117
118 // Mbufs and pools
119 std::map<int, std::unique_ptr<rte_mempool>> m_mbuf_pools;
120 std::map<int, struct rte_mbuf **> m_bufs; // by queue
121
122 // Stats by queues
123 std::map<int, std::atomic<std::size_t>> m_num_frames_rxq;
124 std::map<int, std::atomic<std::size_t>> m_num_bytes_rxq;
125 std::map<int, std::atomic<std::size_t>> m_num_unexid_frames;
126 std::map<int, std::atomic<std::size_t>> m_num_full_bursts;
127 std::map<int, std::atomic<uint16_t>> m_max_burst_size;
128
129 // DPDK HW stats
130 dpdklibs::IfaceXstats m_iface_xstats;
131
132 // stream -> source id map indexed by queue id
133 // queue -> [stream_id -> sid]
134 std::map<int, std::map<uint, uint>> m_stream_id_to_source_id;
135 sid_to_source_map_t& m_sources;
136
137 // Run marker
138 std::atomic<bool>& m_run_marker;
139
140 // GARP
141 std::unique_ptr<rte_mempool> m_garp_mbuf_pool;
142 std::map<int, struct rte_mbuf **> m_garp_bufs;
143 std::thread m_garp_thread;
144 void garp_func();
145 std::atomic<uint64_t> m_garps_sent{0};
146
147 // ARP
148 std::unique_ptr<rte_mempool> m_arp_mbuf_pool;
149 std::map<int, struct rte_mbuf **> m_arp_bufs;
150 std::thread m_arp_thread;
151 void arp_func();
152 std::atomic<uint64_t> m_arps_sent{0};
153
154 // Lcore processor
155 int rx_runner(void *arg __rte_unused);
156 int arp_response_runner(void *arg __rte_unused);
157
158 // What to do with every payload
159 void handle_eth_payload(int src_rx_q, char* payload, std::size_t size);
160
161};
162
163} // namespace dpdklibs
164} // namespace dunedaq
165
166// #include "detail/IfaceWrapper.hxx"
167
168#endif // DPDKLIBS_SRC_IFACEWRAPPER_HPP_
#define ERS_DECLARE_ISSUE(namespace_name, class_name, message, attributes)
std::atomic< bool > run_marker
Global atomic for process lifetime.
Including Qt Headers.
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size