Line data Source code
1 : /**
2 : * @file DPDKReaderModule.cpp DPDKReaderModule DAQModule implementation
3 : *
4 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 : //#include "dpdklibs/nicreader/Nljs.hpp"
9 :
10 : #include "appfwk/ConfigurationManager.hpp"
11 : #include "appfwk/ConfigurationManager.hpp"
12 :
13 : #include "appmodel/NetworkDetectorToDaqConnection.hpp"
14 :
15 : #include "appmodel/DataReaderModule.hpp"
16 : #include "appmodel/DPDKReaderConf.hpp"
17 : #include "appmodel/DPDKPortConfiguration.hpp"
18 : #include "confmodel/ProcessingResource.hpp"
19 : #include "confmodel/NetworkDevice.hpp"
20 : #include "confmodel/QueueWithSourceId.hpp"
21 : #include "confmodel/DetectorStream.hpp"
22 :
23 : #include "logging/Logging.hpp"
24 :
25 : #include "datahandlinglibs/DataHandlingIssues.hpp"
26 : #include "datahandlinglibs/utils/BufferCopy.hpp"
27 :
28 : #include "dpdklibs/EALSetup.hpp"
29 : #include "dpdklibs/RTEIfaceSetup.hpp"
30 : #include "dpdklibs/udp/Utils.hpp"
31 : #include "dpdklibs/udp/PacketCtor.hpp"
32 : #include "dpdklibs/FlowControl.hpp"
33 : #include "dpdklibs/Issues.hpp"
34 : #include "CreateSource.hpp"
35 : #include "DPDKReaderModule.hpp"
36 :
37 : #include <cinttypes>
38 : #include <chrono>
39 : #include <sstream>
40 : #include <memory>
41 : #include <string>
42 : #include <thread>
43 : #include <utility>
44 : #include <vector>
45 : #include <ios>
46 :
47 :
48 : /**
49 : * @brief Name used by TRACE TLOG calls from this source file
50 : */
51 : #define TRACE_NAME "DPDKReaderModule" // NOLINT
52 :
53 : /**
54 : * @brief TRACE debug levels used in this source file
55 : */
56 : enum
57 : {
58 : TLVL_ENTER_EXIT_METHODS = 5,
59 : TLVL_WORK_STEPS = 10,
60 : TLVL_BOOKKEEPING = 15
61 : };
62 :
63 : namespace dunedaq {
64 : namespace dpdklibs {
65 :
66 0 : DPDKReaderModule::DPDKReaderModule(const std::string& name)
67 : : DAQModule(name),
68 0 : m_run_marker{ false }
69 : {
70 0 : register_command("conf", &DPDKReaderModule::do_configure);
71 0 : register_command("start", &DPDKReaderModule::do_start);
72 0 : register_command("stop_trigger_sources", &DPDKReaderModule::do_stop);
73 0 : register_command("scrap", &DPDKReaderModule::do_scrap);
74 0 : }
75 :
76 0 : DPDKReaderModule::~DPDKReaderModule()
77 : {
78 0 : }
79 :
80 : inline void
81 : tokenize(std::string const& str, const char delim, std::vector<std::string>& out)
82 : {
83 : std::size_t start;
84 : std::size_t end = 0;
85 : while ((start = str.find_first_not_of(delim, end)) != std::string::npos) {
86 : end = str.find(delim, start);
87 : out.push_back(str.substr(start, end - start));
88 : }
89 : }
90 :
91 : void
92 0 : DPDKReaderModule::init(const std::shared_ptr<appfwk::ConfigurationManager> mcfg )
93 : {
94 0 : auto mdal = mcfg->get_dal<appmodel::DataReaderModule>(get_name());
95 0 : m_cfg = mcfg;
96 0 : if (mdal->get_raw_data_callbacks().empty()) {
97 0 : auto err = datahandlinglibs::InitializationError(ERS_HERE, "No outputs defined for NIC reader in configuration.");
98 0 : ers::fatal(err);
99 0 : throw err;
100 0 : }
101 :
102 : // Loop over output queues, extract source ids and create source model objects
103 0 : for (auto con : mdal->get_raw_data_callbacks()) {
104 : // TODO: add nullpointer check against misconfiguration
105 0 : auto ptr = m_sources[con->get_source_id()] = createSourceModel(con);
106 0 : register_node(con->UID(), ptr);
107 0 : }
108 0 : }
109 :
110 : void
111 0 : DPDKReaderModule::do_configure(const CommandData_t& /*args*/)
112 : {
113 0 : TLOG() << get_name() << ": Entering do_conf() method";
114 : //auto session = appfwk::ModuleManager::get()->get_session();
115 0 : auto mdal = m_cfg->get_dal<appmodel::DataReaderModule>(get_name());
116 0 : auto module_conf = mdal->get_configuration()->cast<appmodel::DPDKReaderConf>();
117 0 : auto res_set = mdal->get_connections();
118 : // EAL setup
119 0 : TLOG() << "Setting up EAL with params from config.";
120 0 : std::vector<std::string> eal_params ;
121 0 : eal_params.push_back("eal_cmdline");
122 0 : eal_params.push_back("--proc-type=primary");
123 :
124 : // Construct the pcie devices allowed mask
125 0 : std::string first_pcie_addr;
126 0 : bool is_first_pcie_addr = true;
127 0 : std::deque<uint16_t> rte_cores;
128 :
129 0 : std::vector<const appmodel::NetworkDetectorToDaqConnection*> d2d_conns;
130 0 : for (auto res : res_set) {
131 0 : auto connection = res->cast<appmodel::NetworkDetectorToDaqConnection>();
132 0 : if (connection == nullptr) {
133 0 : datahandlinglibs::GenericConfigurationError err(
134 0 : ERS_HERE, "DetectorToDaqConnection configuration failed due expected but unavailable connection!"
135 0 : );
136 0 : ers::fatal(err);
137 0 : throw err;
138 0 : }
139 0 : if (connection->is_disabled(*(m_cfg->get_session()))) {
140 0 : continue;
141 : }
142 :
143 0 : d2d_conns.push_back(connection);
144 :
145 0 : auto receiver = connection->receiver()->cast<appmodel::DPDKReceiver>();
146 0 : if (!receiver) {
147 0 : throw datahandlinglibs::InitializationError(
148 0 : ERS_HERE, fmt::format("Found {} of type {} in connection {} while expecting type DPDKReceiver", receiver->class_name(), receiver->UID(), connection->UID())
149 0 : );
150 : }
151 :
152 0 : auto net_device = receiver->get_uses()->cast<confmodel::NetworkDevice>();
153 :
154 0 : if (is_first_pcie_addr) {
155 0 : first_pcie_addr = net_device->get_pcie_addr();
156 : is_first_pcie_addr = false;
157 : }
158 0 : eal_params.push_back("-a");
159 0 : eal_params.push_back(net_device->get_pcie_addr());
160 :
161 0 : for ( const auto* proc_res : receiver->get_configuration()->get_used_lcores() ) {
162 0 : rte_cores.insert(rte_cores.end(), proc_res->get_cpu_cores().begin(), proc_res->get_cpu_cores().end());
163 : }
164 : }
165 :
166 0 : uint16_t main_core = rte_get_main_lcore();
167 0 : if (std::find(rte_cores.begin(), rte_cores.end(), main_core) != rte_cores.end()) {
168 0 : throw MainCoreConflict(ERS_HERE, main_core);
169 : }
170 0 : rte_cores.push_front(main_core);
171 :
172 0 : eal_params.push_back("-l");
173 0 : eal_params.push_back(fmt::format("{}", fmt::join(rte_cores,",")));
174 :
175 :
176 : // Use the first pcie device id as file prefix
177 : // FIXME: Review this strategy - should work in most of cases, but it could be
178 : // confusing in configs with multiple connections
179 0 : eal_params.push_back(fmt::format("--file-prefix={}", first_pcie_addr));
180 :
181 0 : eal_params.push_back(module_conf->get_eal_args());
182 :
183 0 : ealutils::init_eal(eal_params);
184 :
185 : // Get available connections from EAL
186 0 : auto available_ifaces = ifaceutils::get_num_available_ifaces();
187 0 : TLOG() << "Number of available connections: " << available_ifaces;
188 0 : for (unsigned int ifc_id=0; ifc_id<available_ifaces; ++ifc_id) {
189 0 : std::string mac_addr_str = ifaceutils::get_iface_mac_str(ifc_id);
190 0 : std::string pci_addr_str = ifaceutils::get_iface_pci_str(ifc_id);
191 0 : m_mac_to_id_map[mac_addr_str] = ifc_id;
192 : // TODO: remove
193 0 : m_pci_to_id_map[pci_addr_str] = ifc_id;
194 0 : TLOG() << "Available iface with MAC=" << mac_addr_str << " PCIe=" << pci_addr_str << " logical ID=" << ifc_id;
195 0 : }
196 :
197 0 : for (auto d2d_conn : d2d_conns) {
198 0 : auto dpdk_receiver = d2d_conn->get_net_receiver()->cast<appmodel::DPDKReceiver>();
199 :
200 : // Prepare the list of active senders (network transmitters) and active streams
201 0 : std::vector<const appmodel::NWDetDataSender*> nw_senders;
202 0 : std::vector<const confmodel::DetectorStream*> active_streams;
203 :
204 0 : for ( auto nw_sender : d2d_conn->get_net_senders() ) {
205 0 : TLOG() << "Sender " << nw_sender->UID() << "is " << nw_sender->is_disabled(*(m_cfg->get_session()));
206 :
207 0 : if ( ! nw_sender->is_disabled(*(m_cfg->get_session())) ) {
208 0 : nw_senders.push_back(nw_sender);
209 :
210 0 : for ( auto det_stream : nw_sender->get_streams() ) {
211 0 : if ( det_stream->is_disabled(*(m_cfg->get_session())) )
212 0 : continue;
213 :
214 0 : active_streams.push_back(det_stream);
215 : }
216 : }
217 : }
218 :
219 0 : auto net_device = dpdk_receiver->get_uses();
220 :
221 0 : if ((m_mac_to_id_map.count(net_device->get_mac_address()) == 0) || (m_pci_to_id_map.count(net_device->get_pcie_addr()) == 0)) {
222 0 : TLOG() << "No available interface with MAC=" << net_device->get_mac_address();
223 0 : throw datahandlinglibs::InitializationError(
224 0 : ERS_HERE, "DPDKReaderModule configuration failed due expected but unavailable interface!"
225 0 : );
226 : }
227 :
228 0 : uint iface_id = m_mac_to_id_map[net_device->get_mac_address()];
229 0 : auto ptr = m_ifaces[iface_id] = std::make_shared<IfaceWrapper>(iface_id, dpdk_receiver, nw_senders, active_streams, m_sources, m_run_marker);
230 0 : register_node( fmt::format("interface-{}", iface_id), ptr);
231 0 : ptr->allocate_mbufs();
232 0 : ptr->setup_interface();
233 0 : ptr->setup_flow_steering();
234 0 : ptr->setup_xstats();
235 :
236 0 : }
237 :
238 0 : if (!m_run_marker.load()) {
239 0 : set_running(true);
240 0 : TLOG() << "Starting iface wrappers.";
241 0 : for (auto& [iface_id, iface] : m_ifaces) {
242 0 : iface->start();
243 : }
244 : } else {
245 0 : TLOG_DEBUG(5) << "iface wrappers are already running!";
246 : }
247 :
248 0 : }
249 :
250 : void
251 0 : DPDKReaderModule::do_start(const CommandData_t&)
252 : {
253 :
254 : // Setup callbacks on all sourcemodels
255 0 : for (auto& [sourceid, source] : m_sources) {
256 0 : source->acquire_callback();
257 : }
258 :
259 0 : for (auto& [iface_id, iface] : m_ifaces) {
260 0 : iface->enable_flow();
261 : }
262 0 : }
263 :
264 : void
265 0 : DPDKReaderModule::do_stop(const CommandData_t&)
266 : {
267 0 : for (auto& [iface_id, iface] : m_ifaces) {
268 0 : iface->disable_flow();
269 : }
270 0 : }
271 :
272 :
273 : void
274 0 : DPDKReaderModule::do_scrap(const CommandData_t&)
275 : {
276 0 : TLOG() << get_name() << ": Entering do_scrap() method";
277 0 : if (m_run_marker.load()) {
278 0 : TLOG() << "Raising stop through variables!";
279 0 : set_running(false);
280 0 : TLOG() << "Stopping iface wrappers.";
281 0 : for (auto& [iface_id, iface] : m_ifaces) {
282 0 : iface->stop();
283 : }
284 0 : ealutils::wait_for_lcores();
285 0 : TLOG() << "Stoppped DPDK lcore processors and internal threads...";
286 : } else {
287 0 : TLOG_DEBUG(5) << "DPDK lcore processor is already stopped!";
288 : }
289 0 : TLOG() << get_name() << ": do_scrap called. Tearing down EAL.";
290 0 : ealutils::finish_eal();
291 0 : }
292 :
293 :
294 : void
295 0 : DPDKReaderModule::set_running(bool should_run)
296 : {
297 0 : bool was_running = m_run_marker.exchange(should_run);
298 0 : TLOG_DEBUG(5) << "Active state was toggled from " << was_running << " to " << should_run;
299 0 : }
300 :
301 : } // namespace dpdklibs
302 : } // namespace dunedaq
303 :
304 0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::dpdklibs::DPDKReaderModule)
|