LCOV - code coverage report
Current view: top level - iomanager/src/network - NetworkManager.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 57.8 % 282 163
Test Date: 2025-12-21 13:07:08 Functions: 71.4 % 42 30

            Line data    Source code
       1              : /**
       2              :  * @file NetworkManager.cpp NetworkManager class implementation
       3              :  *
       4              :  * This is part of the DUNE DAQ Application Framework, copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
#include "iomanager/network/NetworkManager.hpp"
#include "iomanager/SchemaUtils.hpp"

#include "ipm/PluginInfo.hpp"
#include "logging/Logging.hpp"
#include "utilities/ZmqUri.hpp"

#include "confmodel/PhysicalHost.hpp"
#include "confmodel/Service.hpp"

#include <chrono>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <fmt/format.h>
      26              : 
      27              : namespace dunedaq::iomanager {
      28              : 
      29              : std::unique_ptr<NetworkManager> NetworkManager::s_instance = nullptr;
      30              : 
      31              : NetworkManager&
      32         2358 : NetworkManager::get()
      33              : {
      34         2358 :   if (!s_instance) {
      35           10 :     s_instance.reset(new NetworkManager());
      36              :   }
      37         2358 :   return *s_instance;
      38              : }
      39              : 
      40              : void
      41           57 : NetworkManager::configure(const std::string& session_name,
      42              :                           const std::vector<const confmodel::NetworkConnection*>& connections,
      43              :                           const confmodel::ConnectivityService* conn_svc,
      44              :                           dunedaq::opmonlib::OpMonManager& opmgr)
      45              : {
      46           57 :   if (!m_preconfigured_connections.empty()) {
      47            1 :     throw AlreadyConfigured(ERS_HERE);
      48              :   }
      49              : 
      50          200 :   for (auto& connection : connections) {
      51          144 :     auto name = connection->UID();
      52          144 :     TLOG_DEBUG(15) << "Adding connection " << name << " to connection map";
      53          144 :     ConnectionId id(connection);
      54          144 :     if (m_preconfigured_connections.count(id)) {
      55            0 :       TLOG_DEBUG(15) << "Name collision for connection " << name << ", DT " << connection->get_data_type()
      56            0 :                      << " connection_map.count: " << m_preconfigured_connections.count(id);
      57            0 :       reset();
      58            0 :       throw NameCollision(ERS_HERE, connection->UID());
      59              :     }
      60          144 :     m_preconfigured_connections[id] = connection;
      61          144 :   }
      62              : 
      63           56 :   if (conn_svc != nullptr) {
      64              : 
      65            0 :     auto service = conn_svc->get_service();
      66              : 
      67            0 :     auto connectionServer = conn_svc->get_host();
      68            0 :     auto connectionPort = service->get_port();
      69            0 :     auto config_client_interval = std::chrono::milliseconds(conn_svc->get_interval_ms());
      70              : 
      71            0 :     TLOG_DEBUG(17) << "ConnectionServer host and port are " << connectionServer << ":" << connectionPort;
      72            0 :     if (m_config_client == nullptr) {
      73            0 :       m_config_client = std::make_unique<ConfigClient>(
      74            0 :         connectionServer, std::to_string(connectionPort), session_name, config_client_interval);
      75              :     }
      76            0 :     m_config_client_interval = config_client_interval;
      77            0 :   }
      78              : 
      79           56 :   opmgr.register_node("senders", m_sender_opmon_link);
      80           56 :   opmgr.register_node("receivers", m_receiver_opmon_link);
      81           56 : }
      82              : 
      83              : void
      84           79 : NetworkManager::reset()
      85              : {
      86           79 :   TLOG_DEBUG(5) << "reset() BEGIN";
      87           79 :   m_subscriber_update_thread_running = false;
      88           79 :   if (m_subscriber_update_thread && m_subscriber_update_thread->joinable()) {
      89            0 :     m_subscriber_update_thread->join();
      90              :   }
      91           79 :   {
      92           79 :     std::lock_guard<std::mutex> lkk(m_subscriber_plugin_map_mutex);
      93           79 :     m_subscriber_plugins.clear();
      94           79 :   }
      95           79 :   {
      96           79 :     std::lock_guard<std::mutex> lk(m_sender_plugin_map_mutex);
      97           79 :     m_sender_plugins.clear();
      98           79 :   }
      99           79 :   {
     100           79 :     std::lock_guard<std::mutex> lk(m_receiver_plugin_map_mutex);
     101           79 :     m_receiver_plugins.clear();
     102           79 :   }
     103              : 
     104           79 :   m_preconfigured_connections.clear();
     105           79 :   if (m_config_client != nullptr) {
     106            0 :     try {
     107            0 :       m_config_client->retract();
     108            0 :     } catch (FailedRetract const& r) {
     109            0 :       ers::error(r);
     110            0 :     }
     111              :   }
     112           79 :   m_config_client.reset(nullptr);
     113              : 
     114           79 :   m_sender_opmon_link = std::make_shared<dunedaq::opmonlib::OpMonLink>();
     115           79 :   m_receiver_opmon_link = std::make_shared<dunedaq::opmonlib::OpMonLink>();
     116           79 :   TLOG_DEBUG(5) << "reset() END";
     117           79 : }
     118              : 
     119              : void
     120            0 : NetworkManager::shutdown()
     121              : {
     122            0 :   TLOG_DEBUG(5) << "shutdown() BEGIN";
     123            0 :   m_subscriber_update_thread_running = false;
     124            0 :   if (m_subscriber_update_thread && m_subscriber_update_thread->joinable()) {
     125            0 :     m_subscriber_update_thread->join();
     126              :   }
     127            0 :   {
     128            0 :     std::lock_guard<std::mutex> lkk(m_subscriber_plugin_map_mutex);
     129            0 :     m_subscriber_plugins.clear();
     130            0 :   }
     131            0 :   {
     132            0 :     std::lock_guard<std::mutex> lk(m_sender_plugin_map_mutex);
     133            0 :     m_sender_plugins.clear();
     134            0 :   }
     135            0 :   {
     136            0 :     std::lock_guard<std::mutex> lk(m_receiver_plugin_map_mutex);
     137            0 :     m_receiver_plugins.clear();
     138            0 :   }
     139              : 
     140            0 :   if (m_config_client != nullptr) {
     141            0 :     try {
     142            0 :       m_config_client->retract();
     143            0 :     } catch (FailedRetract const& r) {
     144            0 :       ers::error(r);
     145            0 :     }
     146              :   }
     147            0 :   TLOG_DEBUG(5) << "shutdown() END";
     148            0 : }
     149              : 
     150              : std::shared_ptr<ipm::Receiver>
     151          226 : NetworkManager::get_receiver(ConnectionId const& conn_id)
     152              : {
     153          226 :   TLOG_DEBUG(9) << "Getting receiver for connection " << conn_id.uid;
     154              : 
     155          226 :   std::lock_guard<std::mutex> lk(m_receiver_plugin_map_mutex);
     156          226 :   if (!m_receiver_plugins.count(conn_id) || m_receiver_plugins.at(conn_id) == nullptr) {
     157              : 
     158          226 :     auto response = get_connections(conn_id);
     159              : 
     160           28 :     TLOG_DEBUG(9) << "Creating receiver for connection " << conn_id.uid;
     161           28 :     auto receiver = create_receiver(response.connections, conn_id);
     162              : 
     163           28 :     m_receiver_plugins[conn_id] = receiver;
     164           28 :   }
     165              : 
     166           56 :   return m_receiver_plugins[conn_id];
     167          226 : }
     168              : 
     169              : std::shared_ptr<ipm::Sender>
     170         1955 : NetworkManager::get_sender(ConnectionId const& conn_id)
     171              : {
     172         1955 :   TLOG_DEBUG(10) << "Getting sender for connection " << conn_id.uid;
     173              : 
     174         1955 :   std::lock_guard<std::mutex> lk(m_sender_plugin_map_mutex);
     175              : 
     176         1955 :   if (!m_sender_plugins.count(conn_id) || m_sender_plugins.at(conn_id) == nullptr) {
     177         1955 :     auto response = get_connections(conn_id, true);
     178              : 
     179           28 :     TLOG_DEBUG(10) << "Creating sender for connection " << conn_id.uid;
     180           28 :     auto sender = create_sender(response.connections[0]);
     181           28 :     m_sender_plugins[conn_id] = sender;
     182           28 :   }
     183              : 
     184           28 :   if (m_sender_plugins.count(conn_id)) {
     185           28 :     return m_sender_plugins[conn_id];
     186              :   }
     187              : 
     188            0 :   return nullptr;
     189         1955 : }
     190              : 
     191              : void
     192            1 : NetworkManager::remove_sender(ConnectionId const& conn_id)
     193              : {
     194            1 :   TLOG_DEBUG(10) << "Removing sender for connection " << conn_id.uid;
     195              : 
     196            1 :   std::lock_guard<std::mutex> lk(m_sender_plugin_map_mutex);
     197            1 :   m_sender_plugins.erase(conn_id);
     198            1 : }
     199              : 
     200              : bool
     201           35 : NetworkManager::is_pubsub_connection(ConnectionId const& conn_id) const
     202              : {
     203           35 :   auto response = get_connections(conn_id);
     204           34 :   bool is_pubsub = response.connections[0].connection_type == ConnectionType::kPubSub;
     205              : 
     206              :   // TLOG() << "Returning " << std::boolalpha << is_pubsub << " for request " << request;
     207           68 :   return is_pubsub;
     208           34 : }
     209              : 
     210              : ConnectionResponse
     211         2216 : NetworkManager::get_connections(ConnectionId const& conn_id, bool restrict_single) const
     212              : {
     213         2216 :   auto response = get_preconfigured_connections(conn_id);
     214         2216 :   if (restrict_single && response.connections.size() > 1) {
     215            0 :     throw NameCollision(ERS_HERE, conn_id.uid);
     216              :   }
     217         2216 :   if (m_config_client != nullptr) {
     218            0 :     auto start_time = std::chrono::steady_clock::now();
     219            0 :     while (
     220            0 :       std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count() <
     221              :       1000) {
     222            0 :       try {
     223            0 :         auto client_response = m_config_client->resolve_connection(conn_id, conn_id.session);
     224            0 :         if (restrict_single && client_response.connections.size() > 1) {
     225            0 :           throw NameCollision(ERS_HERE, conn_id.uid);
     226              :         }
     227              : 
     228            0 :         if (client_response.connections.size() > 0) {
     229            0 :           response = client_response;
     230              :         }
     231            0 :         break;
     232            0 :       } catch (FailedLookup const& lf) {
     233            0 :         if (m_config_client->is_connected()) {
     234            0 :           throw ConnectionNotFound(ERS_HERE, conn_id.uid, conn_id.data_type, lf);
     235              :         }
     236            0 :         usleep(1000);
     237            0 :       }
     238              :     }
     239              :   }
     240              : 
     241         2216 :   if (response.connections.size() == 0) {
     242         2126 :     throw ConnectionNotFound(ERS_HERE, conn_id.uid, conn_id.data_type);
     243              :   }
     244              : 
     245           90 :   return response;
     246         2126 : }
     247              : 
     248              : ConnectionResponse
     249         2222 : NetworkManager::get_preconfigured_connections(ConnectionId const& conn_id) const
     250              : {
     251         2222 :   ConnectionResponse matching_connections;
     252         7615 :   for (auto& conn : m_preconfigured_connections) {
     253         5393 :     if (is_match(conn_id, conn.first)) {
     254          103 :       matching_connections.connections.emplace_back(conn.second);
     255              :     }
     256              :   }
     257              : 
     258         2222 :   return matching_connections;
     259            0 : }
     260              : 
     261              : std::set<std::string>
     262            7 : NetworkManager::get_datatypes(std::string const& uid) const
     263              : {
     264            7 :   std::set<std::string> output;
     265           39 :   for (auto& conn : m_preconfigured_connections) {
     266           32 :     if (conn.second->UID() == uid)
     267            4 :       output.insert(conn.second->get_data_type());
     268              :   }
     269              : 
     270            7 :   return output;
     271            0 : }
     272              : 
     273              : std::vector<std::string>
     274            7 : NetworkManager::get_pubsub_connection_strings(std::vector<ConnectionInfo> const& connections)
     275              : {
     276            7 :   std::vector<std::string> uris;
     277           18 :   for (auto& conn : connections) {
     278              :     // Check for case where both ends are in app and ConnectivityService hasn't received other end yet, or
     279              :     // ConnectivityService was no longer available
     280           11 :     if (conn.uri.find("*") != std::string::npos || conn.uri.find("0.0.0.0") != std::string::npos) {
     281            0 :       continue;
     282              :     }
     283           11 :     uris.push_back(conn.uri);
     284              :   }
     285            7 :   return uris;
     286            0 : }
     287              : 
     288              : std::shared_ptr<ipm::Receiver>
     289           28 : NetworkManager::create_receiver(std::vector<ConnectionInfo> connections, ConnectionId const& conn_id)
     290              : {
     291           28 :   TLOG_DEBUG(12) << "START";
     292           28 :   if (connections.size() == 0) {
     293            0 :     return nullptr;
     294              :   }
     295              : 
     296           28 :   bool is_pubsub = connections[0].connection_type == ConnectionType::kPubSub;
     297           28 :   if (connections.size() > 1 && !is_pubsub) {
     298            0 :     throw OperationFailed(ERS_HERE,
     299            0 :                           "Trying to configure a kSendRecv receiver with multiple Connections is not allowed!");
     300              :   }
     301              : 
     302           28 :   auto plugin_type =
     303           49 :     ipm::get_recommended_plugin_name(is_pubsub ? ipm::IpmPluginType::Subscriber : ipm::IpmPluginType::Receiver);
     304              : 
     305           28 :   TLOG_DEBUG(12) << "Creating plugin of type " << plugin_type;
     306           28 :   auto plugin = dunedaq::ipm::make_ipm_receiver(plugin_type);
     307              : 
     308           28 :   nlohmann::json config_json;
     309           28 :   if (is_pubsub) {
     310            7 :     std::vector<std::string> uris = get_pubsub_connection_strings(connections);
     311            7 :     if (uris.size() == 0) {
     312            0 :       return nullptr;
     313              :     }
     314            7 :     config_json["connection_strings"] = uris;
     315            7 :   } else {
     316           21 :     config_json["connection_string"] = connections[0].uri;
     317              :   }
     318           28 :   auto newCs = plugin->connect_for_receives(config_json);
     319           28 :   TLOG_DEBUG(12) << "Receiver reports connected to URI " << newCs;
     320              : 
     321              :   // Replace with resolved if there are wildcards (host and/or port)
     322           28 :   if (connections[0].uri.find("*") != std::string::npos || connections[0].uri.find("0.0.0.0") != std::string::npos) {
     323            0 :     TLOG_DEBUG(14) << "Wildcard found in connection URI " << connections[0].uri << ", adjusting before publish";
     324            0 :     utilities::ZmqUri newUri(newCs);
     325            0 :     utilities::ZmqUri oldUri(connections[0].uri);
     326              : 
     327            0 :     if (oldUri.port == "*")
     328            0 :       oldUri.port = newUri.port;
     329            0 :     if (oldUri.host == "*" || oldUri.host == "0.0.0.0")
     330            0 :       oldUri.host = newUri.host;
     331              : 
     332            0 :     connections[0].uri = oldUri.to_string();
     333            0 :     TLOG_DEBUG(14) << "Connection URI is now " << connections[0].uri;
     334            0 :   }
     335              : 
     336           28 :   if (is_pubsub) {
     337            7 :     TLOG_DEBUG(12) << "Subscribing to topic " << connections[0].data_type << " after connect_for_receives";
     338            7 :     auto subscriber = std::dynamic_pointer_cast<ipm::Subscriber>(plugin);
     339            7 :     subscriber->subscribe(connections[0].data_type);
     340            7 :     std::lock_guard<std::mutex> lkk(m_subscriber_plugin_map_mutex);
     341            7 :     m_subscriber_plugins[conn_id] = subscriber;
     342            7 :     if (!m_subscriber_update_thread_running && m_config_client != nullptr) {
     343            0 :       m_subscriber_update_thread_running = true;
     344            0 :       m_subscriber_update_thread = std::make_unique<std::thread>(&NetworkManager::update_subscribers, this);
     345              :     }
     346            7 :   }
     347              : 
     348           28 :   if (m_config_client != nullptr && !is_pubsub) {
     349            0 :     m_config_client->publish(connections[0]);
     350              :   }
     351              : 
     352           28 :   register_monitorable_node(plugin, m_receiver_opmon_link, conn_id.uid, is_pubsub);
     353              : 
     354           28 :   TLOG_DEBUG(12) << "END";
     355           28 :   return plugin;
     356           28 : }
     357              : 
     358              : std::shared_ptr<ipm::Sender>
     359           28 : NetworkManager::create_sender(ConnectionInfo connection)
     360              : {
     361           28 :   auto is_pubsub = connection.connection_type == ConnectionType::kPubSub;
     362           28 :   auto plugin_type =
     363           50 :     ipm::get_recommended_plugin_name(is_pubsub ? ipm::IpmPluginType::Publisher : ipm::IpmPluginType::Sender);
     364              : 
     365              :   // Check for case where both ends are in app and ConnectivityService hasn't received other end yet
     366           28 :   if (!is_pubsub &&
     367           22 :       (connection.uri.find("*") != std::string::npos || connection.uri.find("0.0.0.0") != std::string::npos)) {
     368            0 :     return nullptr;
     369              :   }
     370              : 
     371           28 :   TLOG_DEBUG(11) << "Creating sender plugin of type " << plugin_type;
     372           28 :   auto plugin = dunedaq::ipm::make_ipm_sender(plugin_type);
     373           28 :   TLOG_DEBUG(11) << "Connecting sender plugin to " << connection.uri;
     374           28 :   auto newCs =
     375          196 :     plugin->connect_for_sends({ { "connection_string", connection.uri }, { "capacity", connection.capacity } });
     376           28 :   TLOG_DEBUG(11) << "Sender Plugin connected, reports URI " << newCs;
     377              : 
     378              :   // Replace with resolved if there are wildcards (host and/or port)
     379           28 :   if (connection.uri.find("*") != std::string::npos || connection.uri.find("0.0.0.0") != std::string::npos) {
     380            0 :     TLOG_DEBUG(13) << "Wildcard found in connection URI " << connection.uri << ", adjusting before publish";
     381            0 :     utilities::ZmqUri newUri(newCs);
     382            0 :     utilities::ZmqUri oldUri(connection.uri);
     383              : 
     384            0 :     if (oldUri.port == "*")
     385            0 :       oldUri.port = newUri.port;
     386            0 :     if (oldUri.host == "*" || oldUri.host == "0.0.0.0")
     387            0 :       oldUri.host = newUri.host;
     388              : 
     389            0 :     connection.uri = oldUri.to_string();
     390            0 :     TLOG_DEBUG(13) << "Connection URI is now " << connection.uri;
     391            0 :   }
     392              : 
     393           28 :   if (m_config_client != nullptr && is_pubsub) {
     394            0 :     m_config_client->publish(connection);
     395              :   }
     396              : 
     397           28 :   register_monitorable_node(plugin, m_sender_opmon_link, connection.uid, is_pubsub);
     398              : 
     399           28 :   return plugin;
     400          196 : }
     401              : 
     402              : void
     403            0 : NetworkManager::update_subscribers()
     404              : {
     405            0 :   while (m_subscriber_update_thread_running.load()) {
     406            0 :     {
     407            0 :       TLOG_DEBUG(14) << "Updating registered subscribers";
     408            0 :       std::lock_guard<std::mutex> lk(m_subscriber_plugin_map_mutex);
     409            0 :       for (auto& subscriber_pair : m_subscriber_plugins) {
     410            0 :         try {
     411            0 :           auto response = get_connections(subscriber_pair.first, false);
     412              : 
     413            0 :           nlohmann::json config_json;
     414            0 :           std::vector<std::string> uris = get_pubsub_connection_strings(response.connections);
     415            0 :           if (uris.size() == 0) {
     416            0 :             TLOG_DEBUG(14) << "No valid connection strings found, is the Connectivity Service running?!";
     417            0 :             continue;
     418            0 :           }
     419            0 :           config_json["connection_strings"] = uris;
     420              : 
     421            0 :           subscriber_pair.second->connect_for_receives(config_json);
     422            0 :         } catch (ers::Issue&) {
     423            0 :         }
     424              :       }
     425            0 :     }
     426            0 :     std::this_thread::sleep_for(m_config_client_interval);
     427              :   }
     428            0 : }
     429              : 
     430              : void
     431           56 : NetworkManager::register_monitorable_node(std::shared_ptr<opmonlib::MonitorableObject> conn,
     432              :                                           std::shared_ptr<opmonlib::OpMonLink> link,
     433              :                                           const std::string& name,
     434              :                                           bool /*is_pubsub*/)
     435              : {
     436              : 
     437           56 :   try {
     438           58 :     link->register_node(name, conn);
     439            2 :   } catch (const opmonlib::NonUniqueNodeName& err) {
     440            2 :     bool success = false;
     441            2 :     size_t counter = 1;
     442            2 :     do {
     443            2 :       auto fname = fmt::format("{}--{}", name, counter);
     444            2 :       try {
     445            2 :         link->register_node(fname, conn);
     446            2 :         success = true;
     447            0 :       } catch (const opmonlib::NonUniqueNodeName& err) {
     448            0 :         ++counter;
     449            0 :       }
     450            2 :     } while (!success);
     451            2 :   }
     452           56 : }
     453              : 
     454              : } // namespace dunedaq::iomanager
        

Generated by: LCOV version 2.0-1