25#include <fmt/format.h>
42 const std::vector<const confmodel::NetworkConnection*>& connections,
50 for (
auto& connection : connections) {
51 auto name = connection->UID();
52 TLOG_DEBUG(15) <<
"Adding connection " << name <<
" to connection map";
55 TLOG_DEBUG(15) <<
"Name collision for connection " << name <<
", DT " << connection->get_data_type()
58 throw NameCollision(
ERS_HERE, connection->UID());
63 if (conn_svc !=
nullptr) {
67 auto connectionServer = conn_svc->
get_host();
68 auto connectionPort = service->get_port();
69 auto config_client_interval = std::chrono::milliseconds(conn_svc->
get_interval_ms());
71 TLOG_DEBUG(17) <<
"ConnectionServer host and port are " << connectionServer <<
":" << connectionPort;
74 connectionServer, std::to_string(connectionPort), session_name, config_client_interval);
108 }
catch (FailedRetract
const& r) {
143 }
catch (FailedRetract
const& r) {
150std::shared_ptr<ipm::Receiver>
153 TLOG_DEBUG(9) <<
"Getting receiver for connection " << conn_id.
uid;
160 TLOG_DEBUG(9) <<
"Creating receiver for connection " << conn_id.
uid;
169std::shared_ptr<ipm::Sender>
172 TLOG_DEBUG(10) <<
"Getting sender for connection " << conn_id.
uid;
179 TLOG_DEBUG(10) <<
"Creating sender for connection " << conn_id.
uid;
194 TLOG_DEBUG(10) <<
"Removing sender for connection " << conn_id.
uid;
214 if (restrict_single && response.connections.size() > 1) {
218 auto start_time = std::chrono::steady_clock::now();
220 std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() -
start_time).count() <
224 if (restrict_single && client_response.connections.size() > 1) {
228 if (client_response.connections.size() > 0) {
229 response = client_response;
232 }
catch (FailedLookup
const& lf) {
241 if (response.connections.size() == 0) {
253 if (
is_match(conn_id, conn.first)) {
254 matching_connections.
connections.emplace_back(conn.second);
258 return matching_connections;
264 std::set<std::string> output;
266 if (conn.second->UID() == uid)
267 output.insert(conn.second->get_data_type());
273std::vector<std::string>
276 std::vector<std::string> uris;
277 for (
auto& conn : connections) {
280 if (conn.uri.find(
"*") != std::string::npos || conn.uri.find(
"0.0.0.0") != std::string::npos) {
283 uris.push_back(conn.uri);
288std::shared_ptr<ipm::Receiver>
292 if (connections.size() == 0) {
297 if (connections.size() > 1 && !is_pubsub) {
299 "Trying to configure a kSendRecv receiver with multiple Connections is not allowed!");
305 TLOG_DEBUG(12) <<
"Creating plugin of type " << plugin_type;
308 nlohmann::json config_json;
311 if (uris.size() == 0) {
314 config_json[
"connection_strings"] = uris;
316 config_json[
"connection_string"] = connections[0].uri;
318 auto newCs = plugin->connect_for_receives(config_json);
319 TLOG_DEBUG(12) <<
"Receiver reports connected to URI " << newCs;
322 if (connections[0].
uri.find(
"*") != std::string::npos || connections[0].uri.find(
"0.0.0.0") != std::string::npos) {
323 TLOG_DEBUG(14) <<
"Wildcard found in connection URI " << connections[0].uri <<
", adjusting before publish";
327 if (oldUri.
port ==
"*")
329 if (oldUri.
host ==
"*" || oldUri.
host ==
"0.0.0.0")
333 TLOG_DEBUG(14) <<
"Connection URI is now " << connections[0].uri;
337 TLOG_DEBUG(12) <<
"Subscribing to topic " << connections[0].data_type <<
" after connect_for_receives";
338 auto subscriber = std::dynamic_pointer_cast<ipm::Subscriber>(plugin);
339 subscriber->subscribe(connections[0].data_type);
358std::shared_ptr<ipm::Sender>
367 (connection.
uri.find(
"*") != std::string::npos || connection.
uri.find(
"0.0.0.0") != std::string::npos)) {
371 TLOG_DEBUG(11) <<
"Creating sender plugin of type " << plugin_type;
373 TLOG_DEBUG(11) <<
"Connecting sender plugin to " << connection.
uri;
375 plugin->connect_for_sends({ {
"connection_string", connection.
uri }, {
"capacity", connection.
capacity } });
376 TLOG_DEBUG(11) <<
"Sender Plugin connected, reports URI " << newCs;
379 if (connection.
uri.find(
"*") != std::string::npos || connection.
uri.find(
"0.0.0.0") != std::string::npos) {
380 TLOG_DEBUG(13) <<
"Wildcard found in connection URI " << connection.
uri <<
", adjusting before publish";
384 if (oldUri.
port ==
"*")
386 if (oldUri.
host ==
"*" || oldUri.
host ==
"0.0.0.0")
390 TLOG_DEBUG(13) <<
"Connection URI is now " << connection.
uri;
407 TLOG_DEBUG(14) <<
"Updating registered subscribers";
413 nlohmann::json config_json;
415 if (uris.size() == 0) {
416 TLOG_DEBUG(14) <<
"No valid connection strings found, is the Connectivity Service running?!";
419 config_json[
"connection_strings"] = uris;
421 subscriber_pair.second->connect_for_receives(config_json);
432 std::shared_ptr<opmonlib::OpMonLink> link,
433 const std::string& name,
438 link->register_node(name, conn);
439 }
catch (
const opmonlib::NonUniqueNodeName& err) {
440 bool success =
false;
443 auto fname = fmt::format(
"{}--{}", name, counter);
445 link->register_node(fname, conn);
447 }
catch (
const opmonlib::NonUniqueNodeName& err) {
uint32_t get_interval_ms() const
Get "interval_ms" attribute value. Interval between publishes and polls of connectivity service infor...
const dunedaq::confmodel::Service * get_service() const
Get "service" relationship value. Connectivity service Service definition.
const std::string & get_host() const
Get "host" attribute value. Connectivity service Host.
ConnectionResponse get_preconfigured_connections(ConnectionId const &conn_id) const
void configure(const std::string &session_name, const std::vector< const confmodel::NetworkConnection * > &connections, const confmodel::ConnectivityService *conn_svc, dunedaq::opmonlib::OpMonManager &)
std::atomic< bool > m_subscriber_update_thread_running
std::unordered_map< ConnectionId, std::shared_ptr< ipm::Receiver > > m_receiver_plugins
std::shared_ptr< ipm::Receiver > create_receiver(std::vector< ConnectionInfo > connections, ConnectionId const &conn_id)
std::mutex m_receiver_plugin_map_mutex
std::unordered_map< ConnectionId, std::shared_ptr< ipm::Sender > > m_sender_plugins
std::unordered_map< ConnectionId, const confmodel::NetworkConnection * > m_preconfigured_connections
std::unique_ptr< std::thread > m_subscriber_update_thread
std::set< std::string > get_datatypes(std::string const &uid) const
static NetworkManager & get()
std::shared_ptr< dunedaq::opmonlib::OpMonLink > m_sender_opmon_link
std::shared_ptr< ipm::Sender > create_sender(ConnectionInfo connection)
std::shared_ptr< ipm::Sender > get_sender(ConnectionId const &conn_id)
std::unique_ptr< ConfigClient > m_config_client
static void register_monitorable_node(std::shared_ptr< opmonlib::MonitorableObject > conn, std::shared_ptr< opmonlib::OpMonLink > link, const std::string &name, bool is_pubsub)
std::mutex m_sender_plugin_map_mutex
ConnectionResponse get_connections(ConnectionId const &conn_id, bool restrict_single=false) const
bool is_pubsub_connection(ConnectionId const &conn_id) const
static std::unique_ptr< NetworkManager > s_instance
std::chrono::milliseconds m_config_client_interval
std::mutex m_subscriber_plugin_map_mutex
std::shared_ptr< ipm::Receiver > get_receiver(ConnectionId const &conn_id)
std::vector< std::string > get_pubsub_connection_strings(std::vector< ConnectionInfo > const &connections)
void remove_sender(ConnectionId const &conn_id)
void update_subscribers()
std::unordered_map< ConnectionId, std::shared_ptr< ipm::Subscriber > > m_subscriber_plugins
std::shared_ptr< dunedaq::opmonlib::OpMonLink > m_receiver_opmon_link
void register_node(ElementId name, NewNodePtr)
Base class for any user define issue.
#define TLOG_DEBUG(lvl,...)
bool is_match(ConnectionId const &search, ConnectionId const &check)
std::shared_ptr< Sender > make_ipm_sender(std::string const &plugin_name)
std::string get_recommended_plugin_name(IpmPluginType type)
std::shared_ptr< Receiver > make_ipm_receiver(std::string const &plugin_name)
Unsupported std::string uri Execution of command std::string error Failed to create CommandFacility uri
Cannot add TPSet with start_time
void error(const Issue &issue)
ConnectionType connection_type
std::vector< ConnectionInfo > connections