Line data Source code
1 : /**
2 : * @file DPDKReaderModule.cpp DPDKReaderModule DAQModule implementation
3 : *
4 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 : //#include "dpdklibs/nicreader/Nljs.hpp"
9 :
10 : #include "appfwk/ConfigurationManager.hpp"
11 : #include "appfwk/ConfigurationManager.hpp"
12 :
13 : #include "appmodel/NetworkDetectorToDaqConnection.hpp"
14 :
15 : #include "appmodel/DataReaderModule.hpp"
16 : #include "appmodel/DPDKReaderConf.hpp"
17 : #include "appmodel/DPDKPortConfiguration.hpp"
18 : #include "confmodel/ProcessingResource.hpp"
19 : #include "confmodel/NetworkDevice.hpp"
20 : #include "confmodel/QueueWithSourceId.hpp"
21 : #include "confmodel/DetectorStream.hpp"
22 :
23 : #include "logging/Logging.hpp"
24 :
25 : #include "datahandlinglibs/DataHandlingIssues.hpp"
26 : #include "datahandlinglibs/utils/BufferCopy.hpp"
27 :
28 : #include "dpdklibs/EALSetup.hpp"
29 : #include "dpdklibs/RTEIfaceSetup.hpp"
30 : #include "dpdklibs/udp/Utils.hpp"
31 : #include "dpdklibs/udp/PacketCtor.hpp"
32 : #include "dpdklibs/FlowControl.hpp"
33 : #include "dpdklibs/Issues.hpp"
34 : #include "CreateSource.hpp"
35 : #include "DPDKReaderModule.hpp"
36 :
37 : #include <cinttypes>
38 : #include <chrono>
39 : #include <sstream>
40 : #include <memory>
41 : #include <string>
42 : #include <thread>
43 : #include <utility>
44 : #include <vector>
45 : #include <ios>
46 :
47 :
48 : /**
49 : * @brief Name used by TRACE TLOG calls from this source file
50 : */
51 : #define TRACE_NAME "DPDKReaderModule" // NOLINT
52 :
/**
 * @brief Numeric TRACE debug levels declared for this source file.
 *
 * The enumerators are plain integer constants of an unnamed, unscoped enum.
 * NOTE(review): the TLOG_DEBUG calls visible in this file pass the literal 5
 * rather than one of these names - confirm whether they are meant to be used.
 */
