Line data Source code
1 : /**
2 : * @file IfaceWrapper.cpp DPDK based Interface wrapper
3 : *
4 : * This is part of the DUNE DAQ , copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 : #include "logging/Logging.hpp"
9 : #include "datahandlinglibs/DataHandlingIssues.hpp"
10 :
11 : #include "opmonlib/Utils.hpp"
12 :
13 : #include "dpdklibs/Issues.hpp"
14 :
15 : #include "dpdklibs/nicreader/Structs.hpp"
16 :
17 : #include "dpdklibs/EALSetup.hpp"
18 : #include "dpdklibs/FlowControl.hpp"
19 : #include "dpdklibs/udp/PacketCtor.hpp"
20 : #include "dpdklibs/udp/Utils.hpp"
21 : #include "dpdklibs/arp/ARP.hpp"
22 : #include "dpdklibs/ipv4_addr.hpp"
23 : #include "IfaceWrapper.hpp"
24 :
25 : #include "appfwk/ConfigurationManager.hpp"
26 : // #include "confmodel/DROStreamConf.hpp"
27 : // #include "confmodel/StreamParameters.hpp"
28 : #include "confmodel/GeoId.hpp"
29 : #include "confmodel/DetectorStream.hpp"
30 : #include "confmodel/ProcessingResource.hpp"
31 : #include "appmodel/DPDKPortConfiguration.hpp"
32 : // #include "confmodel/NetworkDevice.hpp"
33 : // #include "appmodel/NICInterfaceConfiguration.hpp"
34 : // #include "appmodel/NICStatsConf.hpp"
35 : // #include "appmodel/EthStreamParameters.hpp"
36 :
37 : #include "dpdklibs/opmon/IfaceWrapper.pb.h"
38 :
39 : #include <chrono>
40 : #include <memory>
41 : #include <string>
42 : #include <regex>
43 : #include <stdexcept>
44 : #include <format>
45 :
/**
 * @brief TRACE debug levels used in this source file.
 *
 * The enumerators are passed as the level argument of TLOG_DEBUG().
 */
