DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
IfaceWrapper.hpp
Go to the documentation of this file.
1
9#ifndef DPDKLIBS_SRC_IFACEWRAPPER_HPP_
10#define DPDKLIBS_SRC_IFACEWRAPPER_HPP_
11
12//#include "dpdklibs/nicreader/Structs.hpp"
14
15#include "dpdklibs/EALSetup.hpp"
18#include "dpdklibs/arp/ARP.hpp"
21#include "SourceConcept.hpp"
22
23#include <confmodel/Session.hpp>
24// #include <confmodel/NetworkDevice.hpp>
27
28#include <nlohmann/json.hpp>
29
30#include <ers/ers.hpp>
31#include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG<<issue wont work.
32
33#include <memory>
34#include <sstream>
35#include <string>
36#include <set>
37
38#include <folly/ProducerConsumerQueue.h>
39
40namespace dunedaq {
41
43 MetricPublishFailed,
44 "Field " << field << " was not reported",
45 ((std::string)field)
46 )
47
50 "Unexpected stream ID " << src_id << " in UDP payoad. Total counter: " << counter,
51 ((int)src_id)((size_t)counter)
52 )
53
54namespace dpdklibs {
55
56 class IfaceWrapper : public opmonlib::MonitorableObject
57{
58public:
59 using sid_to_source_map_t = std::map<int, std::shared_ptr<SourceConcept>>;
60
61 IfaceWrapper(uint iface_id, const appmodel::DPDKReceiver* receiver,
62 const std::vector<const appmodel::NWDetDataSender*>& senders,
63 sid_to_source_map_t& sources, std::atomic<bool>& run_marker);
64 ~IfaceWrapper();
65
66 IfaceWrapper(const IfaceWrapper&) = delete;
67 IfaceWrapper& operator=(const IfaceWrapper&) = delete;
68 IfaceWrapper(IfaceWrapper&&) = delete;
69 IfaceWrapper& operator=(IfaceWrapper&&) = delete;
70
71 //void init();
72 void start();
73 void stop();
74
75 void generate_opmon_data() override;
76
77 void allocate_mbufs();
78 void setup_interface();
79 void setup_flow_steering();
80 void setup_xstats();
81
82 void enable_flow() { m_lcore_enable_flow.store(true);}
83 void disable_flow() { m_lcore_enable_flow.store(false);}
84
85 const std::vector<uint16_t>& get_rte_cores() const { return m_rte_cores; }
86
87protected:
88 //iface_conf_t m_cfg;
89 int m_iface_id;
90 std::string m_iface_id_str;
91 bool m_configured;
92
93 bool m_with_flow;
94 bool m_prom_mode;
95 std::vector<std::string> m_ip_addr;
96 std::vector<rte_be32_t> m_ip_addr_bin;
97 std::string m_mac_addr;
98 int m_socket_id;
99 int m_mtu;
100 unsigned m_max_block_words;
101 uint16_t m_rx_ring_size;
102 uint16_t m_tx_ring_size;
103 int m_num_mbufs;
104 int m_burst_size;
105 uint32_t m_lcore_sleep_ns;
106 int m_mbuf_cache_size;
107
108private:
109 int m_num_ip_sources;
110 int m_num_rx_cores;
111 std::set<std::string> m_ips;
112 std::set<int> m_rx_qs;
113 std::set<int> m_tx_qs;
114 std::vector<uint16_t> m_rte_cores;
115
116 // CPU core ID -> [queue -> ip]
117 std::map<int, std::map<int, std::string>> m_rx_core_map;
118 unsigned m_arp_rx_queue = 0; // RS TODO: make it configurable, and queue use conf check for exclusiveness!
119
120 // Lcore stop signal
121 std::atomic<bool> m_lcore_quit_signal{ false };
122
123 std::atomic<bool> m_lcore_enable_flow{ false };
124
125 // Mbufs and pools
126 std::map<int, std::unique_ptr<rte_mempool>> m_mbuf_pools;
127 std::map<int, struct rte_mbuf **> m_bufs; // by queue
128
129 // Stats by queues
130 std::map<int, std::atomic<std::size_t>> m_num_frames_rxq;
131 std::map<int, std::atomic<std::size_t>> m_num_bytes_rxq;
132 std::map<int, std::atomic<std::size_t>> m_num_full_bursts;
133 std::map<int, std::atomic<uint16_t>> m_max_burst_size;
134
135 // Stats by rte_workers
136 std::map<int, std::atomic<std::size_t>> m_num_unhandled_non_ipv4;
137 std::map<int, std::atomic<std::size_t>> m_num_unhandled_non_udp;
138 std::map<int, std::atomic<std::size_t>> m_num_unhandled_non_jumbo_udp;
139
140 // Unexpected stream ID count
141 std::map<int, std::atomic<std::size_t>> m_num_unexid_frames;
142
143
144 // DPDK HW stats
145 dpdklibs::IfaceXstats m_iface_xstats;
146
147 // stream -> source id map indexed by queue id
148 // queue -> [stream_id -> sid]
149 std::map<int, std::map<uint, uint>> m_stream_id_to_source_id;
150 sid_to_source_map_t& m_sources;
151 bool m_strict_parsing {true};
152
153 // Run marker
154 std::atomic<bool>& m_run_marker;
155
156 // GARP
157 std::unique_ptr<rte_mempool> m_garp_mbuf_pool;
158 std::map<int, struct rte_mbuf **> m_garp_bufs;
159 std::thread m_garp_thread;
160 void garp_func();
161 std::atomic<uint64_t> m_garps_sent{0};
162
163 // ARP
164 std::unique_ptr<rte_mempool> m_arp_mbuf_pool;
165 std::map<int, struct rte_mbuf **> m_arp_bufs;
166 std::thread m_arp_thread;
167 void arp_func();
168 std::atomic<uint64_t> m_arps_sent{0};
169
170 // Lcore processor
171 int rx_runner(void *arg __rte_unused);
172 int arp_response_runner(void *arg __rte_unused);
173
174 // Parse UDP payloads as DAQ frames
175 void parse_udp_payload(int src_rx_q, char* payload, std::size_t size);
176
177 // Pass through UDP payloads as is
178 void passthrough_udp_payload(int src_rx_q, char* payload, std::size_t size);
179
180};
181
182} // namespace dpdklibs
183} // namespace dunedaq
184
185// #include "detail/IfaceWrapper.hxx"
186
187#endif // DPDKLIBS_SRC_IFACEWRAPPER_HPP_
#define ERS_DECLARE_ISSUE(namespace_name, class_name, message, attributes)
std::atomic< bool > run_marker
Global atomic for process lifetime.
The DUNE-DAQ namespace.
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size