Line data Source code
1 : /**
2 : * @file IfaceWrapper.hpp IfaceWrapper for holding resources of
3 : * a DPDK controlled NIC interface/port
4 : *
5 : * This is part of the DUNE DAQ , copyright 2020.
6 : * Licensing/copyright details are in the COPYING file that you should have
7 : * received with this code.
8 : */
9 : #ifndef DPDKLIBS_SRC_IFACEWRAPPER_HPP_
10 : #define DPDKLIBS_SRC_IFACEWRAPPER_HPP_
11 :
12 : //#include "dpdklibs/nicreader/Structs.hpp"
13 : #include "confmodel/NetworkDevice.hpp"
14 :
15 : #include "dpdklibs/EALSetup.hpp"
16 : #include "dpdklibs/udp/Utils.hpp"
17 : #include "dpdklibs/udp/PacketCtor.hpp"
18 : #include "dpdklibs/arp/ARP.hpp"
19 : #include "dpdklibs/ipv4_addr.hpp"
20 : #include "dpdklibs/XstatsHelper.hpp"
21 : #include "SourceConcept.hpp"
22 :
23 : #include <confmodel/Session.hpp>
24 : // #include <confmodel/NetworkDevice.hpp>
25 : #include "appmodel/DPDKReceiver.hpp"
26 : #include "appmodel/NWDetDataSender.hpp"
27 :
28 : #include <nlohmann/json.hpp>
29 :
30 : #include <ers/ers.hpp>
31 : #include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG<<issue wont work.
32 :
33 : #include <memory>
34 : #include <sstream>
35 : #include <string>
36 : #include <set>
37 :
38 : #include <folly/ProducerConsumerQueue.h>
39 :
40 : namespace dunedaq {
41 :
42 2 : ERS_DECLARE_ISSUE( dpdklibs, // Declares an ERS issue type inside namespace dpdklibs (this invocation sits in namespace dunedaq)
43 : MetricPublishFailed, // Name of the issue type
44 : "Field " << field << " was not reported", // Message text; streams the `field` attribute declared below
45 : ((std::string)field) // Attribute list: a single std::string named `field`
46 : )
47 :
48 2 : ERS_DECLARE_ISSUE( dpdklibs, // Declares an ERS issue type inside namespace dpdklibs (this invocation sits in namespace dunedaq)
49 : UnexpectedStreamID, // Name of the issue type
50 : "Unexpected stream ID " << src_id << " in UDP payload. Total counter: " << counter, // Message text; streams both attributes declared below
51 : ((int)src_id)((size_t)counter) // Attribute list: `src_id` (int) and `counter` (size_t)
52 : )
53 :
54 : namespace dpdklibs {
55 :
56 : class IfaceWrapper : public opmonlib::MonitorableObject // Holds the resources of one DPDK controlled NIC interface/port; derives from opmonlib::MonitorableObject
57 : {
58 : public:
59 : using sid_to_source_map_t = std::map<int, std::shared_ptr<SourceConcept>>; ///< Map from an int key to a shared SourceConcept ("sid" presumably means source id -- confirm)
60 : // Constructor: takes an interface id, receiver/sender/stream configuration pointers, and references to the source map and the run marker.
61 : IfaceWrapper(uint iface_id, const appmodel::DPDKReceiver* receiver,
62 : const std::vector<const appmodel::NWDetDataSender*>& senders,
63 : const std::vector<const confmodel::DetectorStream*>& active_streams,
64 : sid_to_source_map_t& sources, std::atomic<bool>& run_marker); // NOTE(review): sources/run_marker are presumably bound to the reference members m_sources/m_run_marker, so they must outlive this object -- confirm in the .cpp
65 : ~IfaceWrapper();
66 :
67 : IfaceWrapper(const IfaceWrapper&) = delete; ///< IfaceWrapper is not copy-constructible
68 : IfaceWrapper& operator=(const IfaceWrapper&) = delete; ///< IfaceWrapper is not copy-assignable
69 : IfaceWrapper(IfaceWrapper&&) = delete; ///< IfaceWrapper is not move-constructible
70 : IfaceWrapper& operator=(IfaceWrapper&&) = delete; ///< IfaceWrapper is not move-assignable
71 : // start()/stop() are declared here only; their definitions are not in this header.
72 : //void init();
73 : void start();
74 : void stop();
75 : // Overrides a virtual function of the opmonlib::MonitorableObject base.
76 : void generate_opmon_data() override;
77 : // Individual setup steps; the order in which they are called is decided by the implementation (not visible here).
78 : void allocate_mbufs();
79 : void setup_interface();
80 : void setup_flow_steering();
81 : void setup_xstats();
82 : // Set / clear the atomic flag m_lcore_enable_flow.
83 0 : void enable_flow() { m_lcore_enable_flow.store(true);}
84 0 : void disable_flow() { m_lcore_enable_flow.store(false);}
85 : // Read-only access to m_rte_cores.
86 : const std::vector<uint16_t>& get_rte_cores() const { return m_rte_cores; }
87 :
88 : protected: // None of the members below has an in-class initializer, so they hold indeterminate values unless the constructor (defined elsewhere) sets them.
89 : //iface_conf_t m_cfg;
90 : int m_iface_id; // NOTE(review): the constructor takes iface_id as uint while this member is int -- confirm the conversion is intended
91 : std::string m_iface_id_str;
92 : bool m_configured;
93 :
94 : bool m_with_flow;
95 : bool m_prom_mode;
96 : std::vector<std::string> m_ip_addr;
97 : std::vector<rte_be32_t> m_ip_addr_bin; // rte_be32_t elements; presumably the binary form of m_ip_addr -- confirm
98 : std::string m_mac_addr;
99 : int m_socket_id;
100 : int m_mtu;
101 : unsigned m_max_block_words;
102 : uint16_t m_rx_ring_size;
103 : uint16_t m_tx_ring_size;
104 : int m_num_mbufs;
105 : int m_burst_size;
106 : uint32_t m_lcore_sleep_ns; // presumably nanoseconds, going by the name -- confirm
107 : int m_mbuf_cache_size;
108 :
109 : private:
110 : int m_num_ip_sources;
111 : int m_num_rx_cores;
112 : std::set<std::string> m_ips;
113 : std::set<int> m_rx_qs;
114 : std::set<int> m_tx_qs;
115 : std::vector<uint16_t> m_rte_cores; // exposed read-only through get_rte_cores()
116 :
117 : // CPU core ID -> [queue -> ip]
118 : std::map<int, std::map<int, std::string>> m_rx_core_map;
119 : unsigned m_arp_rx_queue = 0; // RS TODO: make it configurable, and queue use conf check for exclusiveness!
120 :
121 : // Lcore stop signal
122 : std::atomic<bool> m_lcore_quit_signal{ false };
123 : // Flag written by enable_flow() / disable_flow(); starts out false.
124 : std::atomic<bool> m_lcore_enable_flow{ false };
125 :
126 : // Mbufs and pools. NOTE(review): std::unique_ptr<rte_mempool> uses the default deleter (plain delete); DPDK mempools are normally released with rte_mempool_free() -- confirm how these pointers are created and released.
127 : std::map<int, std::unique_ptr<rte_mempool>> m_mbuf_pools;
128 : std::map<int, struct rte_mbuf **> m_bufs; // by queue
129 :
130 : // Stats by queues
131 : std::map<int, std::atomic<std::size_t>> m_num_frames_rxq;
132 : std::map<int, std::atomic<std::size_t>> m_num_bytes_rxq;
133 : std::map<int, std::atomic<std::size_t>> m_num_full_bursts;
134 : std::map<int, std::atomic<uint16_t>> m_max_burst_size;
135 :
136 : // Stats by rte_workers
137 : std::map<int, std::atomic<std::size_t>> m_num_unhandled_non_ipv4;
138 : std::map<int, std::atomic<std::size_t>> m_num_unhandled_non_udp;
139 : std::map<int, std::atomic<std::size_t>> m_num_unhandled_non_jumbo_udp;
140 :
141 : // Unexpected stream ID count
142 : std::map<int, std::atomic<std::size_t>> m_num_unexid_frames;
143 :
144 :
145 : // DPDK HW stats
146 : dpdklibs::IfaceXstats m_iface_xstats;
147 :
148 : // stream -> source id map indexed by queue id
149 : // queue -> [stream_id -> sid]
150 : std::map<int, std::map<uint, uint>> m_stream_id_to_source_id;
151 : sid_to_source_map_t& m_sources; // reference member: the map itself is not owned by this object
152 : bool m_strict_parsing {true}; // defaults to true
153 :
154 : // Run marker (reference member: the atomic itself is not owned by this object)
155 : std::atomic<bool>& m_run_marker;
156 :
157 : // GARP (presumably gratuitous ARP -- confirm): mbuf pool, buffers, thread, thread body and sent counter
158 : std::unique_ptr<rte_mempool> m_garp_mbuf_pool;
159 : std::map<int, struct rte_mbuf **> m_garp_bufs;
160 : std::thread m_garp_thread;
161 : void garp_func();
162 : std::atomic<uint64_t> m_garps_sent{0};
163 :
164 : // ARP: mbuf pool, buffers, thread, thread body and sent counter
165 : std::unique_ptr<rte_mempool> m_arp_mbuf_pool;
166 : std::map<int, struct rte_mbuf **> m_arp_bufs;
167 : std::thread m_arp_thread;
168 : void arp_func();
169 : std::atomic<uint64_t> m_arps_sent{0};
170 :
171 : // Lcore processor (the void* argument is marked __rte_unused in both declarations)
172 : int rx_runner(void *arg __rte_unused);
173 : int arp_response_runner(void *arg __rte_unused);
174 :
175 : // Parse UDP payloads as DAQ frames
176 : void parse_udp_payload(int src_rx_q, char* payload, std::size_t size);
177 :
178 : // Pass through UDP payloads as is
179 : void passthrough_udp_payload(int src_rx_q, char* payload, std::size_t size);
180 :
181 : };
182 :
183 : } // namespace dpdklibs
184 : } // namespace dunedaq
185 :
186 : // #include "detail/IfaceWrapper.hxx"
187 :
188 : #endif // DPDKLIBS_SRC_IFACEWRAPPER_HPP_
|