enum
{
  TLVL_ENTER_EXIT_METHODS = 5, ///< used for the destructor entry/exit messages
  TLVL_WORK_STEPS = 10,        ///< declared for work-step tracing
  TLVL_BOOKKEEPING = 15        ///< used for the source-id bookkeeping dumps
};
55 :
56 : namespace dunedaq {
57 : namespace dpdklibs {
58 :
59 :
60 : //-----------------------------------------------------------------------------
61 : // TODO: the constructor signature shall be reviewd.
62 : // The current constructor takes a set of largely correlated arguments
63 : // - a receiver object
64 : // - the list of active senders
65 : // - the list of active streams
66 : //
67 : // These arguments are created by applying the enable mask to detector2daq connection object.
68 : // They are preferred to the d2d object not to expose the IfaceWrapper code to the System class
69 0 : IfaceWrapper::IfaceWrapper(
70 : uint iface_id,
71 : const appmodel::DPDKReceiver* receiver,
72 : const std::vector<const appmodel::NWDetDataSender*>& nw_senders,
73 : const std::vector<const confmodel::DetectorStream*>& active_streams,
74 : sid_to_source_map_t& sources,
75 : std::atomic<bool>& run_marker
76 0 : )
77 0 : : m_sources(sources)
78 0 : , m_run_marker(run_marker)
79 : {
80 :
81 : // Arguments consistency check: collect source ids in senders
82 0 : std::set<int> src_in_d2d;
83 :
84 0 : for( auto& det_stream : active_streams ) {
85 0 : src_in_d2d.insert(det_stream->get_source_id());
86 : }
87 :
88 : // Arguments consistency check: collect source ids in source model map
89 0 : std::set<int> src_models;
90 0 : for( const auto& [src_id, _] : m_sources ) {
91 0 : src_models.insert(src_id);
92 : }
93 :
94 : // check that the 2 sets are identical.
95 0 : if (!std::includes(src_models.begin(), src_models.end(), src_in_d2d.begin(), src_in_d2d.end())) {
96 :
97 : // TODO: remove, possibly
98 0 : for ( auto src : src_models )
99 0 : TLOG_DEBUG(TLVL_BOOKKEEPING) << "model srcid " << src;
100 0 : for ( auto src : src_in_d2d )
101 0 : TLOG_DEBUG(TLVL_BOOKKEEPING) << "d2d srcid " << src;
102 :
103 : // D2D sources are not included in the source model list
104 : // Extract the differences: src_in_d2d - src_models
105 :
106 0 : std::vector<int> src_missing;
107 0 : std::set_difference(src_models.begin(), src_models.end(),
108 : src_in_d2d.begin(), src_in_d2d.end(),
109 : std::back_inserter(src_missing));
110 :
111 0 : std::stringstream ss;
112 0 : for( int src : src_missing ) {
113 0 : ss << src << " ";
114 : }
115 :
116 : // TLOG() << std::format("WARNING : these source ids are present in the d2d connection but no corresponding source objects are found {}", ss.str());
117 0 : throw MissingSourceIDOutputs(ERS_HERE, m_iface_id, ss.str());
118 0 : }
119 :
120 0 : auto net_device = receiver->get_uses();
121 :
122 0 : m_iface_id = iface_id;
123 0 : m_mac_addr = net_device->get_mac_address();
124 0 : m_ip_addr = net_device->get_ip_address();
125 :
126 0 : TLOG() << "Building IfaceWrapper " << m_iface_id;
127 0 : std::stringstream s;
128 0 : s << 'IfaceWrapper (port ' << m_iface_id << ") responding to : ";
129 0 : for( const std::string& ip_addr : m_ip_addr) {
130 0 : s << ip_addr << " ";
131 : }
132 :
133 0 : TLOG() << s.str();
134 :
135 0 : for( const std::string& ip_addr : m_ip_addr) {
136 0 : IpAddr ip_addr_struct(ip_addr);
137 0 : m_ip_addr_bin.push_back(udp::ip_address_dotdecimal_to_binary(
138 0 : ip_addr_struct.addr_bytes[0],
139 0 : ip_addr_struct.addr_bytes[1],
140 0 : ip_addr_struct.addr_bytes[2],
141 0 : ip_addr_struct.addr_bytes[3]
142 : ));
143 : }
144 :
145 :
146 0 : auto iface_cfg = receiver->get_configuration();
147 :
148 0 : m_with_flow = iface_cfg->get_flow_control();
149 0 : m_prom_mode = iface_cfg->get_promiscuous_mode();;
150 0 : m_mtu = iface_cfg->get_mtu();
151 0 : m_max_block_words = unsigned(m_mtu) / sizeof(uint64_t);
152 0 : m_rx_ring_size = iface_cfg->get_rx_ring_size();
153 0 : m_tx_ring_size = iface_cfg->get_tx_ring_size();
154 0 : m_num_mbufs = iface_cfg->get_num_bufs();
155 0 : m_burst_size = iface_cfg->get_burst_size();
156 0 : m_mbuf_cache_size = iface_cfg->get_mbuf_cache_size();
157 :
158 0 : m_lcore_sleep_ns = iface_cfg->get_lcore_sleep_us() * 1000;
159 0 : m_socket_id = rte_eth_dev_socket_id(m_iface_id);
160 :
161 0 : m_iface_id_str = iface_cfg->UID();
162 :
163 :
164 : // Here is my list of cores
165 0 : for( const auto* proc_res : iface_cfg->get_used_lcores()) {
166 0 : m_rte_cores.insert(m_rte_cores.end(), proc_res->get_cpu_cores().begin(), proc_res->get_cpu_cores().end());
167 : }
168 0 : if(std::find(m_rte_cores.begin(), m_rte_cores.end(), rte_get_main_lcore())!=m_rte_cores.end()) {
169 0 : TLOG() << "ERROR! Throw ERS error here that LCore=0 should not be used, as it's a control RTE core!";
170 0 : throw std::runtime_error(std::string("ERROR! Throw ERS here that LCore=0 should not be used, as it's a control RTE core!"));
171 : }
172 :
173 : // iterate through active streams
174 :
175 : // Create a map of sender ni (ip) to streams from the d2d connection object
176 0 : std::map<std::string, std::map<uint, uint>> ip_to_stream_src_groups;
177 :
178 0 : for( auto nw_sender : nw_senders ) {
179 0 : auto sender_ni = nw_sender->get_uses();
180 :
181 0 : std::string tx_ip = sender_ni->get_ip_address().at(0);
182 :
183 : // Loop over streams
184 0 : for ( auto det_stream : nw_sender->get_streams() ) {
185 :
186 : // Only include active streams
187 0 : if ( std::find(active_streams.begin(), active_streams.end(), det_stream) == active_streams.end())
188 0 : continue;
189 :
190 0 : uint32_t tx_geo_stream_id = det_stream->get_geo_id()->get_stream_id();
191 : // (tx, geo_stream) -> source_id
192 0 : ip_to_stream_src_groups[tx_ip][tx_geo_stream_id] = det_stream->get_source_id();
193 : }
194 0 : }
195 :
196 :
197 : // RS FIXME: Is this RX_Q bump is enough??? I don't remember how the RX_Qs are assigned...
198 0 : uint32_t core_idx(0), rx_q(0); // RS FIXME: Ensure that no RX_Q=0 is used for UDP RX, ever.
199 :
200 0 : m_rx_qs.insert(rx_q);
201 0 : m_arp_rx_queue = rx_q;
202 0 : ++rx_q;
203 :
204 : // Build additional helper maps
205 0 : for( const auto& [tx_ip, strm_src] : ip_to_stream_src_groups) {
206 0 : m_ips.insert(tx_ip);
207 0 : m_rx_qs.insert(rx_q);
208 0 : m_num_frames_rxq[rx_q] = { 0 };
209 0 : m_num_bytes_rxq[rx_q] = { 0 };
210 :
211 0 : m_rx_core_map[m_rte_cores[core_idx]][rx_q] = tx_ip;
212 0 : m_stream_id_to_source_id[rx_q] = strm_src;
213 : // TLOG() << "+++ ip, rx_q : (" << tx_ip << ", " << rx_q << ") -> " << strm_src;
214 :
215 0 : ++rx_q;
216 0 : if ( ++core_idx == m_rte_cores.size()) {
217 0 : core_idx = 0;
218 : }
219 : }
220 :
221 : // Log mapping
222 0 : for (auto const& [lcore, rx_qs] : m_rx_core_map) {
223 0 : TLOG() << "Lcore=" << lcore << " handles: ";
224 0 : for (auto const& [rx_q, src_ip] : rx_qs) {
225 0 : TLOG() << " rx_q=" << rx_q << " src_ip=" << src_ip;
226 : }
227 : }
228 :
229 : // Adding single TX queue for ARP responses
230 0 : TLOG() << "Append TX_Q=0 for ARP responses.";
231 0 : m_tx_qs.insert(0);
232 :
233 : // Strict parsing (DAQ protocol) or pass through of UDP payloads to SourceModels
234 0 : for (auto const& [sid, src_concept] : m_sources) {
235 0 : if (!src_concept->m_daq_protocol_ensured) {
236 0 : m_strict_parsing = false;
237 : }
238 : }
239 :
240 0 : }
241 :
242 :
243 : //-----------------------------------------------------------------------------
244 0 : IfaceWrapper::~IfaceWrapper()
245 : {
246 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "IfaceWrapper destructor called. First stop check, then closing iface.";
247 :
248 0 : struct rte_flow_error error;
249 0 : rte_flow_flush(m_iface_id, &error);
250 : //graceful_stop();
251 : //close_iface();
252 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "IfaceWrapper destroyed.";
253 0 : }
254 :
255 :
256 : //-----------------------------------------------------------------------------
257 : void
258 0 : IfaceWrapper::allocate_mbufs()
259 : {
260 0 : TLOG() << "Allocating pools and mbufs for UDP, GARP, and ARP.";
261 :
262 : // Pools for UDP RX messages
263 0 : for (size_t i=0; i<m_rx_qs.size(); ++i) {
264 0 : std::stringstream bufss;
265 0 : bufss << "MBP-" << m_iface_id << '-' << i;
266 0 : TLOG() << "Acquire pool with name=" << bufss.str() << " for iface_id=" << m_iface_id << " rxq=" << i;
267 0 : m_mbuf_pools[i] = ealutils::get_mempool(bufss.str(), m_num_mbufs, m_mbuf_cache_size, 16384, m_socket_id);
268 0 : m_bufs[i] = (rte_mbuf**) malloc(sizeof(struct rte_mbuf*) * m_burst_size);
269 : // No need to alloc?
270 : // rte_pktmbuf_alloc_bulk(m_mbuf_pools[i].get(), m_bufs[i], m_burst_size);
271 0 : }
272 :
273 : // Pools for GARP messages
274 0 : std::stringstream garpss;
275 0 : garpss << "GARPMBP-" << m_iface_id;
276 0 : TLOG() << "Acquire GARP pool with name=" << garpss.str() << " for iface_id=" << m_iface_id;
277 0 : m_garp_mbuf_pool = ealutils::get_mempool(garpss.str());
278 0 : m_garp_bufs[0] = (rte_mbuf**) malloc(sizeof(struct rte_mbuf*) * m_burst_size);
279 0 : rte_pktmbuf_alloc_bulk(m_garp_mbuf_pool.get(), m_garp_bufs[0], m_burst_size);
280 :
281 : // Pools for ARP request/responses
282 0 : std::stringstream arpss;
283 0 : arpss << "ARPMBP-" << m_iface_id;
284 0 : TLOG() << "Acquire ARP pool with name=" << arpss.str() << " for iface_id=" << m_iface_id;
285 0 : m_arp_mbuf_pool = ealutils::get_mempool(arpss.str());
286 0 : m_arp_bufs[0] = (rte_mbuf**) malloc(sizeof(struct rte_mbuf*) * m_burst_size);
287 0 : rte_pktmbuf_alloc_bulk(m_arp_mbuf_pool.get(), m_arp_bufs[0], m_burst_size);
288 :
289 0 : }
290 :
291 :
292 : //-----------------------------------------------------------------------------
293 : void
294 0 : IfaceWrapper::setup_interface()
295 : {
296 0 : TLOG() << "Initialize interface " << m_iface_id;
297 0 : bool with_reset = true, with_mq_mode = true; // go to config
298 0 : bool check_link_status = false;
299 :
300 0 : int retval = ealutils::iface_init(m_iface_id, m_rx_qs.size(), m_tx_qs.size(), m_rx_ring_size, m_tx_ring_size, m_mbuf_pools, with_reset, with_mq_mode, check_link_status);
301 0 : if (retval != 0 ) {
302 0 : throw FailedToSetupInterface(ERS_HERE, m_iface_id, retval);
303 : }
304 : // Promiscuous mode
305 0 : ealutils::iface_promiscuous_mode(m_iface_id, m_prom_mode); // should come from config
306 0 : }
307 :
308 :
309 : //-----------------------------------------------------------------------------
310 : void
311 0 : IfaceWrapper::setup_flow_steering()
312 : {
313 : // Flow steering setup
314 0 : TLOG() << "Configuring Flow steering rules for iface=" << m_iface_id;
315 0 : struct rte_flow_error error;
316 0 : struct rte_flow *flow;
317 0 : TLOG() << "Attempt to flush previous flow rules...";
318 0 : rte_flow_flush(m_iface_id, &error);
319 : #warning RS: FIXME -> Check for flow flush return!
320 :
321 0 : TLOG() << "Create control flow rules (ARP) assinged to rxq=" << m_arp_rx_queue;
322 0 : flow = generate_arp_flow(m_iface_id, m_arp_rx_queue, &error);
323 0 : if (not flow) { // ers::fatal
324 0 : TLOG() << "ARP flow can't be created for " << m_arp_rx_queue
325 0 : << " Error type: " << (unsigned)error.type
326 0 : << " Message: '" << error.message << "'";
327 0 : ers::fatal(dunedaq::datahandlinglibs::InitializationError(
328 0 : ERS_HERE, "Couldn't create ARP flow API rules!"));
329 0 : rte_exit(EXIT_FAILURE, "error in creating ARP flow");
330 : }
331 :
332 0 : TLOG() << "Create flow rules for UDP RX.";
333 0 : for (auto const& [lcoreid, rxqs] : m_rx_core_map) {
334 0 : for (auto const& [rxqid, srcip] : rxqs) {
335 : // Put the IP numbers temporarily in a vector, so they can be converted easily to uint32_t
336 0 : TLOG() << "Creating flow rule for src_ip=" << srcip << " assigned to rxq=" << rxqid;
337 0 : size_t ind = 0, current_ind = 0;
338 0 : std::vector<uint8_t> v;
339 0 : for (int i = 0; i < 4; ++i) {
340 0 : v.push_back(std::stoi(srcip.substr(current_ind, srcip.size() - current_ind), &ind));
341 0 : current_ind += ind + 1;
342 : }
343 :
344 0 : flow = generate_ipv4_flow(m_iface_id, rxqid,
345 0 : RTE_IPV4(v[0], v[1], v[2], v[3]), 0xffffffff, 0, 0, &error);
346 :
347 0 : if (not flow) { // ers::fatal
348 0 : TLOG() << "Flow can't be created for " << rxqid
349 0 : << " Error type: " << (unsigned)error.type
350 0 : << " Message: '" << error.message << "'";
351 0 : ers::fatal(dunedaq::datahandlinglibs::InitializationError(
352 0 : ERS_HERE, "Couldn't create Flow API rules!"));
353 0 : rte_exit(EXIT_FAILURE, "error in creating flow");
354 : }
355 0 : }
356 : }
357 :
358 0 : return;
359 : }
360 :
361 : //-----------------------------------------------------------------------------
362 : void
363 0 : IfaceWrapper::setup_xstats()
364 : {
365 : // Stats setup
366 0 : m_iface_xstats.setup(m_iface_id);
367 0 : m_iface_xstats.reset_counters();
368 0 : }
369 :
370 :
371 : //-----------------------------------------------------------------------------
372 : void
373 0 : IfaceWrapper::start()
374 : {
375 : // Reset counters for RX queues
376 0 : for (auto const& [rx_q, _] : m_num_frames_rxq ) {
377 0 : m_num_frames_rxq[rx_q] = { 0 };
378 0 : m_num_bytes_rxq[rx_q] = { 0 };
379 0 : m_num_full_bursts[rx_q] = { 0 };
380 0 : m_max_burst_size[rx_q] = { 0 };
381 : }
382 :
383 : // Reset counters for rte_workers
384 0 : for (auto const& [lcore, _] : m_rx_core_map) {
385 0 : m_num_unhandled_non_ipv4[lcore] = { 0 };
386 0 : m_num_unhandled_non_udp[lcore] = { 0 };
387 0 : m_num_unhandled_non_jumbo_udp[lcore] = { 0 };
388 : }
389 :
390 0 : m_lcore_enable_flow.store(false);
391 0 : m_lcore_quit_signal.store(false);
392 0 : TLOG() << "Interface id=" << m_iface_id <<" Launching GARP thread with garp_func...";
393 0 : m_garp_thread = std::thread(&IfaceWrapper::garp_func, this);
394 :
395 0 : TLOG() << "Interface id=" << m_iface_id << " starting ARP LCore processor:";
396 0 : m_arp_thread = std::thread(&IfaceWrapper::IfaceWrapper::arp_response_runner, this, nullptr);
397 :
398 :
399 0 : TLOG() << "Interface id=" << m_iface_id << " starting LCore processors:";
400 0 : for (auto const& [lcoreid, _] : m_rx_core_map) {
401 0 : int ret = rte_eal_remote_launch((int (*)(void*))(&IfaceWrapper::rx_runner), this, lcoreid);
402 0 : TLOG() << " -> LCore[" << lcoreid << "] launched with return code=" << ret << " " << (ret < 0 ? rte_strerror(-ret) : "");
403 : }
404 0 : }
405 :
406 : //-----------------------------------------------------------------------------
407 : void
408 0 : IfaceWrapper::stop()
409 : {
410 0 : m_lcore_enable_flow.store(false);
411 0 : m_lcore_quit_signal.store(true);
412 : // Stop GARP sender thread
413 0 : if (m_garp_thread.joinable()) {
414 0 : m_garp_thread.join();
415 : } else {
416 0 : TLOG() << "GARP thread is not joinable!";
417 : }
418 :
419 0 : if (m_arp_thread.joinable()) {
420 0 : m_arp_thread.join();
421 : } else {
422 0 : TLOG() << "ARP thread is not joinable!";
423 : }
424 0 : }
425 : /*
426 : void
427 : IfaceWrapper::scrap()
428 : {
429 : struct rte_flow_error error;
430 : rte_flow_flush(m_iface_id, &error);
431 : }
432 : */
433 :
434 :
435 : //-----------------------------------------------------------------------------
436 : void
437 0 : IfaceWrapper::generate_opmon_data() {
438 :
439 : // Poll stats from HW
440 0 : m_iface_xstats.poll();
441 :
442 0 : opmon::EthStats s;
443 0 : s.set_ipackets( m_iface_xstats.m_eth_stats.ipackets );
444 0 : s.set_opackets( m_iface_xstats.m_eth_stats.opackets );
445 0 : s.set_ibytes( m_iface_xstats.m_eth_stats.ibytes );
446 0 : s.set_obytes( m_iface_xstats.m_eth_stats.obytes );
447 0 : s.set_imissed( m_iface_xstats.m_eth_stats.imissed );
448 0 : s.set_ierrors( m_iface_xstats.m_eth_stats.ierrors );
449 0 : s.set_oerrors( m_iface_xstats.m_eth_stats.oerrors );
450 0 : s.set_rx_nombuf( m_iface_xstats.m_eth_stats.rx_nombuf );
451 0 : publish( std::move(s) );
452 :
453 0 : if(m_iface_xstats.m_eth_stats.imissed > 0){
454 0 : ers::warning(PacketErrors(ERS_HERE, m_iface_id_str, "missed", m_iface_xstats.m_eth_stats.imissed));
455 : }
456 0 : if(m_iface_xstats.m_eth_stats.ierrors > 0){
457 0 : ers::warning(PacketErrors(ERS_HERE, m_iface_id_str, "dropped", m_iface_xstats.m_eth_stats.ierrors));
458 : }
459 :
460 : // loop over all the xstats information
461 0 : opmon::EthXStatsInfo xinfos;
462 0 : opmon::EthXStatsErrors xerrs;
463 0 : std::map<std::string, opmon::QueueEthXStats> xq;
464 :
465 0 : for (int i = 0; i < m_iface_xstats.m_len; ++i) {
466 :
467 0 : std::string name(m_iface_xstats.m_xstats_names[i].name);
468 :
469 : // first we select the info from the queue
470 0 : static std::regex queue_regex(R"((rx|tx)_q(\d+)_([^_]+))");
471 0 : std::smatch match;
472 :
473 0 : if ( std::regex_match(name, match, queue_regex) ) {
474 0 : auto queue_name = match[1].str() + '-' + match[2].str();
475 0 : auto & entry = xq[queue_name];
476 0 : try {
477 0 : opmonlib::set_value( entry, match[3], m_iface_xstats.m_xstats_values[i] );
478 0 : } catch ( const ers::Issue & e ) {
479 0 : ers::warning( MetricPublishFailed( ERS_HERE, name, e) );
480 0 : }
481 0 : continue;
482 0 : }
483 :
484 0 : google::protobuf::Message * metric_p = nullptr;
485 0 : static std::regex err_regex(R"(.+error.*)");
486 0 : if ( std::regex_match( name, err_regex ) ) metric_p = & xerrs;
487 0 : else metric_p = & xinfos;
488 :
489 0 : try {
490 0 : opmonlib::set_value(*metric_p, name, m_iface_xstats.m_xstats_values[i]);
491 0 : } catch ( const ers::Issue & e ) {
492 0 : ers::warning( MetricPublishFailed( ERS_HERE, name, e) );
493 0 : }
494 :
495 0 : } // loop over xstats
496 :
497 : // Reset HW counters
498 0 : m_iface_xstats.reset_counters();
499 :
500 : // finally we publish the information
501 0 : publish( std::move(xinfos) );
502 0 : publish( std::move(xerrs) );
503 0 : for ( auto [id, stat] : xq ) {
504 0 : publish( std::move(stat), {{"queue", id}} );
505 0 : }
506 :
507 0 : for( const auto& [src_rx_q,_] : m_num_frames_rxq) {
508 0 : opmon::QueueInfo i;
509 0 : i.set_packets_received( m_num_frames_rxq[src_rx_q].load() );
510 0 : i.set_bytes_received( m_num_bytes_rxq[src_rx_q].load() );
511 0 : i.set_full_rx_burst( m_num_full_bursts[src_rx_q].load() );
512 0 : i.set_max_burst_size( m_max_burst_size[src_rx_q].exchange(0) );
513 :
514 0 : publish( std::move(i), {{"queue", std::to_string(src_rx_q)}} );
515 0 : }
516 :
517 : // RTE Workers
518 0 : for (auto const& [lcore, _] : m_rx_core_map) {
519 0 : opmon::RTEWorkerInfo info;
520 0 : info.set_num_unhandled_non_ipv4( m_num_unhandled_non_ipv4[lcore].exchange(0) );
521 0 : info.set_num_unhandled_non_udp( m_num_unhandled_non_udp[lcore].exchange(0) );
522 0 : info.set_num_unhandled_non_jumbo_udp( m_num_unhandled_non_jumbo_udp[lcore].exchange(0) );
523 0 : publish( std::move(info), {{"rte_worker_id", std::to_string(lcore)}} );
524 0 : }
525 :
526 0 : for ( auto & [id, counter] : m_num_unexid_frames ) {
527 0 : auto val = counter.exchange(0);
528 0 : if ( val > 0 ) {
529 0 : ers::warning( UnexpectedStreamID( ERS_HERE, id, val ) );
530 : }
531 : }
532 0 : }
533 :
534 : //-----------------------------------------------------------------------------
535 : void
536 0 : IfaceWrapper::garp_func()
537 : {
538 0 : TLOG() << "Launching GARP sender...";
539 0 : while(m_run_marker.load()) {
540 0 : for( const auto& ip_addr_bin : m_ip_addr_bin ) {
541 0 : arp::pktgen_send_garp(m_garp_bufs[0][0], m_iface_id, ip_addr_bin);
542 : }
543 0 : ++m_garps_sent;
544 0 : std::this_thread::sleep_for(std::chrono::seconds(1));
545 : }
546 0 : TLOG() << "GARP function joins.";
547 0 : }
548 :
549 : //-----------------------------------------------------------------------------
550 : void
551 0 : IfaceWrapper::parse_udp_payload(int src_rx_q, char* payload, std::size_t size)
552 : {
553 : // Pointers for parsing and to the end of the UDP payload.
554 0 : char* plptr = payload;
555 0 : const char* plendptr = payload + size;
556 :
557 :
558 : // Process every DAQEth frame within UDP payload
559 0 : while ( plptr + sizeof(dunedaq::detdataformats::DAQEthHeader) < plendptr ) { // Scatter loop start
560 :
561 :
562 : // Reinterpret directly to DAQEthHeader
563 0 : auto daqhdrptr = reinterpret_cast<dunedaq::detdataformats::DAQEthHeader*>(plptr);
564 :
565 : // Check number of DAQEth block_words
566 0 : unsigned block_words = unsigned(daqhdrptr->block_length) - 1; // removing timestamp word from the block length.
567 :
568 : // Check for corrupted DAQEth frame length
569 0 : if ( block_words == 0 || block_words > m_max_block_words ) {
570 : // RS FIXME: corrupted length -> stop, add opmon counter or warning
571 0 : return;
572 : }
573 :
574 : // Calculate data bytes after DAQEthHeader based on block_words
575 0 : std::size_t data_bytes = std::size_t(block_words) * sizeof(dunedaq::detdataformats::DAQEthHeader::word_t);
576 :
577 : // Grab end pointer of DAQEth frame
578 0 : char* daqframe_endptr = plptr + sizeof(dunedaq::detdataformats::DAQEthHeader) + data_bytes;
579 :
580 : // Check if full DAQEth frame fits
581 0 : if ( daqframe_endptr > plendptr ) {
582 : // RS FIXME: truncated payload -> stop, add opmon counter or warning
583 : return;
584 : }
585 :
586 : // Calculate DAQEth frame size (used both for handling and advancing)
587 0 : std::size_t daq_frame_size = sizeof(dunedaq::detdataformats::DAQEthHeader) + data_bytes;
588 :
589 : // Sadly, cannot take a reference to a bitfield
590 0 : uint strm_id = daqhdrptr->stream_id;
591 :
592 : // Check that stream id is corresponds to a registered source
593 0 : auto& strm_to_src = m_stream_id_to_source_id[src_rx_q];
594 :
595 0 : if ( auto strm_it = strm_to_src.find(strm_id); strm_it != strm_to_src.end() ) {
596 :
597 0 : m_sources[strm_it->second]->handle_daq_frame((char*)daqhdrptr, daq_frame_size);
598 :
599 :
600 : } else {
601 : // Really bad -> unexpeced StreamID in UDP Payload.
602 : // This check is needed in order to avoid dynamically add thousands
603 : // of Sources on the fly, in case the data corruption is extremely severe.
604 0 : if (m_num_unexid_frames.count(strm_id) == 0) {
605 0 : m_num_unexid_frames[strm_id] = 0;
606 : }
607 0 : m_num_unexid_frames[strm_id]++;
608 : }
609 :
610 : // Advance to next payload
611 0 : plptr += daq_frame_size;
612 :
613 : } // Scatter loop end
614 :
615 : }
616 :
617 : //-----------------------------------------------------------------------------
618 : void
619 0 : IfaceWrapper::passthrough_udp_payload(int src_rx_q, char* payload, std::size_t size)
620 : {
621 : // Get DAQ Header and its StreamID
622 0 : auto* daqhdrptr = reinterpret_cast<dunedaq::detdataformats::DAQEthHeader*>(payload);
623 :
624 :
625 : // Sadly, cannot take a reference to a bitfield
626 0 : uint strm_id = daqhdrptr->stream_id;
627 :
628 : // Check that stream id is corresponds to a registered source
629 0 : auto& strm_to_src = m_stream_id_to_source_id[src_rx_q];
630 :
631 :
632 0 : if ( auto strm_it = strm_to_src.find(strm_id); strm_it != strm_to_src.end() ) {
633 0 : m_sources[strm_it->second]->handle_daq_frame(payload, size);
634 : } else {
635 : // Really bad -> unexpeced StreamID in UDP Payload.
636 : // This check is needed in order to avoid dynamically add thousands
637 : // of Sources on the fly, in case the data corruption is extremely severe.
638 0 : if (m_num_unexid_frames.count(strm_id) == 0) {
639 0 : m_num_unexid_frames[strm_id] = 0;
640 : }
641 0 : m_num_unexid_frames[strm_id]++;
642 : }
643 0 : }
644 :
645 : } // namespace dpdklibs
646 : } // namespace dunedaq
647 :
648 : //
649 : #include "detail/IfaceWrapper.hxx"
|