enum
{
  TLVL_ENTER_EXIT_METHODS = 5, ///< Lowest of the three levels
  TLVL_WORK_STEPS = 10,        ///< Intermediate level
  TLVL_BOOKKEEPING = 15        ///< Highest of the three levels
};
62 :
63 : namespace dunedaq {
64 : namespace dpdklibs {
65 :
66 0 : DPDKReaderModule::DPDKReaderModule(const std::string& name)
67 : : DAQModule(name),
68 0 : m_run_marker{ false }
69 : {
70 0 : register_command("conf", &DPDKReaderModule::do_configure);
71 0 : register_command("start", &DPDKReaderModule::do_start);
72 0 : register_command("stop_trigger_sources", &DPDKReaderModule::do_stop);
73 0 : register_command("scrap", &DPDKReaderModule::do_scrap);
74 0 : }
75 :
76 0 : DPDKReaderModule::~DPDKReaderModule()
77 : {
78 0 : }
79 :
/**
 * @brief Split a string on a single delimiter character.
 *
 * Runs of consecutive delimiters, as well as leading and trailing delimiters,
 * produce no empty tokens. Tokens are appended to @p out; existing content of
 * @p out is left untouched.
 *
 * @param str   String to split
 * @param delim Delimiter character
 * @param out   Vector that receives the tokens, in order of appearance
 */
inline void
tokenize(std::string const& str, const char delim, std::vector<std::string>& out)
{
  std::size_t token_begin = str.find_first_not_of(delim);
  while (token_begin != std::string::npos) {
    const std::size_t token_end = str.find(delim, token_begin);
    // When no further delimiter exists token_end is npos and the length is
    // clamped to the remainder of the string by the substring constructor.
    out.emplace_back(str, token_begin, token_end - token_begin);
    if (token_end == std::string::npos) {
      break;
    }
    token_begin = str.find_first_not_of(delim, token_end);
  }
}
90 :
91 : void
92 0 : DPDKReaderModule::init(const std::shared_ptr<appfwk::ConfigurationManager> mcfg )
93 : {
94 0 : auto mdal = mcfg->get_dal<appmodel::DataReaderModule>(get_name());
95 0 : m_cfg = mcfg;
96 0 : if (mdal->get_outputs().empty()) {
97 0 : auto err = datahandlinglibs::InitializationError(ERS_HERE, "No outputs defined for NIC reader in configuration.");
98 0 : ers::fatal(err);
99 0 : throw err;
100 0 : }
101 :
102 : // Loop over output queues, extract source ids and create source model objects
103 0 : for (auto con : mdal->get_outputs()) {
104 0 : auto queue = con->cast<confmodel::QueueWithSourceId>();
105 0 : if (queue == nullptr) {
106 0 : auto err = datahandlinglibs::InitializationError(ERS_HERE, "Outputs are not of type QueueWithGeoId.");
107 0 : ers::fatal(err);
108 0 : throw err;
109 0 : }
110 :
111 : // Check for CB prefix indicating Callback use
112 0 : const char delim = '_';
113 0 : std::string target = queue->UID();
114 0 : std::vector<std::string> words;
115 0 : tokenize(target, delim, words);
116 0 : int sourceid = -1;
117 :
118 0 : bool callback_mode = false;
119 0 : if (words.front() == "cb") {
120 : callback_mode = true;
121 : }
122 :
123 : // TODO: add nullpointer check against misconfiguration
124 0 : auto ptr = m_sources[queue->get_source_id()] = createSourceModel(queue->UID(), callback_mode);
125 0 : register_node(queue->UID(), ptr);
126 : // m_sources[queue->get_source_id()]->init();
127 0 : }
128 0 : }
129 :
130 : void
131 0 : DPDKReaderModule::do_configure(const CommandData_t& /*args*/)
132 : {
133 0 : TLOG() << get_name() << ": Entering do_conf() method";
134 : //auto session = appfwk::ModuleManager::get()->get_session();
135 0 : auto mdal = m_cfg->get_dal<appmodel::DataReaderModule>(get_name());
136 0 : auto module_conf = mdal->get_configuration()->cast<appmodel::DPDKReaderConf>();
137 0 : auto res_set = mdal->get_connections();
138 : // EAL setup
139 0 : TLOG() << "Setting up EAL with params from config.";
140 0 : std::vector<std::string> eal_params ;
141 0 : eal_params.push_back("eal_cmdline");
142 0 : eal_params.push_back("--proc-type=primary");
143 :
144 : // Construct the pcie devices allowed mask
145 0 : std::string first_pcie_addr;
146 0 : bool is_first_pcie_addr = true;
147 0 : std::deque<uint16_t> rte_cores;
148 :
149 0 : std::vector<const appmodel::NetworkDetectorToDaqConnection*> d2d_conns;
150 0 : for (auto res : res_set) {
151 0 : auto connection = res->cast<appmodel::NetworkDetectorToDaqConnection>();
152 0 : if (connection == nullptr) {
153 0 : datahandlinglibs::GenericConfigurationError err(
154 0 : ERS_HERE, "DetectorToDaqConnection configuration failed due expected but unavailable connection!"
155 0 : );
156 0 : ers::fatal(err);
157 0 : throw err;
158 0 : }
159 0 : if (connection->is_disabled(*(m_cfg->get_session()))) {
160 0 : continue;
161 : }
162 :
163 0 : d2d_conns.push_back(connection);
164 :
165 0 : auto receiver = connection->receiver()->cast<appmodel::DPDKReceiver>();
166 0 : if (!receiver) {
167 0 : throw datahandlinglibs::InitializationError(
168 0 : ERS_HERE, fmt::format("Found {} of type {} in connection {} while expecting type DPDKReceiver", receiver->class_name(), receiver->UID(), connection->UID())
169 0 : );
170 : }
171 :
172 0 : auto net_device = receiver->get_uses()->cast<confmodel::NetworkDevice>();
173 :
174 0 : if (is_first_pcie_addr) {
175 0 : first_pcie_addr = net_device->get_pcie_addr();
176 : is_first_pcie_addr = false;
177 : }
178 0 : eal_params.push_back("-a");
179 0 : eal_params.push_back(net_device->get_pcie_addr());
180 :
181 0 : for ( const auto* proc_res : receiver->get_configuration()->get_used_lcores() ) {
182 0 : rte_cores.insert(rte_cores.end(), proc_res->get_cpu_cores().begin(), proc_res->get_cpu_cores().end());
183 : }
184 : }
185 :
186 0 : uint16_t main_core = rte_get_main_lcore();
187 0 : if (std::find(rte_cores.begin(), rte_cores.end(), main_core) != rte_cores.end()) {
188 0 : throw MainCoreConflict(ERS_HERE, main_core);
189 : }
190 0 : rte_cores.push_front(main_core);
191 :
192 0 : eal_params.push_back("-l");
193 0 : eal_params.push_back(fmt::format("{}", fmt::join(rte_cores,",")));
194 :
195 :
196 : // Use the first pcie device id as file prefix
197 : // FIXME: Review this strategy - should work in most of cases, but it could be
198 : // confusing in configs with multiple connections
199 0 : eal_params.push_back(fmt::format("--file-prefix={}", first_pcie_addr));
200 :
201 0 : eal_params.push_back(module_conf->get_eal_args());
202 :
203 0 : ealutils::init_eal(eal_params);
204 :
205 : // Get available connections from EAL
206 0 : auto available_ifaces = ifaceutils::get_num_available_ifaces();
207 0 : TLOG() << "Number of available connections: " << available_ifaces;
208 0 : for (unsigned int ifc_id=0; ifc_id<available_ifaces; ++ifc_id) {
209 0 : std::string mac_addr_str = ifaceutils::get_iface_mac_str(ifc_id);
210 0 : std::string pci_addr_str = ifaceutils::get_iface_pci_str(ifc_id);
211 0 : m_mac_to_id_map[mac_addr_str] = ifc_id;
212 : // TODO: remove
213 0 : m_pci_to_id_map[pci_addr_str] = ifc_id;
214 0 : TLOG() << "Available iface with MAC=" << mac_addr_str << " PCIe=" << pci_addr_str << " logical ID=" << ifc_id;
215 0 : }
216 :
217 0 : for (auto d2d_conn : d2d_conns) {
218 0 : auto dpdk_receiver = d2d_conn->get_net_receiver()->cast<appmodel::DPDKReceiver>();
219 :
220 : // Prepare the list of active senders (network transmitters) and active streams
221 0 : std::vector<const appmodel::NWDetDataSender*> nw_senders;
222 0 : std::vector<const confmodel::DetectorStream*> active_streams;
223 :
224 0 : for ( auto nw_sender : d2d_conn->get_net_senders() ) {
225 0 : TLOG() << "Sender " << nw_sender->UID() << "is " << nw_sender->is_disabled(*(m_cfg->get_session()));
226 :
227 0 : if ( ! nw_sender->is_disabled(*(m_cfg->get_session())) ) {
228 0 : nw_senders.push_back(nw_sender);
229 :
230 0 : for ( auto det_stream : nw_sender->get_streams() ) {
231 0 : if ( det_stream->is_disabled(*(m_cfg->get_session())) )
232 0 : continue;
233 :
234 0 : active_streams.push_back(det_stream);
235 : }
236 : }
237 : }
238 :
239 0 : auto net_device = dpdk_receiver->get_uses();
240 :
241 0 : if ((m_mac_to_id_map.count(net_device->get_mac_address()) == 0) || (m_pci_to_id_map.count(net_device->get_pcie_addr()) == 0)) {
242 0 : TLOG() << "No available interface with MAC=" << net_device->get_mac_address();
243 0 : throw datahandlinglibs::InitializationError(
244 0 : ERS_HERE, "DPDKReaderModule configuration failed due expected but unavailable interface!"
245 0 : );
246 : }
247 :
248 0 : uint iface_id = m_mac_to_id_map[net_device->get_mac_address()];
249 0 : auto ptr = m_ifaces[iface_id] = std::make_shared<IfaceWrapper>(iface_id, dpdk_receiver, nw_senders, active_streams, m_sources, m_run_marker);
250 0 : register_node( fmt::format("interface-{}", iface_id), ptr);
251 0 : ptr->allocate_mbufs();
252 0 : ptr->setup_interface();
253 0 : ptr->setup_flow_steering();
254 0 : ptr->setup_xstats();
255 :
256 0 : }
257 :
258 0 : if (!m_run_marker.load()) {
259 0 : set_running(true);
260 0 : TLOG() << "Starting iface wrappers.";
261 0 : for (auto& [iface_id, iface] : m_ifaces) {
262 0 : iface->start();
263 : }
264 : } else {
265 0 : TLOG_DEBUG(5) << "iface wrappers are already running!";
266 : }
267 :
268 0 : }
269 :
270 : void
271 0 : DPDKReaderModule::do_start(const CommandData_t&)
272 : {
273 :
274 : // Setup callbacks on all sourcemodels
275 0 : for (auto& [sourceid, source] : m_sources) {
276 0 : source->acquire_callback();
277 : }
278 :
279 0 : for (auto& [iface_id, iface] : m_ifaces) {
280 0 : iface->enable_flow();
281 : }
282 0 : }
283 :
284 : void
285 0 : DPDKReaderModule::do_stop(const CommandData_t&)
286 : {
287 0 : for (auto& [iface_id, iface] : m_ifaces) {
288 0 : iface->disable_flow();
289 : }
290 0 : }
291 :
292 :
293 : void
294 0 : DPDKReaderModule::do_scrap(const CommandData_t&)
295 : {
296 0 : TLOG() << get_name() << ": Entering do_scrap() method";
297 0 : if (m_run_marker.load()) {
298 0 : TLOG() << "Raising stop through variables!";
299 0 : set_running(false);
300 0 : TLOG() << "Stopping iface wrappers.";
301 0 : for (auto& [iface_id, iface] : m_ifaces) {
302 0 : iface->stop();
303 : }
304 0 : ealutils::wait_for_lcores();
305 0 : TLOG() << "Stoppped DPDK lcore processors and internal threads...";
306 : } else {
307 0 : TLOG_DEBUG(5) << "DPDK lcore processor is already stopped!";
308 : }
309 0 : TLOG() << get_name() << ": do_scrap called. Tearing down EAL.";
310 0 : ealutils::finish_eal();
311 0 : }
312 :
313 :
314 : void
315 0 : DPDKReaderModule::set_running(bool should_run)
316 : {
317 0 : bool was_running = m_run_marker.exchange(should_run);
318 0 : TLOG_DEBUG(5) << "Active state was toggled from " << was_running << " to " << should_run;
319 0 : }
320 :
321 : } // namespace dpdklibs
322 : } // namespace dunedaq
323 :
324 0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::dpdklibs::DPDKReaderModule)
|