LCOV - code coverage report
Current view: top level - dpdklibs/plugins - DPDKReaderModule.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 159 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 25 0

            Line data    Source code
       1              : /**
       2              :  * @file DPDKReaderModule.cpp DPDKReaderModule DAQModule implementation
       3              :  *
       4              :  * This is part of the DUNE DAQ Application Framework, copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : //#include "dpdklibs/nicreader/Nljs.hpp"
       9              : 
      10              : #include "appfwk/ConfigurationManager.hpp"
      11              : #include "appfwk/ConfigurationManager.hpp"
      12              : 
      13              : #include "appmodel/NetworkDetectorToDaqConnection.hpp"
      14              : 
      15              : #include "appmodel/DataReaderModule.hpp"
      16              : #include "appmodel/DPDKReaderConf.hpp"
      17              : #include "appmodel/DPDKPortConfiguration.hpp"
      18              : #include "confmodel/ProcessingResource.hpp"
      19              : #include "confmodel/NetworkDevice.hpp"
      20              : #include "confmodel/QueueWithSourceId.hpp"
      21              : #include "confmodel/DetectorStream.hpp"
      22              : 
      23              : #include "logging/Logging.hpp"
      24              : 
      25              : #include "datahandlinglibs/DataHandlingIssues.hpp"
      26              : #include "datahandlinglibs/utils/BufferCopy.hpp" 
      27              : 
      28              : #include "dpdklibs/EALSetup.hpp"
      29              : #include "dpdklibs/RTEIfaceSetup.hpp"
      30              : #include "dpdklibs/udp/Utils.hpp"
      31              : #include "dpdklibs/udp/PacketCtor.hpp"
      32              : #include "dpdklibs/FlowControl.hpp"
      33              : #include "dpdklibs/Issues.hpp"
      34              : #include "CreateSource.hpp"
      35              : #include "DPDKReaderModule.hpp"
      36              : 
      37              : #include <cinttypes>
      38              : #include <chrono>
      39              : #include <sstream>
      40              : #include <memory>
      41              : #include <string>
      42              : #include <thread>
      43              : #include <utility>
      44              : #include <vector>
      45              : #include <ios>
      46              : 
      47              : 
      48              : /**
      49              :  * @brief Name used by TRACE TLOG calls from this source file
      50              :  */
      51              : #define TRACE_NAME "DPDKReaderModule" // NOLINT
      52              : 
      53              : /**
      54              :  * @brief TRACE debug levels used in this source file
      55              :  */
      56              : enum
      57              : {
      58              :   TLVL_ENTER_EXIT_METHODS = 5,
      59              :   TLVL_WORK_STEPS = 10,
      60              :   TLVL_BOOKKEEPING = 15
      61              : };
      62              : 
      63              : namespace dunedaq {
      64              : namespace dpdklibs {
      65              : 
      66            0 : DPDKReaderModule::DPDKReaderModule(const std::string& name)
      67              :   : DAQModule(name),
      68            0 :     m_run_marker{ false }
      69              : {
      70            0 :   register_command("conf", &DPDKReaderModule::do_configure);
      71            0 :   register_command("start", &DPDKReaderModule::do_start);
      72            0 :   register_command("stop_trigger_sources", &DPDKReaderModule::do_stop);
      73            0 :   register_command("scrap", &DPDKReaderModule::do_scrap);
      74            0 : }
      75              : 
      76            0 : DPDKReaderModule::~DPDKReaderModule()
      77              : {
      78            0 : }
      79              : 
      80              : inline void
      81            0 : tokenize(std::string const& str, const char delim, std::vector<std::string>& out)
      82              : {
      83            0 :   std::size_t start;
      84            0 :   std::size_t end = 0;
      85            0 :   while ((start = str.find_first_not_of(delim, end)) != std::string::npos) {
      86            0 :     end = str.find(delim, start);
      87            0 :     out.push_back(str.substr(start, end - start));
      88              :   }
      89            0 : }
      90              : 
      91              : void
      92            0 : DPDKReaderModule::init(const std::shared_ptr<appfwk::ConfigurationManager> mcfg )
      93              : {
      94            0 :   auto mdal = mcfg->get_dal<appmodel::DataReaderModule>(get_name());
      95            0 :   m_cfg = mcfg;
      96            0 :   if (mdal->get_outputs().empty()) {
      97            0 :     auto err = datahandlinglibs::InitializationError(ERS_HERE, "No outputs defined for NIC reader in configuration.");
      98            0 :     ers::fatal(err);
      99            0 :     throw err;
     100            0 :   }
     101              : 
     102              :   // Loop over output queues, extract source ids and create source model objects
     103            0 :   for (auto con : mdal->get_outputs()) {
     104            0 :     auto queue = con->cast<confmodel::QueueWithSourceId>();
     105            0 :     if (queue == nullptr) {
     106            0 :       auto err = datahandlinglibs::InitializationError(ERS_HERE, "Outputs are not of type QueueWithGeoId.");
     107            0 :       ers::fatal(err);
     108            0 :       throw err;
     109            0 :     }
     110              : 
     111              :     // Check for CB prefix indicating Callback use
     112            0 :     const char delim = '_';
     113            0 :     std::string target = queue->UID();
     114            0 :     std::vector<std::string> words;
     115            0 :     tokenize(target, delim, words);
     116            0 :     int sourceid = -1;
     117              : 
     118            0 :     bool callback_mode = false;
     119            0 :     if (words.front() == "cb") {
     120              :       callback_mode = true;
     121              :     }
     122              : 
     123              :     // TODO: add nullpointer check against misconfiguration
     124            0 :     auto ptr = m_sources[queue->get_source_id()] = createSourceModel(queue->UID(), callback_mode);
     125            0 :     register_node(queue->UID(), ptr);
     126              :     // m_sources[queue->get_source_id()]->init();
     127            0 :   }
     128            0 : }
     129              : 
     130              : void
     131            0 : DPDKReaderModule::do_configure(const CommandData_t& /*args*/)
     132              : {
     133            0 :   TLOG() << get_name() << ": Entering do_conf() method";
     134              :   //auto session = appfwk::ModuleManager::get()->get_session();
     135            0 :   auto mdal = m_cfg->get_dal<appmodel::DataReaderModule>(get_name());
     136            0 :   auto module_conf = mdal->get_configuration()->cast<appmodel::DPDKReaderConf>();
     137            0 :   auto res_set = mdal->get_connections();
     138              :   // EAL setup
     139            0 :   TLOG() << "Setting up EAL with params from config.";
     140            0 :   std::vector<std::string> eal_params ;
     141            0 :   eal_params.push_back("eal_cmdline");
     142            0 :   eal_params.push_back("--proc-type=primary");
     143              : 
     144              :   // Construct the pcie devices allowed mask
     145            0 :   std::string first_pcie_addr;
     146            0 :   bool is_first_pcie_addr = true;
     147            0 :   std::deque<uint16_t> rte_cores;
     148              : 
     149            0 :   std::vector<const appmodel::NetworkDetectorToDaqConnection*> d2d_conns;
     150            0 :   for (auto res : res_set) {
     151            0 :     auto connection = res->cast<appmodel::NetworkDetectorToDaqConnection>();
     152            0 :     if (connection == nullptr) {
     153            0 :       datahandlinglibs::GenericConfigurationError err(
     154            0 :           ERS_HERE, "DetectorToDaqConnection configuration failed due expected but unavailable connection!"
     155            0 :         );
     156            0 :       ers::fatal(err);
     157            0 :       throw err;      
     158            0 :     }
     159            0 :     if (connection->is_disabled(*(m_cfg->get_session()))) {
     160            0 :             continue;
     161              :     }
     162              : 
     163            0 :     d2d_conns.push_back(connection);
     164              : 
     165            0 :     auto receiver = connection->receiver()->cast<appmodel::DPDKReceiver>();
     166            0 :     if (!receiver) {
     167            0 :       throw datahandlinglibs::InitializationError(
     168            0 :         ERS_HERE, fmt::format("Found {} of type {} in connection {} while expecting type DPDKReceiver", receiver->class_name(), receiver->UID(), connection->UID())
     169            0 :       );
     170              :     }
     171              : 
     172            0 :     auto net_device = receiver->get_uses()->cast<confmodel::NetworkDevice>();
     173              : 
     174            0 :     if (is_first_pcie_addr) {
     175            0 :       first_pcie_addr = net_device->get_pcie_addr();
     176              :       is_first_pcie_addr = false;
     177              :     }
     178            0 :     eal_params.push_back("-a");
     179            0 :     eal_params.push_back(net_device->get_pcie_addr());
     180              : 
     181            0 :     for ( const auto* proc_res : receiver->get_configuration()->get_used_lcores() ) {
     182            0 :       rte_cores.insert(rte_cores.end(), proc_res->get_cpu_cores().begin(), proc_res->get_cpu_cores().end());
     183              :     }
     184              :   }
     185              : 
     186            0 :   uint16_t main_core = rte_get_main_lcore();
     187            0 :   if (std::find(rte_cores.begin(), rte_cores.end(), main_core) != rte_cores.end()) {
     188            0 :     throw MainCoreConflict(ERS_HERE, main_core);
     189              :   }
     190            0 :   rte_cores.push_front(main_core);
     191              : 
     192            0 :   eal_params.push_back("-l");
     193            0 :   eal_params.push_back(fmt::format("{}", fmt::join(rte_cores,",")));
     194              : 
     195              : 
     196              :   // Use the first pcie device id as file prefix
     197              :   // FIXME: Review this strategy - should work in most of cases, but it could be 
     198              :   // confusing in configs with multiple connections
     199            0 :   eal_params.push_back(fmt::format("--file-prefix={}", first_pcie_addr));
     200              : 
     201            0 :   eal_params.push_back(module_conf->get_eal_args());
     202              : 
     203            0 :   ealutils::init_eal(eal_params);
     204              : 
     205              :   // Get available connections from EAL
     206            0 :   auto available_ifaces = ifaceutils::get_num_available_ifaces();
     207            0 :   TLOG() << "Number of available connections: " << available_ifaces;
     208            0 :   for (unsigned int ifc_id=0; ifc_id<available_ifaces; ++ifc_id) {
     209            0 :     std::string mac_addr_str = ifaceutils::get_iface_mac_str(ifc_id);
     210            0 :     std::string pci_addr_str = ifaceutils::get_iface_pci_str(ifc_id);
     211            0 :     m_mac_to_id_map[mac_addr_str] = ifc_id;
     212              :     // TODO: remove
     213            0 :     m_pci_to_id_map[pci_addr_str] = ifc_id;
     214            0 :     TLOG() << "Available iface with MAC=" << mac_addr_str << " PCIe=" <<  pci_addr_str << " logical ID=" << ifc_id;
     215            0 :   }
     216              : 
     217            0 :   for (auto d2d_conn : d2d_conns) {
     218            0 :     auto dpdk_receiver = d2d_conn->get_net_receiver()->cast<appmodel::DPDKReceiver>();
     219              : 
     220              :     // Prepare the list of active senders (network transmitters) and active streams
     221            0 :     std::vector<const appmodel::NWDetDataSender*> nw_senders;
     222            0 :     std::vector<const confmodel::DetectorStream*> active_streams;
     223              : 
     224            0 :     for ( auto nw_sender : d2d_conn->get_net_senders() ) {
     225            0 :       TLOG() << "Sender " << nw_sender->UID() << "is " << nw_sender->is_disabled(*(m_cfg->get_session()));
     226              : 
     227            0 :       if ( ! nw_sender->is_disabled(*(m_cfg->get_session())) ) {
     228            0 :         nw_senders.push_back(nw_sender);
     229              : 
     230            0 :         for ( auto det_stream : nw_sender->get_streams() ) {
     231            0 :           if ( det_stream->is_disabled(*(m_cfg->get_session())) ) 
     232            0 :             continue;
     233              :           
     234            0 :           active_streams.push_back(det_stream);
     235              :         }
     236              :       }
     237              :     }
     238              : 
     239            0 :     auto net_device = dpdk_receiver->get_uses();
     240              :     
     241            0 :     if ((m_mac_to_id_map.count(net_device->get_mac_address()) == 0) || (m_pci_to_id_map.count(net_device->get_pcie_addr()) == 0)) {
     242            0 :         TLOG() << "No available interface with MAC=" << net_device->get_mac_address();
     243            0 :         throw datahandlinglibs::InitializationError(
     244            0 :           ERS_HERE, "DPDKReaderModule configuration failed due expected but unavailable interface!"
     245            0 :         );
     246              :     }
     247              :     
     248            0 :     uint iface_id = m_mac_to_id_map[net_device->get_mac_address()];
     249            0 :     auto ptr = m_ifaces[iface_id] = std::make_shared<IfaceWrapper>(iface_id, dpdk_receiver, nw_senders, active_streams, m_sources, m_run_marker);
     250            0 :     register_node( fmt::format("interface-{}", iface_id), ptr);
     251            0 :     ptr->allocate_mbufs();
     252            0 :     ptr->setup_interface();
     253            0 :     ptr->setup_flow_steering();
     254            0 :     ptr->setup_xstats();
     255              : 
     256            0 :   }
     257              : 
     258            0 :   if (!m_run_marker.load()) {
     259            0 :     set_running(true);
     260            0 :     TLOG() << "Starting iface wrappers.";
     261            0 :     for (auto& [iface_id, iface] : m_ifaces) {
     262            0 :       iface->start();
     263              :     }
     264              :   } else {
     265            0 :     TLOG_DEBUG(5) << "iface wrappers are already running!";
     266              :   }
     267              : 
     268            0 : }
     269              : 
     270              : void
     271            0 : DPDKReaderModule::do_start(const CommandData_t&)
     272              : {
     273              : 
     274              :   // Setup callbacks on all sourcemodels
     275            0 :   for (auto& [sourceid, source] : m_sources) {
     276            0 :     source->acquire_callback();
     277              :   }
     278              : 
     279            0 :   for (auto& [iface_id, iface] : m_ifaces) {
     280            0 :     iface->enable_flow();
     281              :   }
     282            0 : }
     283              : 
     284              : void
     285            0 : DPDKReaderModule::do_stop(const CommandData_t&)
     286              : {
     287            0 :   for (auto& [iface_id, iface] : m_ifaces) {
     288            0 :     iface->disable_flow();
     289              :   }
     290            0 : }
     291              : 
     292              : 
     293              : void
     294            0 : DPDKReaderModule::do_scrap(const CommandData_t&)
     295              : {
     296            0 :   TLOG() << get_name() << ": Entering do_scrap() method";
     297            0 :   if (m_run_marker.load()) {
     298            0 :     TLOG() << "Raising stop through variables!";
     299            0 :     set_running(false);
     300            0 :     TLOG() << "Stopping iface wrappers.";
     301            0 :     for (auto& [iface_id, iface] : m_ifaces) {
     302            0 :       iface->stop();
     303              :     }
     304            0 :     ealutils::wait_for_lcores();
     305            0 :     TLOG() << "Stoppped DPDK lcore processors and internal threads...";
     306              :   } else {
     307            0 :     TLOG_DEBUG(5) << "DPDK lcore processor is already stopped!";
     308              :   }
     309            0 :   TLOG() << get_name() << ": do_scrap called. Tearing down EAL.";
     310            0 :   ealutils::finish_eal();
     311            0 : }
     312              : 
     313              : 
     314              : void 
     315            0 : DPDKReaderModule::set_running(bool should_run)
     316              : {
     317            0 :   bool was_running = m_run_marker.exchange(should_run);
     318            0 :   TLOG_DEBUG(5) << "Active state was toggled from " << was_running << " to " << should_run;
     319            0 : }
     320              : 
     321              : } // namespace dpdklibs
     322              : } // namespace dunedaq
     323              : 
     324            0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::dpdklibs::DPDKReaderModule)
        

Generated by: LCOV version 2.0-1