DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
NetworkManager.cpp
Go to the documentation of this file.
1
11
12#include "ipm/PluginInfo.hpp"
13#include "logging/Logging.hpp"
15
17#include "confmodel/Service.hpp"
18
19#include <map>
20#include <memory>
21#include <set>
22#include <string>
23#include <vector>
24
25#include <fmt/format.h>
26
27namespace dunedaq::iomanager {
28
29std::unique_ptr<NetworkManager> NetworkManager::s_instance = nullptr;
30
31NetworkManager&
33{
34 if (!s_instance) {
35 s_instance.reset(new NetworkManager());
36 }
37 return *s_instance;
38}
39
40void
41NetworkManager::configure(const std::string& session_name,
42 const std::vector<const confmodel::NetworkConnection*>& connections,
43 const confmodel::ConnectivityService* conn_svc,
45{
46 if (!m_preconfigured_connections.empty()) {
47 throw AlreadyConfigured(ERS_HERE);
48 }
49
50 for (auto& connection : connections) {
51 auto name = connection->UID();
52 TLOG_DEBUG(15) << "Adding connection " << name << " to connection map";
53 ConnectionId id(connection);
54 if (m_preconfigured_connections.count(id)) {
55 TLOG_DEBUG(15) << "Name collision for connection " << name << ", DT " << connection->get_data_type()
56 << " connection_map.count: " << m_preconfigured_connections.count(id);
57 reset();
58 throw NameCollision(ERS_HERE, connection->UID());
59 }
60 m_preconfigured_connections[id] = connection;
61 }
62
63 if (conn_svc != nullptr) {
64
65 auto service = conn_svc->get_service();
66
67 auto connectionServer = conn_svc->get_host();
68 auto connectionPort = service->get_port();
69 auto config_client_interval = std::chrono::milliseconds(conn_svc->get_interval_ms());
70
71 TLOG_DEBUG(17) << "ConnectionServer host and port are " << connectionServer << ":" << connectionPort;
72 if (m_config_client == nullptr) {
73 m_config_client = std::make_unique<ConfigClient>(
74 connectionServer, std::to_string(connectionPort), session_name, config_client_interval);
75 }
76 m_config_client_interval = config_client_interval;
77 }
78
79 opmgr.register_node("senders", m_sender_opmon_link);
80 opmgr.register_node("receivers", m_receiver_opmon_link);
81}
82
83void
85{
86 TLOG_DEBUG(5) << "reset() BEGIN";
90 }
91 {
92 std::lock_guard<std::mutex> lkk(m_subscriber_plugin_map_mutex);
94 }
95 {
96 std::lock_guard<std::mutex> lk(m_sender_plugin_map_mutex);
97 m_sender_plugins.clear();
98 }
99 {
100 std::lock_guard<std::mutex> lk(m_receiver_plugin_map_mutex);
101 m_receiver_plugins.clear();
102 }
103
105 if (m_config_client != nullptr) {
106 try {
107 m_config_client->retract();
108 } catch (FailedRetract const& r) {
109 ers::error(r);
110 }
111 }
112 m_config_client.reset(nullptr);
113
114 m_sender_opmon_link = std::make_shared<dunedaq::opmonlib::OpMonLink>();
115 m_receiver_opmon_link = std::make_shared<dunedaq::opmonlib::OpMonLink>();
116 TLOG_DEBUG(5) << "reset() END";
117}
118
119void
121{
122 TLOG_DEBUG(5) << "shutdown() BEGIN";
126 }
127 {
128 std::lock_guard<std::mutex> lkk(m_subscriber_plugin_map_mutex);
129 m_subscriber_plugins.clear();
130 }
131 {
132 std::lock_guard<std::mutex> lk(m_sender_plugin_map_mutex);
133 m_sender_plugins.clear();
134 }
135 {
136 std::lock_guard<std::mutex> lk(m_receiver_plugin_map_mutex);
137 m_receiver_plugins.clear();
138 }
139
140 if (m_config_client != nullptr) {
141 try {
142 m_config_client->retract();
143 } catch (FailedRetract const& r) {
144 ers::error(r);
145 }
146 }
147 TLOG_DEBUG(5) << "shutdown() END";
148}
149
150std::shared_ptr<ipm::Receiver>
152{
153 TLOG_DEBUG(9) << "Getting receiver for connection " << conn_id.uid;
154
155 std::lock_guard<std::mutex> lk(m_receiver_plugin_map_mutex);
156 if (!m_receiver_plugins.count(conn_id) || m_receiver_plugins.at(conn_id) == nullptr) {
157
158 auto response = get_connections(conn_id);
159
160 TLOG_DEBUG(9) << "Creating receiver for connection " << conn_id.uid;
161 auto receiver = create_receiver(response.connections, conn_id);
162
163 m_receiver_plugins[conn_id] = receiver;
164 }
165
166 return m_receiver_plugins[conn_id];
167}
168
169std::shared_ptr<ipm::Sender>
171{
172 TLOG_DEBUG(10) << "Getting sender for connection " << conn_id.uid;
173
174 std::lock_guard<std::mutex> lk(m_sender_plugin_map_mutex);
175
176 if (!m_sender_plugins.count(conn_id) || m_sender_plugins.at(conn_id) == nullptr) {
177 auto response = get_connections(conn_id, true);
178
179 TLOG_DEBUG(10) << "Creating sender for connection " << conn_id.uid;
180 auto sender = create_sender(response.connections[0]);
181 m_sender_plugins[conn_id] = sender;
182 }
183
184 if (m_sender_plugins.count(conn_id)) {
185 return m_sender_plugins[conn_id];
186 }
187
188 return nullptr;
189}
190
191void
193{
194 TLOG_DEBUG(10) << "Removing sender for connection " << conn_id.uid;
195
196 std::lock_guard<std::mutex> lk(m_sender_plugin_map_mutex);
197 m_sender_plugins.erase(conn_id);
198}
199
200bool
202{
203 auto response = get_connections(conn_id);
204 bool is_pubsub = response.connections[0].connection_type == ConnectionType::kPubSub;
205
206 // TLOG() << "Returning " << std::boolalpha << is_pubsub << " for request " << request;
207 return is_pubsub;
208}
209
211NetworkManager::get_connections(ConnectionId const& conn_id, bool restrict_single) const
212{
213 auto response = get_preconfigured_connections(conn_id);
214 if (restrict_single && response.connections.size() > 1) {
215 throw NameCollision(ERS_HERE, conn_id.uid);
216 }
217 if (m_config_client != nullptr) {
218 auto start_time = std::chrono::steady_clock::now();
219 while (
220 std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count() <
221 1000) {
222 try {
223 auto client_response = m_config_client->resolve_connection(conn_id, conn_id.session);
224 if (restrict_single && client_response.connections.size() > 1) {
225 throw NameCollision(ERS_HERE, conn_id.uid);
226 }
227
228 if (client_response.connections.size() > 0) {
229 response = client_response;
230 }
231 break;
232 } catch (FailedLookup const& lf) {
233 if (m_config_client->is_connected()) {
234 throw ConnectionNotFound(ERS_HERE, conn_id.uid, conn_id.data_type, lf);
235 }
236 usleep(1000);
237 }
238 }
239 }
240
241 if (response.connections.size() == 0) {
242 throw ConnectionNotFound(ERS_HERE, conn_id.uid, conn_id.data_type);
243 }
244
245 return response;
246}
247
250{
251 ConnectionResponse matching_connections;
252 for (auto& conn : m_preconfigured_connections) {
253 if (is_match(conn_id, conn.first)) {
254 matching_connections.connections.emplace_back(conn.second);
255 }
256 }
257
258 return matching_connections;
259}
260
261std::set<std::string>
262NetworkManager::get_datatypes(std::string const& uid) const
263{
264 std::set<std::string> output;
265 for (auto& conn : m_preconfigured_connections) {
266 if (conn.second->UID() == uid)
267 output.insert(conn.second->get_data_type());
268 }
269
270 return output;
271}
272
273std::vector<std::string>
274NetworkManager::get_pubsub_connection_strings(std::vector<ConnectionInfo> const& connections)
275{
276 std::vector<std::string> uris;
277 for (auto& conn : connections) {
278 // Check for case where both ends are in app and ConnectivityService hasn't received other end yet, or
279 // ConnectivityService was no longer available
280 if (conn.uri.find("*") != std::string::npos || conn.uri.find("0.0.0.0") != std::string::npos) {
281 continue;
282 }
283 uris.push_back(conn.uri);
284 }
285 return uris;
286}
287
288std::shared_ptr<ipm::Receiver>
289NetworkManager::create_receiver(std::vector<ConnectionInfo> connections, ConnectionId const& conn_id)
290{
291 TLOG_DEBUG(12) << "START";
292 if (connections.size() == 0) {
293 return nullptr;
294 }
295
296 bool is_pubsub = connections[0].connection_type == ConnectionType::kPubSub;
297 if (connections.size() > 1 && !is_pubsub) {
298 throw OperationFailed(ERS_HERE,
299 "Trying to configure a kSendRecv receiver with multiple Connections is not allowed!");
300 }
301
302 auto plugin_type =
304
305 TLOG_DEBUG(12) << "Creating plugin of type " << plugin_type;
306 auto plugin = dunedaq::ipm::make_ipm_receiver(plugin_type);
307
308 nlohmann::json config_json;
309 if (is_pubsub) {
310 std::vector<std::string> uris = get_pubsub_connection_strings(connections);
311 if (uris.size() == 0) {
312 return nullptr;
313 }
314 config_json["connection_strings"] = uris;
315 } else {
316 config_json["connection_string"] = connections[0].uri;
317 }
318 auto newCs = plugin->connect_for_receives(config_json);
319 TLOG_DEBUG(12) << "Receiver reports connected to URI " << newCs;
320
321 // Replace with resolved if there are wildcards (host and/or port)
322 if (connections[0].uri.find("*") != std::string::npos || connections[0].uri.find("0.0.0.0") != std::string::npos) {
323 TLOG_DEBUG(14) << "Wildcard found in connection URI " << connections[0].uri << ", adjusting before publish";
324 utilities::ZmqUri newUri(newCs);
325 utilities::ZmqUri oldUri(connections[0].uri);
326
327 if (oldUri.port == "*")
328 oldUri.port = newUri.port;
329 if (oldUri.host == "*" || oldUri.host == "0.0.0.0")
330 oldUri.host = newUri.host;
331
332 connections[0].uri = oldUri.to_string();
333 TLOG_DEBUG(14) << "Connection URI is now " << connections[0].uri;
334 }
335
336 if (is_pubsub) {
337 TLOG_DEBUG(12) << "Subscribing to topic " << connections[0].data_type << " after connect_for_receives";
338 auto subscriber = std::dynamic_pointer_cast<ipm::Subscriber>(plugin);
339 subscriber->subscribe(connections[0].data_type);
340 std::lock_guard<std::mutex> lkk(m_subscriber_plugin_map_mutex);
341 m_subscriber_plugins[conn_id] = subscriber;
344 m_subscriber_update_thread = std::make_unique<std::thread>(&NetworkManager::update_subscribers, this);
345 }
346 }
347
348 if (m_config_client != nullptr && !is_pubsub) {
349 m_config_client->publish(connections[0]);
350 }
351
352 register_monitorable_node(plugin, m_receiver_opmon_link, conn_id.uid, is_pubsub);
353
354 TLOG_DEBUG(12) << "END";
355 return plugin;
356}
357
358std::shared_ptr<ipm::Sender>
360{
361 auto is_pubsub = connection.connection_type == ConnectionType::kPubSub;
362 auto plugin_type =
364
365 // Check for case where both ends are in app and ConnectivityService hasn't received other end yet
366 if (!is_pubsub &&
367 (connection.uri.find("*") != std::string::npos || connection.uri.find("0.0.0.0") != std::string::npos)) {
368 return nullptr;
369 }
370
371 TLOG_DEBUG(11) << "Creating sender plugin of type " << plugin_type;
372 auto plugin = dunedaq::ipm::make_ipm_sender(plugin_type);
373 TLOG_DEBUG(11) << "Connecting sender plugin to " << connection.uri;
374 auto newCs =
375 plugin->connect_for_sends({ { "connection_string", connection.uri }, { "capacity", connection.capacity } });
376 TLOG_DEBUG(11) << "Sender Plugin connected, reports URI " << newCs;
377
378 // Replace with resolved if there are wildcards (host and/or port)
379 if (connection.uri.find("*") != std::string::npos || connection.uri.find("0.0.0.0") != std::string::npos) {
380 TLOG_DEBUG(13) << "Wildcard found in connection URI " << connection.uri << ", adjusting before publish";
381 utilities::ZmqUri newUri(newCs);
382 utilities::ZmqUri oldUri(connection.uri);
383
384 if (oldUri.port == "*")
385 oldUri.port = newUri.port;
386 if (oldUri.host == "*" || oldUri.host == "0.0.0.0")
387 oldUri.host = newUri.host;
388
389 connection.uri = oldUri.to_string();
390 TLOG_DEBUG(13) << "Connection URI is now " << connection.uri;
391 }
392
393 if (m_config_client != nullptr && is_pubsub) {
394 m_config_client->publish(connection);
395 }
396
397 register_monitorable_node(plugin, m_sender_opmon_link, connection.uid, is_pubsub);
398
399 return plugin;
400}
401
402void
404{
406 {
407 TLOG_DEBUG(14) << "Updating registered subscribers";
408 std::lock_guard<std::mutex> lk(m_subscriber_plugin_map_mutex);
409 for (auto& subscriber_pair : m_subscriber_plugins) {
410 try {
411 auto response = get_connections(subscriber_pair.first, false);
412
413 nlohmann::json config_json;
414 std::vector<std::string> uris = get_pubsub_connection_strings(response.connections);
415 if (uris.size() == 0) {
416 TLOG_DEBUG(14) << "No valid connection strings found, is the Connectivity Service running?!";
417 continue;
418 }
419 config_json["connection_strings"] = uris;
420
421 subscriber_pair.second->connect_for_receives(config_json);
422 } catch (ers::Issue&) {
423 }
424 }
425 }
426 std::this_thread::sleep_for(m_config_client_interval);
427 }
428}
429
430void
431NetworkManager::register_monitorable_node(std::shared_ptr<opmonlib::MonitorableObject> conn,
432 std::shared_ptr<opmonlib::OpMonLink> link,
433 const std::string& name,
434 bool /*is_pubsub*/)
435{
436
437 try {
438 link->register_node(name, conn);
439 } catch (const opmonlib::NonUniqueNodeName& err) {
440 bool success = false;
441 size_t counter = 1;
442 do {
443 auto fname = fmt::format("{}--{}", name, counter);
444 try {
445 link->register_node(fname, conn);
446 success = true;
447 } catch (const opmonlib::NonUniqueNodeName& err) {
448 ++counter;
449 }
450 } while (!success);
451 }
452}
453
454} // namespace dunedaq::iomanager
#define ERS_HERE
uint32_t get_interval_ms() const
Get "interval_ms" attribute value. Interval between publishes and polls of connectivity service infor...
const dunedaq::confmodel::Service * get_service() const
Get "service" relationship value. Connectivity service Service definition.
const std::string & get_host() const
Get "host" attribute value. Connectivity service Host.
ConnectionResponse get_preconfigured_connections(ConnectionId const &conn_id) const
void configure(const std::string &session_name, const std::vector< const confmodel::NetworkConnection * > &connections, const confmodel::ConnectivityService *conn_svc, dunedaq::opmonlib::OpMonManager &)
std::atomic< bool > m_subscriber_update_thread_running
std::unordered_map< ConnectionId, std::shared_ptr< ipm::Receiver > > m_receiver_plugins
std::shared_ptr< ipm::Receiver > create_receiver(std::vector< ConnectionInfo > connections, ConnectionId const &conn_id)
std::unordered_map< ConnectionId, std::shared_ptr< ipm::Sender > > m_sender_plugins
std::unordered_map< ConnectionId, const confmodel::NetworkConnection * > m_preconfigured_connections
std::unique_ptr< std::thread > m_subscriber_update_thread
std::set< std::string > get_datatypes(std::string const &uid) const
static NetworkManager & get()
std::shared_ptr< dunedaq::opmonlib::OpMonLink > m_sender_opmon_link
std::shared_ptr< ipm::Sender > create_sender(ConnectionInfo connection)
std::shared_ptr< ipm::Sender > get_sender(ConnectionId const &conn_id)
std::unique_ptr< ConfigClient > m_config_client
static void register_monitorable_node(std::shared_ptr< opmonlib::MonitorableObject > conn, std::shared_ptr< opmonlib::OpMonLink > link, const std::string &name, bool is_pubsub)
ConnectionResponse get_connections(ConnectionId const &conn_id, bool restrict_single=false) const
bool is_pubsub_connection(ConnectionId const &conn_id) const
static std::unique_ptr< NetworkManager > s_instance
std::chrono::milliseconds m_config_client_interval
std::shared_ptr< ipm::Receiver > get_receiver(ConnectionId const &conn_id)
std::vector< std::string > get_pubsub_connection_strings(std::vector< ConnectionInfo > const &connections)
void remove_sender(ConnectionId const &conn_id)
std::unordered_map< ConnectionId, std::shared_ptr< ipm::Subscriber > > m_subscriber_plugins
std::shared_ptr< dunedaq::opmonlib::OpMonLink > m_receiver_opmon_link
void register_node(ElementId name, NewNodePtr)
Base class for any user define issue.
Definition Issue.hpp:69
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
bool is_match(ConnectionId const &search, ConnectionId const &check)
std::shared_ptr< Sender > make_ipm_sender(std::string const &plugin_name)
Definition Sender.hpp:121
std::string get_recommended_plugin_name(IpmPluginType type)
std::shared_ptr< Receiver > make_ipm_receiver(std::string const &plugin_name)
Definition Receiver.hpp:126
Unsupported std::string uri Execution of command std::string error Failed to create CommandFacility uri
Definition Issues.hpp:77
Cannot add TPSet with start_time
void error(const Issue &issue)
Definition ers.hpp:81
std::vector< ConnectionInfo > connections