24#include <fmt/format.h>
41 const std::vector<const confmodel::NetworkConnection*>& connections,
49 for (
auto& connection : connections) {
50 auto name = connection->UID();
51 TLOG_DEBUG(15) <<
"Adding connection " << name <<
" to connection map";
54 TLOG_DEBUG(15) <<
"Name collision for connection " << name <<
", DT " << connection->get_data_type()
57 throw NameCollision(
ERS_HERE, connection->UID());
62 if (conn_svc !=
nullptr) {
66 auto connectionServer = conn_svc->
get_host();
67 auto connectionPort = service->get_port();
68 auto config_client_interval = std::chrono::milliseconds(conn_svc->
get_interval_ms());
70 TLOG_DEBUG(17) <<
"ConnectionServer host and port are " << connectionServer <<
":" << connectionPort;
73 std::make_unique<ConfigClient>(connectionServer, std::to_string(connectionPort),session_name, config_client_interval);
107 }
catch (FailedRetract
const& r) {
142 }
catch (FailedRetract
const& r) {
149std::shared_ptr<ipm::Receiver>
152 TLOG_DEBUG(9) <<
"Getting receiver for connection " << conn_id.
uid;
159 TLOG_DEBUG(9) <<
"Creating receiver for connection " << conn_id.
uid;
168std::shared_ptr<ipm::Sender>
171 TLOG_DEBUG(10) <<
"Getting sender for connection " << conn_id.
uid;
178 TLOG_DEBUG(10) <<
"Creating sender for connection " << conn_id.
uid;
193 TLOG_DEBUG(10) <<
"Removing sender for connection " << conn_id.
uid;
214 if (restrict_single && response.connections.size() > 1) {
218 auto start_time = std::chrono::steady_clock::now();
220 std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() -
start_time).count() <
224 if (restrict_single && client_response.connections.size() > 1) {
228 if (client_response.connections.size() > 0) {
229 response = client_response;
232 }
catch (FailedLookup
const& lf) {
241 if (response.connections.size() == 0) {
253 if (
is_match(conn_id, conn.first)) {
254 matching_connections.
connections.push_back(conn.second);
258 return matching_connections;
264 std::set<std::string> output;
266 if (conn.second->UID() == uid)
267 output.insert(conn.second->get_data_type());
273std::vector<std::string>
276 std::vector<std::string> uris;
277 for (
auto& conn : connections) {
279 if (conn.uri.find(
"*") != std::string::npos || conn.uri.find(
"0.0.0.0") != std::string::npos) {
282 uris.push_back(conn.uri);
287std::shared_ptr<ipm::Receiver>
291 if (connections.size() == 0) {
296 if (connections.size() > 1 && !is_pubsub) {
298 "Trying to configure a kSendRecv receiver with multiple Connections is not allowed!");
304 TLOG_DEBUG(12) <<
"Creating plugin of type " << plugin_type;
307 nlohmann::json config_json;
310 if (uris.size() == 0) {
313 config_json[
"connection_strings"] = uris;
315 config_json[
"connection_string"] = connections[0].uri;
317 auto newCs = plugin->connect_for_receives(config_json);
318 TLOG_DEBUG(12) <<
"Receiver reports connected to URI " << newCs;
321 if (connections[0].
uri.find(
"*") != std::string::npos || connections[0].uri.find(
"0.0.0.0") != std::string::npos) {
322 TLOG_DEBUG(14) <<
"Wildcard found in connection URI " << connections[0].uri <<
", adjusting before publish";
326 if (oldUri.port ==
"*")
327 oldUri.port = newUri.port;
328 if (oldUri.host ==
"*" || oldUri.host ==
"0.0.0.0")
329 oldUri.host = newUri.host;
331 connections[0].uri = oldUri.to_string();
332 TLOG_DEBUG(14) <<
"Connection URI is now " << connections[0].uri;
336 TLOG_DEBUG(12) <<
"Subscribing to topic " << connections[0].data_type <<
" after connect_for_receives";
337 auto subscriber = std::dynamic_pointer_cast<ipm::Subscriber>(plugin);
338 subscriber->subscribe(connections[0].data_type);
357std::shared_ptr<ipm::Sender>
366 (connection.
uri.find(
"*") != std::string::npos || connection.
uri.find(
"0.0.0.0") != std::string::npos)) {
370 TLOG_DEBUG(11) <<
"Creating sender plugin of type " << plugin_type;
372 TLOG_DEBUG(11) <<
"Connecting sender plugin to " << connection.
uri;
373 auto newCs = plugin->connect_for_sends({ {
"connection_string", connection.
uri } });
374 TLOG_DEBUG(11) <<
"Sender Plugin connected, reports URI " << newCs;
377 if (connection.
uri.find(
"*") != std::string::npos || connection.
uri.find(
"0.0.0.0") != std::string::npos) {
378 TLOG_DEBUG(13) <<
"Wildcard found in connection URI " << connection.
uri <<
", adjusting before publish";
382 if (oldUri.port ==
"*")
383 oldUri.port = newUri.port;
384 if (oldUri.host ==
"*" || oldUri.host ==
"0.0.0.0")
385 oldUri.host = newUri.host;
387 connection.
uri = oldUri.to_string();
388 TLOG_DEBUG(13) <<
"Connection URI is now " << connection.
uri;
405 TLOG_DEBUG(14) <<
"Updating registered subscribers";
411 nlohmann::json config_json;
413 if (uris.size() == 0) {
414 TLOG_DEBUG(14) <<
"No valid connection strings found, is the Connectivity Service running?!";
417 config_json[
"connection_strings"] = uris;
419 subscriber_pair.second->connect_for_receives(config_json);
430 std::shared_ptr<opmonlib::OpMonLink> link,
431 const std::string& name,
436 link->register_node(name, conn);
437 }
catch (
const opmonlib::NonUniqueNodeName& err) {
438 bool success =
false;
441 auto fname = fmt::format(
"{}--{}", name, counter);
443 link->register_node(fname, conn);
445 }
catch (
const opmonlib::NonUniqueNodeName& err) {
uint32_t get_interval_ms() const
Get "interval_ms" attribute value. Interval between publishes and polls of connectivity service infor...
const dunedaq::confmodel::Service * get_service() const
Get "service" relationship value. Connectivity service Service definition.
const std::string & get_host() const
Get "host" attribute value. Connectivity service Host.
ConnectionResponse get_preconfigured_connections(ConnectionId const &conn_id) const
void configure(const std::string &session_name, const std::vector< const confmodel::NetworkConnection * > &connections, const confmodel::ConnectivityService *conn_svc, dunedaq::opmonlib::OpMonManager &)
std::atomic< bool > m_subscriber_update_thread_running
std::unordered_map< ConnectionId, std::shared_ptr< ipm::Receiver > > m_receiver_plugins
std::shared_ptr< ipm::Receiver > create_receiver(std::vector< ConnectionInfo > connections, ConnectionId const &conn_id)
std::mutex m_receiver_plugin_map_mutex
std::unordered_map< ConnectionId, std::shared_ptr< ipm::Sender > > m_sender_plugins
std::unordered_map< ConnectionId, const confmodel::NetworkConnection * > m_preconfigured_connections
std::unique_ptr< std::thread > m_subscriber_update_thread
std::set< std::string > get_datatypes(std::string const &uid) const
static NetworkManager & get()
std::shared_ptr< dunedaq::opmonlib::OpMonLink > m_sender_opmon_link
std::shared_ptr< ipm::Sender > create_sender(ConnectionInfo connection)
std::shared_ptr< ipm::Sender > get_sender(ConnectionId const &conn_id)
std::unique_ptr< ConfigClient > m_config_client
static void register_monitorable_node(std::shared_ptr< opmonlib::MonitorableObject > conn, std::shared_ptr< opmonlib::OpMonLink > link, const std::string &name, bool is_pubsub)
std::mutex m_sender_plugin_map_mutex
ConnectionResponse get_connections(ConnectionId const &conn_id, bool restrict_single=false) const
bool is_pubsub_connection(ConnectionId const &conn_id) const
static std::unique_ptr< NetworkManager > s_instance
std::chrono::milliseconds m_config_client_interval
std::mutex m_subscriber_plugin_map_mutex
std::shared_ptr< ipm::Receiver > get_receiver(ConnectionId const &conn_id)
std::vector< std::string > get_pubsub_connection_strings(std::vector< ConnectionInfo > const &connections)
void remove_sender(ConnectionId const &conn_id)
void update_subscribers()
std::unordered_map< ConnectionId, std::shared_ptr< ipm::Subscriber > > m_subscriber_plugins
std::shared_ptr< dunedaq::opmonlib::OpMonLink > m_receiver_opmon_link
void register_node(ElementId name, NewNodePtr)
Base class for any user define issue.
#define TLOG_DEBUG(lvl,...)
bool is_match(ConnectionId const &search, ConnectionId const &check)
std::shared_ptr< Sender > make_ipm_sender(std::string const &plugin_name)
std::string get_recommended_plugin_name(IpmPluginType type)
std::shared_ptr< Receiver > make_ipm_receiver(std::string const &plugin_name)
ZmqUri parse_connection_string(std::string connection_string)
Unsupported std::string uri Execution of command std::string error Failed to create CommandFacility uri
Cannot add TPSet with start_time
void error(const Issue &issue)
ConnectionType connection_type
std::vector< ConnectionInfo > connections