DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
IfaceWrapper.cpp
Go to the documentation of this file.
1
8#include "logging/Logging.hpp"
10
11#include "opmonlib/Utils.hpp"
12
13#include "dpdklibs/Issues.hpp"
14
16
17#include "dpdklibs/EALSetup.hpp"
21#include "dpdklibs/arp/ARP.hpp"
23#include "IfaceWrapper.hpp"
24
26// #include "confmodel/DROStreamConf.hpp"
27// #include "confmodel/StreamParameters.hpp"
28#include "confmodel/GeoId.hpp"
32// #include "confmodel/NetworkDevice.hpp"
33// #include "appmodel/NICInterfaceConfiguration.hpp"
34// #include "appmodel/NICStatsConf.hpp"
35// #include "appmodel/EthStreamParameters.hpp"
36
38
39#include <chrono>
40#include <memory>
41#include <string>
42#include <regex>
43#include <stdexcept>
44#include <format>
45
49enum
50{
52 TLVL_WORK_STEPS = 10,
53 TLVL_BOOKKEEPING = 15
54};
55
56namespace dunedaq {
57namespace dpdklibs {
58
59
60//-----------------------------------------------------------------------------
61// TODO: the constructor signature shall be reviewd.
62// The current constructor takes a set of largely correlated arguments
63// - a receiver object
64// - the list of active senders
65// - the list of active streams
66//
67// These arguments are created by applying the enable mask to detector2daq connection object.
68// They are preferred to the d2d object not to expose the IfaceWrapper code to the System class
69IfaceWrapper::IfaceWrapper(
70 uint iface_id,
71 const appmodel::DPDKReceiver* receiver,
72 const std::vector<const appmodel::NWDetDataSender*>& nw_senders,
73 const std::vector<const confmodel::DetectorStream*>& active_streams,
74 sid_to_source_map_t& sources,
75 std::atomic<bool>& run_marker
76 )
77 : m_sources(sources)
78 , m_run_marker(run_marker)
79{
80
81 // Arguments consistency check: collect source ids in senders
82 std::set<int> src_in_d2d;
83
84 for( auto& det_stream : active_streams ) {
85 src_in_d2d.insert(det_stream->get_source_id());
86 }
87
88 // Arguments consistency check: collect source ids in source model map
89 std::set<int> src_models;
90 for( const auto& [src_id, _] : m_sources ) {
91 src_models.insert(src_id);
92 }
93
94 // check that the 2 sets are identical.
95 if (!std::includes(src_models.begin(), src_models.end(), src_in_d2d.begin(), src_in_d2d.end())) {
96
97 // TODO: remove, possibly
98 for ( auto src : src_models )
99 TLOG_DEBUG(TLVL_BOOKKEEPING) << "model srcid " << src;
100 for ( auto src : src_in_d2d )
101 TLOG_DEBUG(TLVL_BOOKKEEPING) << "d2d srcid " << src;
102
103 // D2D sources are not included in the source model list
104 // Extract the differences: src_in_d2d - src_models
105
106 std::vector<int> src_missing;
107 std::set_difference(src_models.begin(), src_models.end(),
108 src_in_d2d.begin(), src_in_d2d.end(),
109 std::back_inserter(src_missing));
110
111 std::stringstream ss;
112 for( int src : src_missing ) {
113 ss << src << " ";
114 }
115
116 // TLOG() << std::format("WARNING : these source ids are present in the d2d connection but no corresponding source objects are found {}", ss.str());
117 throw MissingSourceIDOutputs(ERS_HERE, m_iface_id, ss.str());
118 }
119
120 auto net_device = receiver->get_uses();
121
122 m_iface_id = iface_id;
123 m_mac_addr = net_device->get_mac_address();
124 m_ip_addr = net_device->get_ip_address();
125
126 TLOG() << "Building IfaceWrapper " << m_iface_id;
127 std::stringstream s;
128 s << 'IfaceWrapper (port ' << m_iface_id << ") responding to : ";
129 for( const std::string& ip_addr : m_ip_addr) {
130 s << ip_addr << " ";
131 }
132
133 TLOG() << s.str();
134
135 for( const std::string& ip_addr : m_ip_addr) {
136 IpAddr ip_addr_struct(ip_addr);
137 m_ip_addr_bin.push_back(udp::ip_address_dotdecimal_to_binary(
138 ip_addr_struct.addr_bytes[0],
139 ip_addr_struct.addr_bytes[1],
140 ip_addr_struct.addr_bytes[2],
141 ip_addr_struct.addr_bytes[3]
142 ));
143 }
144
145
146 auto iface_cfg = receiver->get_configuration();
147
148 m_with_flow = iface_cfg->get_flow_control();
149 m_prom_mode = iface_cfg->get_promiscuous_mode();;
150 m_mtu = iface_cfg->get_mtu();
151 m_max_block_words = unsigned(m_mtu) / sizeof(uint64_t);
152 m_rx_ring_size = iface_cfg->get_rx_ring_size();
153 m_tx_ring_size = iface_cfg->get_tx_ring_size();
154 m_num_mbufs = iface_cfg->get_num_bufs();
155 m_burst_size = iface_cfg->get_burst_size();
156 m_mbuf_cache_size = iface_cfg->get_mbuf_cache_size();
157
158 m_lcore_sleep_ns = iface_cfg->get_lcore_sleep_us() * 1000;
159 m_socket_id = rte_eth_dev_socket_id(m_iface_id);
160
161 m_iface_id_str = iface_cfg->UID();
162
163
164 // Here is my list of cores
165 for( const auto* proc_res : iface_cfg->get_used_lcores()) {
166 m_rte_cores.insert(m_rte_cores.end(), proc_res->get_cpu_cores().begin(), proc_res->get_cpu_cores().end());
167 }
168 if(std::find(m_rte_cores.begin(), m_rte_cores.end(), rte_get_main_lcore())!=m_rte_cores.end()) {
169 TLOG() << "ERROR! Throw ERS error here that LCore=0 should not be used, as it's a control RTE core!";
170 throw std::runtime_error(std::string("ERROR! Throw ERS here that LCore=0 should not be used, as it's a control RTE core!"));
171 }
172
173 // iterate through active streams
174
175 // Create a map of sender ni (ip) to streams from the d2d connection object
176 std::map<std::string, std::map<uint, uint>> ip_to_stream_src_groups;
177
178 for( auto nw_sender : nw_senders ) {
179 auto sender_ni = nw_sender->get_uses();
180
181 std::string tx_ip = sender_ni->get_ip_address().at(0);
182
183 // Loop over streams
184 for ( auto det_stream : nw_sender->get_streams() ) {
185
186 // Only include active streams
187 if ( std::find(active_streams.begin(), active_streams.end(), det_stream) == active_streams.end())
188 continue;
189
190 uint32_t tx_geo_stream_id = det_stream->get_geo_id()->get_stream_id();
191 // (tx, geo_stream) -> source_id
192 ip_to_stream_src_groups[tx_ip][tx_geo_stream_id] = det_stream->get_source_id();
193 }
194 }
195
196
197// RS FIXME: Is this RX_Q bump is enough??? I don't remember how the RX_Qs are assigned...
198 uint32_t core_idx(0), rx_q(0); // RS FIXME: Ensure that no RX_Q=0 is used for UDP RX, ever.
199
200 m_rx_qs.insert(rx_q);
201 m_arp_rx_queue = rx_q;
202 ++rx_q;
203
204 // Build additional helper maps
205 for( const auto& [tx_ip, strm_src] : ip_to_stream_src_groups) {
206 m_ips.insert(tx_ip);
207 m_rx_qs.insert(rx_q);
208 m_num_frames_rxq[rx_q] = { 0 };
209 m_num_bytes_rxq[rx_q] = { 0 };
210
211 m_rx_core_map[m_rte_cores[core_idx]][rx_q] = tx_ip;
212 m_stream_id_to_source_id[rx_q] = strm_src;
213 // TLOG() << "+++ ip, rx_q : (" << tx_ip << ", " << rx_q << ") -> " << strm_src;
214
215 ++rx_q;
216 if ( ++core_idx == m_rte_cores.size()) {
217 core_idx = 0;
218 }
219 }
220
221 // Log mapping
222 for (auto const& [lcore, rx_qs] : m_rx_core_map) {
223 TLOG() << "Lcore=" << lcore << " handles: ";
224 for (auto const& [rx_q, src_ip] : rx_qs) {
225 TLOG() << " rx_q=" << rx_q << " src_ip=" << src_ip;
226 }
227 }
228
229 // Adding single TX queue for ARP responses
230 TLOG() << "Append TX_Q=0 for ARP responses.";
231 m_tx_qs.insert(0);
232
233 // Strict parsing (DAQ protocol) or pass through of UDP payloads to SourceModels
234 for (auto const& [sid, src_concept] : m_sources) {
235 if (!src_concept->m_daq_protocol_ensured) {
236 m_strict_parsing = false;
237 }
238 }
239
240}
241
242
243//-----------------------------------------------------------------------------
244IfaceWrapper::~IfaceWrapper()
245{
246 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "IfaceWrapper destructor called. First stop check, then closing iface.";
247
248 struct rte_flow_error error;
249 rte_flow_flush(m_iface_id, &error);
250 //graceful_stop();
251 //close_iface();
252 TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "IfaceWrapper destroyed.";
253}
254
255
256//-----------------------------------------------------------------------------
257void
258IfaceWrapper::allocate_mbufs()
259{
260 TLOG() << "Allocating pools and mbufs for UDP, GARP, and ARP.";
261
262 // Pools for UDP RX messages
263 for (size_t i=0; i<m_rx_qs.size(); ++i) {
264 std::stringstream bufss;
265 bufss << "MBP-" << m_iface_id << '-' << i;
266 TLOG() << "Acquire pool with name=" << bufss.str() << " for iface_id=" << m_iface_id << " rxq=" << i;
267 m_mbuf_pools[i] = ealutils::get_mempool(bufss.str(), m_num_mbufs, m_mbuf_cache_size, 16384, m_socket_id);
268 m_bufs[i] = (rte_mbuf**) malloc(sizeof(struct rte_mbuf*) * m_burst_size);
269 // No need to alloc?
270 // rte_pktmbuf_alloc_bulk(m_mbuf_pools[i].get(), m_bufs[i], m_burst_size);
271 }
272
273 // Pools for GARP messages
274 std::stringstream garpss;
275 garpss << "GARPMBP-" << m_iface_id;
276 TLOG() << "Acquire GARP pool with name=" << garpss.str() << " for iface_id=" << m_iface_id;
277 m_garp_mbuf_pool = ealutils::get_mempool(garpss.str());
278 m_garp_bufs[0] = (rte_mbuf**) malloc(sizeof(struct rte_mbuf*) * m_burst_size);
279 rte_pktmbuf_alloc_bulk(m_garp_mbuf_pool.get(), m_garp_bufs[0], m_burst_size);
280
281 // Pools for ARP request/responses
282 std::stringstream arpss;
283 arpss << "ARPMBP-" << m_iface_id;
284 TLOG() << "Acquire ARP pool with name=" << arpss.str() << " for iface_id=" << m_iface_id;
285 m_arp_mbuf_pool = ealutils::get_mempool(arpss.str());
286 m_arp_bufs[0] = (rte_mbuf**) malloc(sizeof(struct rte_mbuf*) * m_burst_size);
287 rte_pktmbuf_alloc_bulk(m_arp_mbuf_pool.get(), m_arp_bufs[0], m_burst_size);
288
289}
290
291
292//-----------------------------------------------------------------------------
293void
294IfaceWrapper::setup_interface()
295{
296 TLOG() << "Initialize interface " << m_iface_id;
297 bool with_reset = true, with_mq_mode = true; // go to config
298 bool check_link_status = false;
299
300 int retval = ealutils::iface_init(m_iface_id, m_rx_qs.size(), m_tx_qs.size(), m_rx_ring_size, m_tx_ring_size, m_mbuf_pools, with_reset, with_mq_mode, check_link_status);
301 if (retval != 0 ) {
302 throw FailedToSetupInterface(ERS_HERE, m_iface_id, retval);
303 }
304 // Promiscuous mode
305 ealutils::iface_promiscuous_mode(m_iface_id, m_prom_mode); // should come from config
306}
307
308
309//-----------------------------------------------------------------------------
310void
311IfaceWrapper::setup_flow_steering()
312{
313 // Flow steering setup
314 TLOG() << "Configuring Flow steering rules for iface=" << m_iface_id;
315 struct rte_flow_error error;
316 struct rte_flow *flow;
317 TLOG() << "Attempt to flush previous flow rules...";
318 rte_flow_flush(m_iface_id, &error);
319#warning RS: FIXME -> Check for flow flush return!
320
321 TLOG() << "Create control flow rules (ARP) assinged to rxq=" << m_arp_rx_queue;
322 flow = generate_arp_flow(m_iface_id, m_arp_rx_queue, &error);
323 if (not flow) { // ers::fatal
324 TLOG() << "ARP flow can't be created for " << m_arp_rx_queue
325 << " Error type: " << (unsigned)error.type
326 << " Message: '" << error.message << "'";
327 ers::fatal(dunedaq::datahandlinglibs::InitializationError(
328 ERS_HERE, "Couldn't create ARP flow API rules!"));
329 rte_exit(EXIT_FAILURE, "error in creating ARP flow");
330 }
331
332 TLOG() << "Create flow rules for UDP RX.";
333 for (auto const& [lcoreid, rxqs] : m_rx_core_map) {
334 for (auto const& [rxqid, srcip] : rxqs) {
335 // Put the IP numbers temporarily in a vector, so they can be converted easily to uint32_t
336 TLOG() << "Creating flow rule for src_ip=" << srcip << " assigned to rxq=" << rxqid;
337 size_t ind = 0, current_ind = 0;
338 std::vector<uint8_t> v;
339 for (int i = 0; i < 4; ++i) {
340 v.push_back(std::stoi(srcip.substr(current_ind, srcip.size() - current_ind), &ind));
341 current_ind += ind + 1;
342 }
343
344 flow = generate_ipv4_flow(m_iface_id, rxqid,
345 RTE_IPV4(v[0], v[1], v[2], v[3]), 0xffffffff, 0, 0, &error);
346
347 if (not flow) { // ers::fatal
348 TLOG() << "Flow can't be created for " << rxqid
349 << " Error type: " << (unsigned)error.type
350 << " Message: '" << error.message << "'";
351 ers::fatal(dunedaq::datahandlinglibs::InitializationError(
352 ERS_HERE, "Couldn't create Flow API rules!"));
353 rte_exit(EXIT_FAILURE, "error in creating flow");
354 }
355 }
356 }
357
358 return;
359}
360
361//-----------------------------------------------------------------------------
362void
363IfaceWrapper::setup_xstats()
364{
365 // Stats setup
366 m_iface_xstats.setup(m_iface_id);
367 m_iface_xstats.reset_counters();
368}
369
370
371//-----------------------------------------------------------------------------
372void
373IfaceWrapper::start()
374{
375 // Reset counters for RX queues
376 for (auto const& [rx_q, _] : m_num_frames_rxq ) {
377 m_num_frames_rxq[rx_q] = { 0 };
378 m_num_bytes_rxq[rx_q] = { 0 };
379 m_num_full_bursts[rx_q] = { 0 };
380 m_max_burst_size[rx_q] = { 0 };
381 }
382
383 // Reset counters for rte_workers
384 for (auto const& [lcore, _] : m_rx_core_map) {
385 m_num_unhandled_non_ipv4[lcore] = { 0 };
386 m_num_unhandled_non_udp[lcore] = { 0 };
387 m_num_unhandled_non_jumbo_udp[lcore] = { 0 };
388 }
389
390 m_lcore_enable_flow.store(false);
391 m_lcore_quit_signal.store(false);
392 TLOG() << "Interface id=" << m_iface_id <<" Launching GARP thread with garp_func...";
393 m_garp_thread = std::thread(&IfaceWrapper::garp_func, this);
394
395 TLOG() << "Interface id=" << m_iface_id << " starting ARP LCore processor:";
396 m_arp_thread = std::thread(&IfaceWrapper::IfaceWrapper::arp_response_runner, this, nullptr);
397
398
399 TLOG() << "Interface id=" << m_iface_id << " starting LCore processors:";
400 for (auto const& [lcoreid, _] : m_rx_core_map) {
401 int ret = rte_eal_remote_launch((int (*)(void*))(&IfaceWrapper::rx_runner), this, lcoreid);
402 TLOG() << " -> LCore[" << lcoreid << "] launched with return code=" << ret << " " << (ret < 0 ? rte_strerror(-ret) : "");
403 }
404}
405
406//-----------------------------------------------------------------------------
407void
408IfaceWrapper::stop()
409{
410 m_lcore_enable_flow.store(false);
411 m_lcore_quit_signal.store(true);
412 // Stop GARP sender thread
413 if (m_garp_thread.joinable()) {
414 m_garp_thread.join();
415 } else {
416 TLOG() << "GARP thread is not joinable!";
417 }
418
419 if (m_arp_thread.joinable()) {
420 m_arp_thread.join();
421 } else {
422 TLOG() << "ARP thread is not joinable!";
423 }
424}
425/*
426void
427IfaceWrapper::scrap()
428{
429 struct rte_flow_error error;
430 rte_flow_flush(m_iface_id, &error);
431}
432*/
433
434
435//-----------------------------------------------------------------------------
436void
437IfaceWrapper::generate_opmon_data() {
438
439 // Poll stats from HW
440 m_iface_xstats.poll();
441
442 opmon::EthStats s;
443 s.set_ipackets( m_iface_xstats.m_eth_stats.ipackets );
444 s.set_opackets( m_iface_xstats.m_eth_stats.opackets );
445 s.set_ibytes( m_iface_xstats.m_eth_stats.ibytes );
446 s.set_obytes( m_iface_xstats.m_eth_stats.obytes );
447 s.set_imissed( m_iface_xstats.m_eth_stats.imissed );
448 s.set_ierrors( m_iface_xstats.m_eth_stats.ierrors );
449 s.set_oerrors( m_iface_xstats.m_eth_stats.oerrors );
450 s.set_rx_nombuf( m_iface_xstats.m_eth_stats.rx_nombuf );
451 publish( std::move(s) );
452
453 if(m_iface_xstats.m_eth_stats.imissed > 0){
454 ers::warning(PacketErrors(ERS_HERE, m_iface_id_str, "missed", m_iface_xstats.m_eth_stats.imissed));
455 }
456 if(m_iface_xstats.m_eth_stats.ierrors > 0){
457 ers::warning(PacketErrors(ERS_HERE, m_iface_id_str, "dropped", m_iface_xstats.m_eth_stats.ierrors));
458 }
459
460 // loop over all the xstats information
461 opmon::EthXStatsInfo xinfos;
462 opmon::EthXStatsErrors xerrs;
463 std::map<std::string, opmon::QueueEthXStats> xq;
464
465 for (int i = 0; i < m_iface_xstats.m_len; ++i) {
466
467 std::string name(m_iface_xstats.m_xstats_names[i].name);
468
469 // first we select the info from the queue
470 static std::regex queue_regex(R"((rx|tx)_q(\d+)_([^_]+))");
471 std::smatch match;
472
473 if ( std::regex_match(name, match, queue_regex) ) {
474 auto queue_name = match[1].str() + '-' + match[2].str();
475 auto & entry = xq[queue_name];
476 try {
477 opmonlib::set_value( entry, match[3], m_iface_xstats.m_xstats_values[i] );
478 } catch ( const ers::Issue & e ) {
479 ers::warning( MetricPublishFailed( ERS_HERE, name, e) );
480 }
481 continue;
482 }
483
484 google::protobuf::Message * metric_p = nullptr;
485 static std::regex err_regex(R"(.+error.*)");
486 if ( std::regex_match( name, err_regex ) ) metric_p = & xerrs;
487 else metric_p = & xinfos;
488
489 try {
490 opmonlib::set_value(*metric_p, name, m_iface_xstats.m_xstats_values[i]);
491 } catch ( const ers::Issue & e ) {
492 ers::warning( MetricPublishFailed( ERS_HERE, name, e) );
493 }
494
495 } // loop over xstats
496
497 // Reset HW counters
498 m_iface_xstats.reset_counters();
499
500 // finally we publish the information
501 publish( std::move(xinfos) );
502 publish( std::move(xerrs) );
503 for ( auto [id, stat] : xq ) {
504 publish( std::move(stat), {{"queue", id}} );
505 }
506
507 for( const auto& [src_rx_q,_] : m_num_frames_rxq) {
508 opmon::QueueInfo i;
509 i.set_packets_received( m_num_frames_rxq[src_rx_q].load() );
510 i.set_bytes_received( m_num_bytes_rxq[src_rx_q].load() );
511 i.set_full_rx_burst( m_num_full_bursts[src_rx_q].load() );
512 i.set_max_burst_size( m_max_burst_size[src_rx_q].exchange(0) );
513
514 publish( std::move(i), {{"queue", std::to_string(src_rx_q)}} );
515 }
516
517 // RTE Workers
518 for (auto const& [lcore, _] : m_rx_core_map) {
519 opmon::RTEWorkerInfo info;
520 info.set_num_unhandled_non_ipv4( m_num_unhandled_non_ipv4[lcore].exchange(0) );
521 info.set_num_unhandled_non_udp( m_num_unhandled_non_udp[lcore].exchange(0) );
522 info.set_num_unhandled_non_jumbo_udp( m_num_unhandled_non_jumbo_udp[lcore].exchange(0) );
523 publish( std::move(info), {{"rte_worker_id", std::to_string(lcore)}} );
524 }
525
526 for ( auto & [id, counter] : m_num_unexid_frames ) {
527 auto val = counter.exchange(0);
528 if ( val > 0 ) {
530 }
531 }
532}
533
534//-----------------------------------------------------------------------------
535void
536IfaceWrapper::garp_func()
537{
538 TLOG() << "Launching GARP sender...";
539 while(m_run_marker.load()) {
540 for( const auto& ip_addr_bin : m_ip_addr_bin ) {
541 arp::pktgen_send_garp(m_garp_bufs[0][0], m_iface_id, ip_addr_bin);
542 }
543 ++m_garps_sent;
544 std::this_thread::sleep_for(std::chrono::seconds(1));
545 }
546 TLOG() << "GARP function joins.";
547}
548
549//-----------------------------------------------------------------------------
550void
551IfaceWrapper::parse_udp_payload(int src_rx_q, char* payload, std::size_t size)
552{
553 // Pointers for parsing and to the end of the UDP payload.
554 char* plptr = payload;
555 const char* plendptr = payload + size;
556
557
558 // Process every DAQEth frame within UDP payload
559 while ( plptr + sizeof(dunedaq::detdataformats::DAQEthHeader) < plendptr ) { // Scatter loop start
560
561
562 // Reinterpret directly to DAQEthHeader
563 auto daqhdrptr = reinterpret_cast<dunedaq::detdataformats::DAQEthHeader*>(plptr);
564
565 // Check number of DAQEth block_words
566 unsigned block_words = unsigned(daqhdrptr->block_length) - 1; // removing timestamp word from the block length.
567
568 // Check for corrupted DAQEth frame length
569 if ( block_words == 0 || block_words > m_max_block_words ) {
570 // RS FIXME: corrupted length -> stop, add opmon counter or warning
571 return;
572 }
573
574 // Calculate data bytes after DAQEthHeader based on block_words
575 std::size_t data_bytes = std::size_t(block_words) * sizeof(dunedaq::detdataformats::DAQEthHeader::word_t);
576
577 // Grab end pointer of DAQEth frame
578 char* daqframe_endptr = plptr + sizeof(dunedaq::detdataformats::DAQEthHeader) + data_bytes;
579
580 // Check if full DAQEth frame fits
581 if ( daqframe_endptr > plendptr ) {
582 // RS FIXME: truncated payload -> stop, add opmon counter or warning
583 return;
584 }
585
586 // Calculate DAQEth frame size (used both for handling and advancing)
587 std::size_t daq_frame_size = sizeof(dunedaq::detdataformats::DAQEthHeader) + data_bytes;
588
589 // Sadly, cannot take a reference to a bitfield
590 uint strm_id = daqhdrptr->stream_id;
591
592 // Check that stream id is corresponds to a registered source
593 auto& strm_to_src = m_stream_id_to_source_id[src_rx_q];
594
595 if ( auto strm_it = strm_to_src.find(strm_id); strm_it != strm_to_src.end() ) {
596
597 m_sources[strm_it->second]->handle_daq_frame((char*)daqhdrptr, daq_frame_size);
598
599
600 } else {
601 // Really bad -> unexpeced StreamID in UDP Payload.
602 // This check is needed in order to avoid dynamically add thousands
603 // of Sources on the fly, in case the data corruption is extremely severe.
604 if (m_num_unexid_frames.count(strm_id) == 0) {
605 m_num_unexid_frames[strm_id] = 0;
606 }
607 m_num_unexid_frames[strm_id]++;
608 }
609
610 // Advance to next payload
611 plptr += daq_frame_size;
612
613 } // Scatter loop end
614
615}
616
617//-----------------------------------------------------------------------------
618void
619IfaceWrapper::passthrough_udp_payload(int src_rx_q, char* payload, std::size_t size)
620{
621 // Get DAQ Header and its StreamID
622 auto* daqhdrptr = reinterpret_cast<dunedaq::detdataformats::DAQEthHeader*>(payload);
623
624
625 // Sadly, cannot take a reference to a bitfield
626 uint strm_id = daqhdrptr->stream_id;
627
628 // Check that stream id is corresponds to a registered source
629 auto& strm_to_src = m_stream_id_to_source_id[src_rx_q];
630
631
632 if ( auto strm_it = strm_to_src.find(strm_id); strm_it != strm_to_src.end() ) {
633 m_sources[strm_it->second]->handle_daq_frame(payload, size);
634 } else {
635 // Really bad -> unexpeced StreamID in UDP Payload.
636 // This check is needed in order to avoid dynamically add thousands
637 // of Sources on the fly, in case the data corruption is extremely severe.
638 if (m_num_unexid_frames.count(strm_id) == 0) {
639 m_num_unexid_frames[strm_id] = 0;
640 }
641 m_num_unexid_frames[strm_id]++;
642 }
643}
644
645} // namespace dpdklibs
646} // namespace dunedaq
647
648//
@ TLVL_ENTER_EXIT_METHODS
#define ERS_HERE
Base class for any user define issue.
Definition Issue.hpp:69
std::atomic< bool > run_marker
Global atomic for process lifetime.
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
dunedaq::conffwk::relationship_t match(T const &, T const &)
void pktgen_send_garp(struct rte_mbuf *m, uint32_t port_id, rte_be32_t binary_ip_address)
Definition ARP.cpp:23
std::unique_ptr< rte_mempool > get_mempool(const std::string &pool_name, int num_mbufs=NUM_MBUFS, int mbuf_cache_size=MBUF_CACHE_SIZE, int data_room_size=9800, int socket_id=0)
Definition EALSetup.cpp:260
int iface_promiscuous_mode(std::uint16_t iface, bool mode=false)
Definition EALSetup.cpp:74
int iface_init(uint16_t iface, uint16_t rx_rings, uint16_t tx_rings, uint16_t rx_ring_size, uint16_t tx_ring_size, std::map< int, std::unique_ptr< rte_mempool > > &mbuf_pool, bool with_reset=false, bool with_mq_rss=false, bool check_link_status=false)
Definition EALSetup.cpp:95
rte_be32_t ip_address_dotdecimal_to_binary(std::uint8_t byte1, std::uint8_t byte2, std::uint8_t byte3, std::uint8_t byte4)
Definition Utils.cpp:41
struct rte_flow * generate_ipv4_flow(uint16_t port_id, uint16_t rx_q, uint32_t src_ip, uint32_t src_mask, uint32_t dest_ip, uint32_t dest_mask, struct rte_flow_error *error)
struct rte_flow * generate_arp_flow(uint16_t port_id, uint16_t rx_q, struct rte_flow_error *error)
void set_value(google::protobuf::Message &m, const std::string &name, T value)
Definition Utils.hxx:17
The DUNE-DAQ namespace.
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size
default char v[0]
void warning(const Issue &issue)
Definition ers.hpp:115
void fatal(const Issue &issue)
Definition ers.hpp:88
DAQEthHeader is a versioned and unified structure for every FE electronics.
#define TLVL_ENTER_EXIT_METHODS
Definition Issues.hpp:24
Factory couldn t std::string alg_name Invalid configuration error
Definition Issues.hpp:34