61IfaceWrapper::IfaceWrapper(
63 const appmodel::DPDKReceiver* receiver,
64 const std::vector<const appmodel::NWDetDataSender*>& nw_senders,
65 sid_to_source_map_t& sources,
73 std::set<int> src_in_d2d;
74 for(
auto nw_sender : nw_senders ) {
75 for (
auto det_stream : nw_sender->get_streams() ) {
76 src_in_d2d.insert(det_stream->get_source_id());
81 std::set<int> src_models;
82 for(
const auto& [
src_id, _] : m_sources ) {
87 if (!std::includes(src_models.begin(), src_models.end(), src_in_d2d.begin(), src_in_d2d.end())) {
93 std::vector<int> src_missing;
94 std::set_difference(src_models.begin(), src_models.end(),
95 src_in_d2d.begin(), src_in_d2d.end(),
96 std::back_inserter(src_missing));
99 for(
int src : src_missing ) {
104 throw MissingSourceIDOutputs(
ERS_HERE, m_iface_id, ss.str());
107 auto net_device = receiver->get_uses();
109 m_iface_id = iface_id;
110 m_mac_addr = net_device->get_mac_address();
111 m_ip_addr = net_device->get_ip_address();
113 TLOG() <<
"Building IfaceWrapper " << m_iface_id;
115 s <<
'IfaceWrapper (port ' << m_iface_id <<
") responding to : ";
116 for(
const std::string& ip_addr : m_ip_addr) {
122 for(
const std::string& ip_addr : m_ip_addr) {
123 IpAddr ip_addr_struct(ip_addr);
125 ip_addr_struct.addr_bytes[0],
126 ip_addr_struct.addr_bytes[1],
127 ip_addr_struct.addr_bytes[2],
128 ip_addr_struct.addr_bytes[3]
133 auto iface_cfg = receiver->get_configuration();
135 m_with_flow = iface_cfg->get_flow_control();
136 m_prom_mode = iface_cfg->get_promiscuous_mode();;
137 m_mtu = iface_cfg->get_mtu();
138 m_max_block_words = unsigned(m_mtu) /
sizeof(uint64_t);
139 m_rx_ring_size = iface_cfg->get_rx_ring_size();
140 m_tx_ring_size = iface_cfg->get_tx_ring_size();
141 m_num_mbufs = iface_cfg->get_num_bufs();
142 m_burst_size = iface_cfg->get_burst_size();
143 m_mbuf_cache_size = iface_cfg->get_mbuf_cache_size();
145 m_lcore_sleep_ns = iface_cfg->get_lcore_sleep_us() * 1000;
146 m_socket_id = rte_eth_dev_socket_id(m_iface_id);
148 m_iface_id_str = iface_cfg->UID();
152 for(
const auto* proc_res : iface_cfg->get_used_lcores()) {
153 m_rte_cores.insert(m_rte_cores.end(), proc_res->get_cpu_cores().begin(), proc_res->get_cpu_cores().end());
155 if(std::find(m_rte_cores.begin(), m_rte_cores.end(), rte_get_main_lcore())!=m_rte_cores.end()) {
156 TLOG() <<
"ERROR! Throw ERS error here that LCore=0 should not be used, as it's a control RTE core!";
157 throw std::runtime_error(std::string(
"ERROR! Throw ERS here that LCore=0 should not be used, as it's a control RTE core!"));
163 std::map<std::string, std::map<uint, uint>> ip_to_stream_src_groups;
165 for(
auto nw_sender : nw_senders ) {
166 auto sender_ni = nw_sender->get_uses();
168 std::string
tx_ip = sender_ni->get_ip_address().at(0);
171 for (
auto det_stream : nw_sender->get_streams() ) {
173 uint32_t tx_geo_stream_id = det_stream->get_geo_id()->get_stream_id();
175 ip_to_stream_src_groups[
tx_ip][tx_geo_stream_id] = det_stream->get_source_id();
181 uint32_t core_idx(0), rx_q(0);
183 m_rx_qs.insert(rx_q);
184 m_arp_rx_queue = rx_q;
188 for(
const auto& [
tx_ip, strm_src] : ip_to_stream_src_groups) {
190 m_rx_qs.insert(rx_q);
191 m_num_frames_rxq[rx_q] = { 0 };
192 m_num_bytes_rxq[rx_q] = { 0 };
194 m_rx_core_map[m_rte_cores[core_idx]][rx_q] =
tx_ip;
195 m_stream_id_to_source_id[rx_q] = strm_src;
199 if ( ++core_idx == m_rte_cores.size()) {
205 for (
auto const& [lcore, rx_qs] : m_rx_core_map) {
206 TLOG() <<
"Lcore=" << lcore <<
" handles: ";
207 for (
auto const& [rx_q, src_ip] : rx_qs) {
208 TLOG() <<
" rx_q=" << rx_q <<
" src_ip=" << src_ip;
213 TLOG() <<
"Append TX_Q=0 for ARP responses.";
217 for (
auto const& [sid, src_concept] : m_sources) {
218 if (!src_concept->m_daq_protocol_ensured) {
219 m_strict_parsing =
false;
227IfaceWrapper::~IfaceWrapper()
231 struct rte_flow_error
error;
232 rte_flow_flush(m_iface_id, &
error);
241IfaceWrapper::allocate_mbufs()
243 TLOG() <<
"Allocating pools and mbufs for UDP, GARP, and ARP.";
246 for (
size_t i=0; i<m_rx_qs.size(); ++i) {
247 std::stringstream bufss;
248 bufss <<
"MBP-" << m_iface_id <<
'-' << i;
249 TLOG() <<
"Acquire pool with name=" << bufss.str() <<
" for iface_id=" << m_iface_id <<
" rxq=" << i;
250 m_mbuf_pools[i] =
ealutils::get_mempool(bufss.str(), m_num_mbufs, m_mbuf_cache_size, 16384, m_socket_id);
251 m_bufs[i] = (rte_mbuf**) malloc(
sizeof(
struct rte_mbuf*) * m_burst_size);
257 std::stringstream garpss;
258 garpss <<
"GARPMBP-" << m_iface_id;
259 TLOG() <<
"Acquire GARP pool with name=" << garpss.str() <<
" for iface_id=" << m_iface_id;
261 m_garp_bufs[0] = (rte_mbuf**) malloc(
sizeof(
struct rte_mbuf*) * m_burst_size);
262 rte_pktmbuf_alloc_bulk(m_garp_mbuf_pool.get(), m_garp_bufs[0], m_burst_size);
265 std::stringstream arpss;
266 arpss <<
"ARPMBP-" << m_iface_id;
267 TLOG() <<
"Acquire ARP pool with name=" << arpss.str() <<
" for iface_id=" << m_iface_id;
269 m_arp_bufs[0] = (rte_mbuf**) malloc(
sizeof(
struct rte_mbuf*) * m_burst_size);
270 rte_pktmbuf_alloc_bulk(m_arp_mbuf_pool.get(), m_arp_bufs[0], m_burst_size);
277IfaceWrapper::setup_interface()
279 TLOG() <<
"Initialize interface " << m_iface_id;
280 bool with_reset =
true, with_mq_mode =
true;
281 bool check_link_status =
false;
283 int retval =
ealutils::iface_init(m_iface_id, m_rx_qs.size(), m_tx_qs.size(), m_rx_ring_size, m_tx_ring_size, m_mbuf_pools, with_reset, with_mq_mode, check_link_status);
285 throw FailedToSetupInterface(
ERS_HERE, m_iface_id, retval);
294IfaceWrapper::setup_flow_steering()
297 TLOG() <<
"Configuring Flow steering rules for iface=" << m_iface_id;
298 struct rte_flow_error
error;
299 struct rte_flow *flow;
300 TLOG() <<
"Attempt to flush previous flow rules...";
301 rte_flow_flush(m_iface_id, &
error);
302#warning RS: FIXME -> Check for flow flush return!
304 TLOG() <<
"Create control flow rules (ARP) assinged to rxq=" << m_arp_rx_queue;
307 TLOG() <<
"ARP flow can't be created for " << m_arp_rx_queue
308 <<
" Error type: " << (unsigned)
error.type
309 <<
" Message: '" <<
error.message <<
"'";
310 ers::fatal(dunedaq::datahandlinglibs::InitializationError(
311 ERS_HERE,
"Couldn't create ARP flow API rules!"));
312 rte_exit(EXIT_FAILURE,
"error in creating ARP flow");
315 TLOG() <<
"Create flow rules for UDP RX.";
316 for (
auto const& [lcoreid, rxqs] : m_rx_core_map) {
317 for (
auto const& [rxqid, srcip] : rxqs) {
319 TLOG() <<
"Creating flow rule for src_ip=" << srcip <<
" assigned to rxq=" << rxqid;
320 size_t ind = 0, current_ind = 0;
321 std::vector<uint8_t>
v;
322 for (
int i = 0; i < 4; ++i) {
323 v.push_back(std::stoi(srcip.substr(current_ind, srcip.size() - current_ind), &ind));
324 current_ind += ind + 1;
328 RTE_IPV4(
v[0],
v[1],
v[2],
v[3]), 0xffffffff, 0, 0, &
error);
331 TLOG() <<
"Flow can't be created for " << rxqid
332 <<
" Error type: " << (unsigned)
error.type
333 <<
" Message: '" <<
error.message <<
"'";
334 ers::fatal(dunedaq::datahandlinglibs::InitializationError(
335 ERS_HERE,
"Couldn't create Flow API rules!"));
336 rte_exit(EXIT_FAILURE,
"error in creating flow");
346IfaceWrapper::setup_xstats()
349 m_iface_xstats.setup(m_iface_id);
350 m_iface_xstats.reset_counters();
359 for (
auto const& [rx_q, _] : m_num_frames_rxq ) {
360 m_num_frames_rxq[rx_q] = { 0 };
361 m_num_bytes_rxq[rx_q] = { 0 };
362 m_num_full_bursts[rx_q] = { 0 };
363 m_max_burst_size[rx_q] = { 0 };
367 for (
auto const& [lcore, _] : m_rx_core_map) {
368 m_num_unhandled_non_ipv4[lcore] = { 0 };
369 m_num_unhandled_non_udp[lcore] = { 0 };
370 m_num_unhandled_non_jumbo_udp[lcore] = { 0 };
373 m_lcore_enable_flow.store(
false);
374 m_lcore_quit_signal.store(
false);
375 TLOG() <<
"Interface id=" << m_iface_id <<
" Launching GARP thread with garp_func...";
376 m_garp_thread = std::thread(&IfaceWrapper::garp_func,
this);
378 TLOG() <<
"Interface id=" << m_iface_id <<
" starting ARP LCore processor:";
379 m_arp_thread = std::thread(&IfaceWrapper::IfaceWrapper::arp_response_runner,
this,
nullptr);
382 TLOG() <<
"Interface id=" << m_iface_id <<
" starting LCore processors:";
383 for (
auto const& [lcoreid, _] : m_rx_core_map) {
384 int ret = rte_eal_remote_launch((
int (*)(
void*))(&IfaceWrapper::rx_runner),
this, lcoreid);
385 TLOG() <<
" -> LCore[" << lcoreid <<
"] launched with return code=" <<
ret <<
" " << (
ret < 0 ? rte_strerror(-
ret) :
"");
393 m_lcore_enable_flow.store(
false);
394 m_lcore_quit_signal.store(
true);
396 if (m_garp_thread.joinable()) {
397 m_garp_thread.join();
399 TLOG() <<
"GARP thread is not joinable!";
402 if (m_arp_thread.joinable()) {
405 TLOG() <<
"ARP thread is not joinable!";
420IfaceWrapper::generate_opmon_data() {
423 m_iface_xstats.poll();
426 s.set_ipackets( m_iface_xstats.m_eth_stats.ipackets );
427 s.set_opackets( m_iface_xstats.m_eth_stats.opackets );
428 s.set_ibytes( m_iface_xstats.m_eth_stats.ibytes );
429 s.set_obytes( m_iface_xstats.m_eth_stats.obytes );
430 s.set_imissed( m_iface_xstats.m_eth_stats.imissed );
431 s.set_ierrors( m_iface_xstats.m_eth_stats.ierrors );
432 s.set_oerrors( m_iface_xstats.m_eth_stats.oerrors );
433 s.set_rx_nombuf( m_iface_xstats.m_eth_stats.rx_nombuf );
434 publish( std::move(
s) );
436 if(m_iface_xstats.m_eth_stats.imissed > 0){
437 ers::warning(PacketErrors(
ERS_HERE, m_iface_id_str,
"missed", m_iface_xstats.m_eth_stats.imissed));
439 if(m_iface_xstats.m_eth_stats.ierrors > 0){
440 ers::warning(PacketErrors(
ERS_HERE, m_iface_id_str,
"dropped", m_iface_xstats.m_eth_stats.ierrors));
444 opmon::EthXStatsInfo xinfos;
445 opmon::EthXStatsErrors xerrs;
446 std::map<std::string, opmon::QueueEthXStats> xq;
448 for (
int i = 0; i < m_iface_xstats.m_len; ++i) {
450 std::string name(m_iface_xstats.m_xstats_names[i].name);
453 static std::regex queue_regex(R
"((rx|tx)_q(\d+)_([^_]+))");
456 if ( std::regex_match(name,
match, queue_regex) ) {
457 auto queue_name =
match[1].str() +
'-' +
match[2].str();
458 auto & entry = xq[queue_name];
467 google::protobuf::Message * metric_p =
nullptr;
468 static std::regex err_regex(R
"(.+error.*)");
469 if ( std::regex_match( name, err_regex ) ) metric_p = & xerrs;
470 else metric_p = & xinfos;
481 m_iface_xstats.reset_counters();
484 publish( std::move(xinfos) );
485 publish( std::move(xerrs) );
486 for (
auto [
id, stat] : xq ) {
487 publish( std::move(stat), {{
"queue",
id}} );
490 for(
const auto& [src_rx_q,_] : m_num_frames_rxq) {
492 i.set_packets_received( m_num_frames_rxq[src_rx_q].load() );
493 i.set_bytes_received( m_num_bytes_rxq[src_rx_q].load() );
494 i.set_full_rx_burst( m_num_full_bursts[src_rx_q].load() );
495 i.set_max_burst_size( m_max_burst_size[src_rx_q].exchange(0) );
497 publish( std::move(i), {{
"queue", std::to_string(src_rx_q)}} );
501 for (
auto const& [lcore, _] : m_rx_core_map) {
502 opmon::RTEWorkerInfo
info;
503 info.set_num_unhandled_non_ipv4( m_num_unhandled_non_ipv4[lcore].exchange(0) );
504 info.set_num_unhandled_non_udp( m_num_unhandled_non_udp[lcore].exchange(0) );
505 info.set_num_unhandled_non_jumbo_udp( m_num_unhandled_non_jumbo_udp[lcore].exchange(0) );
506 publish( std::move(
info), {{
"rte_worker_id", std::to_string(lcore)}} );
509 for (
auto & [
id, counter] : m_num_unexid_frames ) {
510 auto val = counter.exchange(0);
519IfaceWrapper::garp_func()
521 TLOG() <<
"Launching GARP sender...";
522 while(m_run_marker.load()) {
523 for(
const auto& ip_addr_bin : m_ip_addr_bin ) {
527 std::this_thread::sleep_for(std::chrono::seconds(1));
529 TLOG() <<
"GARP function joins.";
534IfaceWrapper::parse_udp_payload(
int src_rx_q,
char*
payload, std::size_t
size)
549 unsigned block_words = unsigned(daqhdrptr->block_length) - 1;
552 if ( block_words == 0 || block_words > m_max_block_words ) {
564 if ( daqframe_endptr > plendptr ) {
573 uint
strm_id = daqhdrptr->stream_id;
576 auto& strm_to_src = m_stream_id_to_source_id[src_rx_q];
578 if (
auto strm_it = strm_to_src.find(
strm_id); strm_it != strm_to_src.end() ) {
580 m_sources[strm_it->second]->handle_daq_frame((
char*)daqhdrptr, daq_frame_size);
587 if (m_num_unexid_frames.count(
strm_id) == 0) {
588 m_num_unexid_frames[
strm_id] = 0;
590 m_num_unexid_frames[
strm_id]++;
594 plptr += daq_frame_size;
602IfaceWrapper::passthrough_udp_payload(
int src_rx_q,
char*
payload, std::size_t
size)
609 uint
strm_id = daqhdrptr->stream_id;
612 auto& strm_to_src = m_stream_id_to_source_id[src_rx_q];
615 if (
auto strm_it = strm_to_src.find(
strm_id); strm_it != strm_to_src.end() ) {
616 m_sources[strm_it->second]->handle_daq_frame(
payload,
size);
621 if (m_num_unexid_frames.count(
strm_id) == 0) {
622 m_num_unexid_frames[
strm_id] = 0;
624 m_num_unexid_frames[
strm_id]++;
@ TLVL_ENTER_EXIT_METHODS
Base class for any user define issue.
std::atomic< bool > run_marker
Global atomic for process lifetime.
#define TLOG_DEBUG(lvl,...)
dunedaq::conffwk::relationship_t match(T const &, T const &)
void pktgen_send_garp(struct rte_mbuf *m, uint32_t port_id, rte_be32_t binary_ip_address)
std::unique_ptr< rte_mempool > get_mempool(const std::string &pool_name, int num_mbufs=NUM_MBUFS, int mbuf_cache_size=MBUF_CACHE_SIZE, int data_room_size=9800, int socket_id=0)
int iface_promiscuous_mode(std::uint16_t iface, bool mode=false)
int iface_init(uint16_t iface, uint16_t rx_rings, uint16_t tx_rings, uint16_t rx_ring_size, uint16_t tx_ring_size, std::map< int, std::unique_ptr< rte_mempool > > &mbuf_pool, bool with_reset=false, bool with_mq_rss=false, bool check_link_status=false)
rte_be32_t ip_address_dotdecimal_to_binary(std::uint8_t byte1, std::uint8_t byte2, std::uint8_t byte3, std::uint8_t byte4)
struct rte_flow * generate_ipv4_flow(uint16_t port_id, uint16_t rx_q, uint32_t src_ip, uint32_t src_mask, uint32_t dest_ip, uint32_t dest_mask, struct rte_flow_error *error)
struct rte_flow * generate_arp_flow(uint16_t port_id, uint16_t rx_q, struct rte_flow_error *error)
void set_value(google::protobuf::Message &m, const std::string &name, T value)
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size
void warning(const Issue &issue)
void fatal(const Issue &issue)
#define TLVL_ENTER_EXIT_METHODS
Factory couldn t std::string alg_name Invalid configuration error