LCOV - code coverage report
Current view: top level - dpdklibs/src - IfaceWrapper.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 306 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 42 0

            Line data    Source code
       1              : /**
       2              :  * @file IfaceWrapper.cpp DPDK based Interface wrapper
       3              :  *
       4              :  * This is part of the DUNE DAQ , copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : #include "logging/Logging.hpp"
       9              : #include "datahandlinglibs/DataHandlingIssues.hpp"
      10              : 
      11              : #include "opmonlib/Utils.hpp"
      12              : 
      13              : #include "dpdklibs/Issues.hpp"
      14              : 
      15              : #include "dpdklibs/nicreader/Structs.hpp"
      16              : 
      17              : #include "dpdklibs/EALSetup.hpp"
      18              : #include "dpdklibs/FlowControl.hpp"
      19              : #include "dpdklibs/udp/PacketCtor.hpp"
      20              : #include "dpdklibs/udp/Utils.hpp"
      21              : #include "dpdklibs/arp/ARP.hpp"
      22              : #include "dpdklibs/ipv4_addr.hpp"
      23              : #include "IfaceWrapper.hpp"
      24              : 
      25              : #include "appfwk/ConfigurationManager.hpp"
      26              : // #include "confmodel/DROStreamConf.hpp"
      27              : // #include "confmodel/StreamParameters.hpp"
      28              : #include "confmodel/GeoId.hpp"
      29              : #include "confmodel/DetectorStream.hpp"
      30              : #include "confmodel/ProcessingResource.hpp"
      31              : #include "appmodel/DPDKPortConfiguration.hpp"
      32              : // #include "confmodel/NetworkDevice.hpp"
      33              : // #include "appmodel/NICInterfaceConfiguration.hpp"
      34              : // #include "appmodel/NICStatsConf.hpp"
      35              : // #include "appmodel/EthStreamParameters.hpp"
      36              : 
      37              : #include "dpdklibs/opmon/IfaceWrapper.pb.h"
      38              : 
      39              : #include <chrono>
      40              : #include <memory>
      41              : #include <string>
      42              : #include <regex>
      43              : #include <stdexcept>
      44              : #include <format>
      45              : 
      46              : /**
      47              :  * @brief TRACE debug levels used in this source file
      48              :  */
      49              : enum
      50              : {
      51              :   TLVL_ENTER_EXIT_METHODS = 5,
      52              :   TLVL_WORK_STEPS = 10,
      53              :   TLVL_BOOKKEEPING = 15
      54              : };
      55              : 
      56              : namespace dunedaq {
      57              : namespace dpdklibs {
      58              : 
      59              : 
      60              : //-----------------------------------------------------------------------------
      61              : // TODO: the constructor signature shall be reviewd.
      62              : // The current constructor takes a set of largely correlated arguments
      63              : // - a receiver object
      64              : // - the list of active senders
      65              : // - the list of active streams
      66              : // 
      67              : // These arguments are created by applying the enable mask to detector2daq connection object.
      68              : // They are preferred to the d2d object not to expose the IfaceWrapper code to the System class
      69            0 : IfaceWrapper::IfaceWrapper(
      70              :   uint iface_id,
      71              :   const appmodel::DPDKReceiver* receiver,
      72              :   const std::vector<const appmodel::NWDetDataSender*>& nw_senders,
      73              :   const std::vector<const confmodel::DetectorStream*>& active_streams,
      74              :   sid_to_source_map_t& sources,
      75              :   std::atomic<bool>& run_marker
      76            0 :   )
      77            0 :     : m_sources(sources)
      78            0 :     , m_run_marker(run_marker)
      79              : { 
      80              : 
      81              :   // Arguments consistency check: collect source ids in senders
      82            0 :   std::set<int> src_in_d2d;
      83              : 
      84            0 :   for( auto& det_stream : active_streams ) {
      85            0 :     src_in_d2d.insert(det_stream->get_source_id());
      86              :   }
      87              : 
      88              :   // Arguments consistency check: collect source ids in source model map
      89            0 :   std::set<int> src_models;
      90            0 :   for( const auto& [src_id, _] : m_sources ) {
      91            0 :     src_models.insert(src_id);
      92              :   }
      93              : 
      94              :   // check that the 2 sets are identical.
      95            0 :   if (!std::includes(src_models.begin(), src_models.end(), src_in_d2d.begin(), src_in_d2d.end())) {
      96              : 
      97              :     // TODO: remove, possibly
      98            0 :     for ( auto src : src_models ) 
      99            0 :       TLOG_DEBUG(TLVL_BOOKKEEPING) << "model srcid " << src;
     100            0 :     for ( auto src : src_in_d2d ) 
     101            0 :       TLOG_DEBUG(TLVL_BOOKKEEPING) << "d2d srcid " << src;
     102              : 
     103              :     // D2D sources are not included in the source model list
     104              :     // Extract the differences: src_in_d2d - src_models
     105              : 
     106            0 :     std::vector<int> src_missing;
     107            0 :     std::set_difference(src_models.begin(), src_models.end(),
     108              :                         src_in_d2d.begin(), src_in_d2d.end(),
     109              :                         std::back_inserter(src_missing));
     110              : 
     111            0 :     std::stringstream ss;
     112            0 :     for( int src : src_missing ) {
     113            0 :       ss << src << " ";
     114              :     }
     115              : 
     116              :     // TLOG() << std::format("WARNING : these source ids are present in the d2d connection but no corresponding source objects are found {}", ss.str());
     117            0 :     throw MissingSourceIDOutputs(ERS_HERE, m_iface_id, ss.str());
     118            0 :   }
     119              : 
     120            0 :   auto net_device = receiver->get_uses();
     121              : 
     122            0 :   m_iface_id = iface_id;
     123            0 :   m_mac_addr = net_device->get_mac_address();
     124            0 :   m_ip_addr = net_device->get_ip_address();
     125              : 
     126            0 :   TLOG() << "Building IfaceWrapper " << m_iface_id;
     127            0 :   std::stringstream s;
     128            0 :   s << 'IfaceWrapper (port ' << m_iface_id << ") responding to : ";
     129            0 :   for( const std::string& ip_addr : m_ip_addr) {
     130            0 :       s << ip_addr << " ";
     131              :   }
     132              : 
     133            0 :   TLOG() << s.str();
     134              : 
     135            0 :   for( const std::string& ip_addr : m_ip_addr) {
     136            0 :     IpAddr ip_addr_struct(ip_addr);
     137            0 :     m_ip_addr_bin.push_back(udp::ip_address_dotdecimal_to_binary(
     138            0 :         ip_addr_struct.addr_bytes[0],
     139            0 :         ip_addr_struct.addr_bytes[1],
     140            0 :         ip_addr_struct.addr_bytes[2],
     141            0 :         ip_addr_struct.addr_bytes[3]
     142              :     ));
     143              :   } 
     144              : 
     145              : 
     146            0 :   auto iface_cfg = receiver->get_configuration();
     147              : 
     148            0 :   m_with_flow = iface_cfg->get_flow_control();
     149            0 :   m_prom_mode = iface_cfg->get_promiscuous_mode();;
     150            0 :   m_mtu = iface_cfg->get_mtu();
     151            0 :   m_max_block_words = unsigned(m_mtu) / sizeof(uint64_t);
     152            0 :   m_rx_ring_size = iface_cfg->get_rx_ring_size();
     153            0 :   m_tx_ring_size = iface_cfg->get_tx_ring_size();
     154            0 :   m_num_mbufs = iface_cfg->get_num_bufs();
     155            0 :   m_burst_size = iface_cfg->get_burst_size();
     156            0 :   m_mbuf_cache_size = iface_cfg->get_mbuf_cache_size();
     157              : 
     158            0 :   m_lcore_sleep_ns = iface_cfg->get_lcore_sleep_us() * 1000;
     159            0 :   m_socket_id = rte_eth_dev_socket_id(m_iface_id);
     160              : 
     161            0 :   m_iface_id_str = iface_cfg->UID();
     162              : 
     163              : 
     164              :   // Here is my list of cores
     165            0 :   for( const auto* proc_res : iface_cfg->get_used_lcores()) {
     166            0 :     m_rte_cores.insert(m_rte_cores.end(), proc_res->get_cpu_cores().begin(), proc_res->get_cpu_cores().end());
     167              :   }
     168            0 :   if(std::find(m_rte_cores.begin(), m_rte_cores.end(), rte_get_main_lcore())!=m_rte_cores.end()) {
     169            0 :     TLOG() << "ERROR! Throw ERS error here that LCore=0 should not be used, as it's a control RTE core!";
     170            0 :     throw std::runtime_error(std::string("ERROR! Throw ERS here that LCore=0 should not be used, as it's a control RTE core!"));
     171              :   }
     172              : 
     173              :   // iterate through active streams
     174              : 
     175              :   // Create a map of sender ni (ip) to streams from the d2d connection object
     176            0 :   std::map<std::string, std::map<uint, uint>> ip_to_stream_src_groups;
     177              : 
     178            0 :   for( auto nw_sender : nw_senders ) {
     179            0 :     auto sender_ni = nw_sender->get_uses();
     180              : 
     181            0 :     std::string tx_ip = sender_ni->get_ip_address().at(0);
     182              : 
     183              :     // Loop over streams
     184            0 :     for ( auto det_stream : nw_sender->get_streams() ) {
     185              : 
     186              :       // Only include active streams
     187            0 :       if ( std::find(active_streams.begin(), active_streams.end(), det_stream) == active_streams.end()) 
     188            0 :         continue;
     189              :         
     190            0 :       uint32_t tx_geo_stream_id = det_stream->get_geo_id()->get_stream_id();
     191              :       // (tx, geo_stream) -> source_id
     192            0 :       ip_to_stream_src_groups[tx_ip][tx_geo_stream_id] = det_stream->get_source_id();
     193              :     }
     194            0 :   }
     195              : 
     196              : 
     197              : // RS FIXME: Is this RX_Q bump is enough??? I don't remember how the RX_Qs are assigned... 
     198            0 :   uint32_t core_idx(0), rx_q(0); // RS FIXME: Ensure that no RX_Q=0 is used for UDP RX, ever.
     199              : 
     200            0 :   m_rx_qs.insert(rx_q);
     201            0 :   m_arp_rx_queue = rx_q;
     202            0 :   ++rx_q;
     203              : 
     204              :   // Build additional helper maps
     205            0 :   for( const auto& [tx_ip, strm_src] : ip_to_stream_src_groups) {
     206            0 :     m_ips.insert(tx_ip);
     207            0 :     m_rx_qs.insert(rx_q);
     208            0 :     m_num_frames_rxq[rx_q] = { 0 };
     209            0 :     m_num_bytes_rxq[rx_q] = { 0 };
     210              : 
     211            0 :     m_rx_core_map[m_rte_cores[core_idx]][rx_q] = tx_ip;
     212            0 :     m_stream_id_to_source_id[rx_q] = strm_src;
     213              :     // TLOG() << "+++ ip, rx_q : (" << tx_ip << ", " << rx_q << ") -> " << strm_src;
     214              : 
     215            0 :     ++rx_q;
     216            0 :     if ( ++core_idx == m_rte_cores.size()) {
     217            0 :       core_idx = 0;
     218              :     }
     219              :   }
     220              : 
     221              :   // Log mapping
     222            0 :   for (auto const& [lcore, rx_qs] : m_rx_core_map) {
     223            0 :     TLOG() << "Lcore=" << lcore << " handles: ";
     224            0 :     for (auto const& [rx_q, src_ip] : rx_qs) {
     225            0 :       TLOG() << " rx_q=" << rx_q << " src_ip=" << src_ip;
     226              :     }
     227              :   }
     228              : 
     229              :   // Adding single TX queue for ARP responses
     230            0 :   TLOG() << "Append TX_Q=0 for ARP responses.";
     231            0 :   m_tx_qs.insert(0);
     232              : 
     233              :   // Strict parsing (DAQ protocol) or pass through of UDP payloads to SourceModels
     234            0 :   for (auto const& [sid, src_concept] : m_sources) {
     235            0 :     if (!src_concept->m_daq_protocol_ensured) {
     236            0 :       m_strict_parsing = false;
     237              :     }
     238              :   }
     239              : 
     240            0 : }
     241              : 
     242              : 
     243              : //-----------------------------------------------------------------------------
     244            0 : IfaceWrapper::~IfaceWrapper()
     245              : {
     246            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "IfaceWrapper destructor called. First stop check, then closing iface.";
     247              :     
     248            0 :   struct rte_flow_error error;
     249            0 :   rte_flow_flush(m_iface_id, &error);
     250              :   //graceful_stop();
     251              :   //close_iface();
     252            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "IfaceWrapper destroyed.";
     253            0 : }
     254              : 
     255              : 
     256              : //-----------------------------------------------------------------------------
     257              : void
     258            0 : IfaceWrapper::allocate_mbufs() 
     259              : {
     260            0 :   TLOG() << "Allocating pools and mbufs for UDP, GARP, and ARP.";
     261              : 
     262              :   // Pools for UDP RX messages 
     263            0 :   for (size_t i=0; i<m_rx_qs.size(); ++i) {
     264            0 :     std::stringstream bufss;
     265            0 :     bufss << "MBP-" << m_iface_id << '-' << i;
     266            0 :     TLOG() << "Acquire pool with name=" << bufss.str() << " for iface_id=" << m_iface_id << " rxq=" << i;
     267            0 :     m_mbuf_pools[i] = ealutils::get_mempool(bufss.str(), m_num_mbufs, m_mbuf_cache_size, 16384, m_socket_id);
     268            0 :     m_bufs[i] = (rte_mbuf**) malloc(sizeof(struct rte_mbuf*) * m_burst_size);
     269              :     // No need to alloc?
     270              :     // rte_pktmbuf_alloc_bulk(m_mbuf_pools[i].get(), m_bufs[i], m_burst_size);
     271            0 :   }
     272              : 
     273              :   // Pools for GARP messages
     274            0 :   std::stringstream garpss;
     275            0 :   garpss << "GARPMBP-" << m_iface_id;
     276            0 :   TLOG() << "Acquire GARP pool with name=" << garpss.str() << " for iface_id=" << m_iface_id;
     277            0 :   m_garp_mbuf_pool = ealutils::get_mempool(garpss.str());
     278            0 :   m_garp_bufs[0] = (rte_mbuf**) malloc(sizeof(struct rte_mbuf*) * m_burst_size);
     279            0 :   rte_pktmbuf_alloc_bulk(m_garp_mbuf_pool.get(), m_garp_bufs[0], m_burst_size);
     280              : 
     281              :   // Pools for ARP request/responses
     282            0 :   std::stringstream arpss;
     283            0 :   arpss << "ARPMBP-" << m_iface_id;
     284            0 :   TLOG() << "Acquire ARP pool with name=" << arpss.str() << " for iface_id=" << m_iface_id;
     285            0 :   m_arp_mbuf_pool = ealutils::get_mempool(arpss.str());
     286            0 :   m_arp_bufs[0] = (rte_mbuf**) malloc(sizeof(struct rte_mbuf*) * m_burst_size);
     287            0 :   rte_pktmbuf_alloc_bulk(m_arp_mbuf_pool.get(), m_arp_bufs[0], m_burst_size);
     288              : 
     289            0 : }
     290              : 
     291              : 
     292              : //-----------------------------------------------------------------------------
     293              : void
     294            0 : IfaceWrapper::setup_interface()
     295              : {
     296            0 :   TLOG() << "Initialize interface " << m_iface_id;
     297            0 :   bool with_reset = true, with_mq_mode = true; // go to config
     298            0 :   bool check_link_status = false;
     299              : 
     300            0 :   int retval = ealutils::iface_init(m_iface_id, m_rx_qs.size(), m_tx_qs.size(), m_rx_ring_size, m_tx_ring_size, m_mbuf_pools, with_reset, with_mq_mode, check_link_status);
     301            0 :   if (retval != 0 ) {
     302            0 :     throw FailedToSetupInterface(ERS_HERE, m_iface_id, retval);
     303              :   }
     304              :   // Promiscuous mode
     305            0 :   ealutils::iface_promiscuous_mode(m_iface_id, m_prom_mode); // should come from config
     306            0 : }
     307              : 
     308              : 
     309              : //-----------------------------------------------------------------------------
     310              : void
     311            0 : IfaceWrapper::setup_flow_steering()
     312              : {
     313              :   // Flow steering setup
     314            0 :   TLOG() << "Configuring Flow steering rules for iface=" << m_iface_id;
     315            0 :   struct rte_flow_error error;
     316            0 :   struct rte_flow *flow;
     317            0 :   TLOG() << "Attempt to flush previous flow rules...";
     318            0 :   rte_flow_flush(m_iface_id, &error);
     319              : #warning RS: FIXME -> Check for flow flush return!
     320              : 
     321            0 :   TLOG() << "Create control flow rules (ARP) assinged to rxq=" << m_arp_rx_queue;
     322            0 :         flow = generate_arp_flow(m_iface_id, m_arp_rx_queue, &error);
     323            0 :   if (not flow) { // ers::fatal
     324            0 :         TLOG() << "ARP flow  can't be created for " << m_arp_rx_queue
     325            0 :          << " Error type: " << (unsigned)error.type
     326            0 :          << " Message: '" << error.message << "'";
     327            0 :         ers::fatal(dunedaq::datahandlinglibs::InitializationError(
     328            0 :           ERS_HERE, "Couldn't create ARP flow API rules!"));
     329            0 :         rte_exit(EXIT_FAILURE, "error in creating ARP flow");
     330              :       }
     331              : 
     332            0 :   TLOG() << "Create flow rules for UDP RX.";
     333            0 :   for (auto const& [lcoreid, rxqs] : m_rx_core_map) {
     334            0 :     for (auto const& [rxqid, srcip] : rxqs) {
     335              :       // Put the IP numbers temporarily in a vector, so they can be converted easily to uint32_t
     336            0 :       TLOG() << "Creating flow rule for src_ip=" << srcip << " assigned to rxq=" << rxqid;
     337            0 :       size_t ind = 0, current_ind = 0;
     338            0 :       std::vector<uint8_t> v;
     339            0 :       for (int i = 0; i < 4; ++i) {
     340            0 :         v.push_back(std::stoi(srcip.substr(current_ind, srcip.size() - current_ind), &ind));
     341            0 :         current_ind += ind + 1;
     342              :       }
     343              : 
     344            0 :       flow = generate_ipv4_flow(m_iface_id, rxqid,
     345            0 :         RTE_IPV4(v[0], v[1], v[2], v[3]), 0xffffffff, 0, 0, &error);
     346              : 
     347            0 :       if (not flow) { // ers::fatal
     348            0 :         TLOG() << "Flow can't be created for " << rxqid
     349            0 :          << " Error type: " << (unsigned)error.type
     350            0 :          << " Message: '" << error.message << "'";
     351            0 :         ers::fatal(dunedaq::datahandlinglibs::InitializationError(
     352            0 :           ERS_HERE, "Couldn't create Flow API rules!"));
     353            0 :         rte_exit(EXIT_FAILURE, "error in creating flow");
     354              :       }
     355            0 :     }
     356              :   }
     357              : 
     358            0 :   return;
     359              : }
     360              : 
     361              : //-----------------------------------------------------------------------------
     362              : void
     363            0 : IfaceWrapper::setup_xstats() 
     364              : {
     365              :   // Stats setup
     366            0 :   m_iface_xstats.setup(m_iface_id);
     367            0 :   m_iface_xstats.reset_counters();
     368            0 : }
     369              : 
     370              : 
     371              : //-----------------------------------------------------------------------------
     372              : void
     373            0 : IfaceWrapper::start()
     374              : {
     375              :   // Reset counters for RX queues
     376            0 :   for (auto const& [rx_q, _] : m_num_frames_rxq ) {
     377            0 :     m_num_frames_rxq[rx_q] = { 0 };
     378            0 :     m_num_bytes_rxq[rx_q] = { 0 };
     379            0 :     m_num_full_bursts[rx_q] = { 0 };
     380            0 :     m_max_burst_size[rx_q] = { 0 };
     381              :   }
     382              : 
     383              :   // Reset counters for rte_workers
     384            0 :   for (auto const& [lcore, _] : m_rx_core_map) {
     385            0 :     m_num_unhandled_non_ipv4[lcore] = { 0 };
     386            0 :     m_num_unhandled_non_udp[lcore] = { 0 };
     387            0 :     m_num_unhandled_non_jumbo_udp[lcore] = { 0 };
     388              :   }
     389              : 
     390            0 :   m_lcore_enable_flow.store(false);
     391            0 :   m_lcore_quit_signal.store(false);
     392            0 :   TLOG() << "Interface id=" << m_iface_id <<" Launching GARP thread with garp_func...";
     393            0 :   m_garp_thread = std::thread(&IfaceWrapper::garp_func, this);
     394              :   
     395            0 :   TLOG() << "Interface id=" << m_iface_id << " starting ARP LCore processor:";
     396            0 :   m_arp_thread = std::thread(&IfaceWrapper::IfaceWrapper::arp_response_runner, this, nullptr);
     397              : 
     398              : 
     399            0 :   TLOG() << "Interface id=" << m_iface_id << " starting LCore processors:";
     400            0 :   for (auto const& [lcoreid, _] : m_rx_core_map) {
     401            0 :     int ret = rte_eal_remote_launch((int (*)(void*))(&IfaceWrapper::rx_runner), this, lcoreid);
     402            0 :     TLOG() << "  -> LCore[" << lcoreid << "] launched with return code=" << ret << "   " << (ret < 0 ? rte_strerror(-ret) : "");
     403              :   }
     404            0 : }
     405              : 
     406              : //-----------------------------------------------------------------------------
     407              : void
     408            0 : IfaceWrapper::stop()
     409              : {
     410            0 :   m_lcore_enable_flow.store(false);
     411            0 :   m_lcore_quit_signal.store(true);
     412              :   // Stop GARP sender thread  
     413            0 :   if (m_garp_thread.joinable()) {
     414            0 :     m_garp_thread.join();
     415              :   } else {
     416            0 :     TLOG() << "GARP thread is not joinable!";
     417              :   }
     418              : 
     419            0 :   if (m_arp_thread.joinable()) {
     420            0 :     m_arp_thread.join();
     421              :   } else {
     422            0 :     TLOG() << "ARP thread is not joinable!";
     423              :   }
     424            0 : }
     425              : /*
     426              : void
     427              : IfaceWrapper::scrap()
     428              : {
     429              :   struct rte_flow_error error;
     430              :   rte_flow_flush(m_iface_id, &error);
     431              : }
     432              : */
     433              : 
     434              : 
     435              : //-----------------------------------------------------------------------------
     436              : void 
     437            0 : IfaceWrapper::generate_opmon_data() {
     438              : 
     439              :   // Poll stats from HW
     440            0 :   m_iface_xstats.poll();
     441              : 
     442            0 :   opmon::EthStats s;
     443            0 :   s.set_ipackets( m_iface_xstats.m_eth_stats.ipackets );
     444            0 :   s.set_opackets( m_iface_xstats.m_eth_stats.opackets );
     445            0 :   s.set_ibytes( m_iface_xstats.m_eth_stats.ibytes );
     446            0 :   s.set_obytes( m_iface_xstats.m_eth_stats.obytes );
     447            0 :   s.set_imissed( m_iface_xstats.m_eth_stats.imissed );
     448            0 :   s.set_ierrors( m_iface_xstats.m_eth_stats.ierrors );
     449            0 :   s.set_oerrors( m_iface_xstats.m_eth_stats.oerrors );
     450            0 :   s.set_rx_nombuf( m_iface_xstats.m_eth_stats.rx_nombuf );
     451            0 :   publish( std::move(s) );
     452              : 
     453            0 :   if(m_iface_xstats.m_eth_stats.imissed > 0){
     454            0 :     ers::warning(PacketErrors(ERS_HERE, m_iface_id_str, "missed", m_iface_xstats.m_eth_stats.imissed));
     455              :   }
     456            0 :   if(m_iface_xstats.m_eth_stats.ierrors > 0){
     457            0 :     ers::warning(PacketErrors(ERS_HERE, m_iface_id_str, "dropped", m_iface_xstats.m_eth_stats.ierrors));
     458              :   }
     459              : 
     460              :   // loop over all the xstats information
     461            0 :   opmon::EthXStatsInfo xinfos;
     462            0 :   opmon::EthXStatsErrors xerrs;
     463            0 :   std::map<std::string, opmon::QueueEthXStats> xq;
     464              : 
     465            0 :   for (int i = 0; i < m_iface_xstats.m_len; ++i) {
     466              :     
     467            0 :     std::string name(m_iface_xstats.m_xstats_names[i].name);
     468              :     
     469              :     // first we select the info from the queue
     470            0 :     static std::regex queue_regex(R"((rx|tx)_q(\d+)_([^_]+))");
     471            0 :     std::smatch match;
     472              :     
     473            0 :     if ( std::regex_match(name, match, queue_regex) ) {
     474            0 :       auto queue_name = match[1].str() + '-' + match[2].str();
     475            0 :       auto & entry = xq[queue_name];
     476            0 :       try {
     477            0 :         opmonlib::set_value( entry, match[3], m_iface_xstats.m_xstats_values[i] );
     478            0 :       } catch ( const ers::Issue & e ) {
     479            0 :         ers::warning( MetricPublishFailed( ERS_HERE, name, e) );
     480            0 :       }
     481            0 :       continue;
     482            0 :     } 
     483              : 
     484            0 :     google::protobuf::Message * metric_p = nullptr;
     485            0 :     static std::regex err_regex(R"(.+error.*)");
     486            0 :     if ( std::regex_match( name, err_regex ) ) metric_p = & xerrs;
     487            0 :     else  metric_p = & xinfos;
     488              :     
     489            0 :     try { 
     490            0 :       opmonlib::set_value(*metric_p, name, m_iface_xstats.m_xstats_values[i]);
     491            0 :     } catch ( const ers::Issue & e ) {
     492            0 :       ers::warning( MetricPublishFailed( ERS_HERE, name, e) );
     493            0 :     }
     494              :     
     495            0 :   } // loop over xstats
     496              :   
     497              :   // Reset HW counters
     498            0 :   m_iface_xstats.reset_counters();
     499              :   
     500              :   // finally we publish the information
     501            0 :   publish( std::move(xinfos) );
     502            0 :   publish( std::move(xerrs) );
     503            0 :   for ( auto [id, stat] : xq ) {
     504            0 :     publish( std::move(stat), {{"queue", id}} );
     505            0 :   }
     506              :   
     507            0 :   for( const auto& [src_rx_q,_] : m_num_frames_rxq) {
     508            0 :     opmon::QueueInfo i;
     509            0 :     i.set_packets_received( m_num_frames_rxq[src_rx_q].load() );
     510            0 :     i.set_bytes_received( m_num_bytes_rxq[src_rx_q].load() );
     511            0 :     i.set_full_rx_burst( m_num_full_bursts[src_rx_q].load() );
     512            0 :     i.set_max_burst_size( m_max_burst_size[src_rx_q].exchange(0) );
     513              :     
     514            0 :     publish( std::move(i), {{"queue", std::to_string(src_rx_q)}} );
     515            0 :   }
     516              : 
     517              :   // RTE Workers
     518            0 :   for (auto const& [lcore, _] : m_rx_core_map) {
     519            0 :     opmon::RTEWorkerInfo info;
     520            0 :     info.set_num_unhandled_non_ipv4( m_num_unhandled_non_ipv4[lcore].exchange(0) );
     521            0 :     info.set_num_unhandled_non_udp( m_num_unhandled_non_udp[lcore].exchange(0) ); 
     522            0 :     info.set_num_unhandled_non_jumbo_udp( m_num_unhandled_non_jumbo_udp[lcore].exchange(0) );
     523            0 :     publish( std::move(info), {{"rte_worker_id", std::to_string(lcore)}} );
     524            0 :   }
     525              : 
     526            0 :   for ( auto & [id, counter] : m_num_unexid_frames ) {
     527            0 :     auto val = counter.exchange(0);
     528            0 :     if ( val > 0 ) {
     529            0 :       ers::warning( UnexpectedStreamID( ERS_HERE, id, val ) );
     530              :     }
     531              :   }
     532            0 : }
     533              : 
     534              : //-----------------------------------------------------------------------------
     535              : void
     536            0 : IfaceWrapper::garp_func()
     537              : {  
     538            0 :   TLOG() << "Launching GARP sender...";
     539            0 :   while(m_run_marker.load()) {
     540            0 :     for( const auto& ip_addr_bin : m_ip_addr_bin ) {
     541            0 :       arp::pktgen_send_garp(m_garp_bufs[0][0], m_iface_id, ip_addr_bin);   
     542              :     }
     543            0 :     ++m_garps_sent;
     544            0 :     std::this_thread::sleep_for(std::chrono::seconds(1));
     545              :   }
     546            0 :   TLOG() << "GARP function joins.";
     547            0 : }
     548              : 
     549              : //-----------------------------------------------------------------------------
     550              : void
     551            0 : IfaceWrapper::parse_udp_payload(int src_rx_q, char* payload, std::size_t size)
     552              : {
     553              :   // Pointers for parsing and to the end of the UDP payload.
     554            0 :   char* plptr = payload;
     555            0 :   const char* plendptr = payload + size;
     556              : 
     557              : 
     558              :   // Process every DAQEth frame within UDP payload
     559            0 :   while ( plptr + sizeof(dunedaq::detdataformats::DAQEthHeader) < plendptr ) { // Scatter loop start
     560              : 
     561              : 
     562              :     // Reinterpret directly to DAQEthHeader
     563            0 :     auto daqhdrptr = reinterpret_cast<dunedaq::detdataformats::DAQEthHeader*>(plptr);
     564              : 
     565              :     // Check number of DAQEth block_words 
     566            0 :     unsigned block_words = unsigned(daqhdrptr->block_length) - 1; // removing timestamp word from the block length.
     567              : 
     568              :     // Check for corrupted DAQEth frame length   
     569            0 :     if ( block_words == 0 || block_words > m_max_block_words ) {
     570              :       // RS FIXME: corrupted length -> stop, add opmon counter or warning
     571            0 :       return;
     572              :     }    
     573              : 
     574              :     // Calculate data bytes after DAQEthHeader based on block_words
     575            0 :     std::size_t data_bytes = std::size_t(block_words) * sizeof(dunedaq::detdataformats::DAQEthHeader::word_t);
     576              : 
     577              :     // Grab end pointer of DAQEth frame
     578            0 :     char* daqframe_endptr = plptr + sizeof(dunedaq::detdataformats::DAQEthHeader) + data_bytes;
     579              : 
     580              :     // Check if full DAQEth frame fits
     581            0 :     if ( daqframe_endptr > plendptr ) {
     582              :       // RS FIXME: truncated payload -> stop, add opmon counter or warning
     583              :       return;
     584              :     }
     585              : 
     586              :     // Calculate DAQEth frame size (used both for handling and advancing)
     587            0 :     std::size_t daq_frame_size = sizeof(dunedaq::detdataformats::DAQEthHeader) + data_bytes;
     588              : 
     589              :     // Sadly, cannot take a reference to a bitfield
     590            0 :     uint strm_id = daqhdrptr->stream_id;
     591              : 
     592              :     // Check that stream id is corresponds to a registered source
     593            0 :     auto& strm_to_src = m_stream_id_to_source_id[src_rx_q];
     594              : 
     595            0 :     if ( auto strm_it = strm_to_src.find(strm_id); strm_it != strm_to_src.end() ) {
     596              : 
     597            0 :       m_sources[strm_it->second]->handle_daq_frame((char*)daqhdrptr, daq_frame_size);
     598              : 
     599              : 
     600              :     } else {
     601              :       // Really bad -> unexpeced StreamID in UDP Payload.
     602              :       // This check is needed in order to avoid dynamically add thousands
     603              :       // of Sources on the fly, in case the data corruption is extremely severe.
     604            0 :       if (m_num_unexid_frames.count(strm_id) == 0) {
     605            0 :         m_num_unexid_frames[strm_id] = 0;
     606              :       }
     607            0 :       m_num_unexid_frames[strm_id]++;
     608              :     }
     609              :       
     610              :     // Advance to next payload
     611            0 :     plptr += daq_frame_size;
     612              : 
     613              :   } // Scatter loop end
     614              : 
     615              : }
     616              : 
     617              : //-----------------------------------------------------------------------------
     618              : void
     619            0 : IfaceWrapper::passthrough_udp_payload(int src_rx_q, char* payload, std::size_t size)
     620              : {
     621              :   // Get DAQ Header and its StreamID
     622            0 :   auto* daqhdrptr = reinterpret_cast<dunedaq::detdataformats::DAQEthHeader*>(payload);
     623              : 
     624              : 
     625              :     // Sadly, cannot take a reference to a bitfield
     626            0 :     uint strm_id = daqhdrptr->stream_id;
     627              : 
     628              :     // Check that stream id is corresponds to a registered source
     629            0 :     auto& strm_to_src = m_stream_id_to_source_id[src_rx_q];
     630              :     
     631              : 
     632            0 :     if ( auto strm_it = strm_to_src.find(strm_id); strm_it != strm_to_src.end() ) {
     633            0 :       m_sources[strm_it->second]->handle_daq_frame(payload, size);
     634              :     } else {
     635              :       // Really bad -> unexpeced StreamID in UDP Payload.
     636              :       // This check is needed in order to avoid dynamically add thousands
     637              :       // of Sources on the fly, in case the data corruption is extremely severe.
     638            0 :       if (m_num_unexid_frames.count(strm_id) == 0) {
     639            0 :         m_num_unexid_frames[strm_id] = 0;
     640              :       }
     641            0 :       m_num_unexid_frames[strm_id]++;
     642              :     }
     643            0 : }
     644              : 
     645              : } // namespace dpdklibs
     646              : } // namespace dunedaq
     647              : 
     648              : // 
     649              : #include "detail/IfaceWrapper.hxx"
        

Generated by: LCOV version 2.0-1