DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
NetworkManager.cpp
Go to the documentation of this file.
1
11
12#include "ipm/PluginInfo.hpp"
13#include "logging/Logging.hpp"
15
17#include "confmodel/Service.hpp"
18
#include <chrono>
19#include <map>
20#include <memory>
21#include <string>
#include <thread>
22#include <vector>
23
24#include <fmt/format.h>
25
26namespace dunedaq::iomanager {
27
28std::unique_ptr<NetworkManager> NetworkManager::s_instance = nullptr;
29
30NetworkManager&
32{
33 if (!s_instance) {
34 s_instance.reset(new NetworkManager());
35 }
36 return *s_instance;
37}
38
// NOTE(review): this listing is a Doxygen source scrape. Original line 43 (the final parameter,
// presumably the `dunedaq::opmonlib::OpMonManager& opmgr` used below) and original line 72
// (presumably `m_config_client =`) are missing from it.
/**
 * @brief Load the static connection list and, optionally, create the ConnectivityService client.
 *
 * @param session_name Session name handed to the ConfigClient constructor.
 * @param connections  NetworkConnection objects to index by ConnectionId.
 * @param conn_svc     ConnectivityService description; when nullptr no ConfigClient is created.
 * @param opmgr        OpMon manager under which the "senders" and "receivers" links are registered.
 * @throws AlreadyConfigured if preconfigured connections are already loaded.
 * @throws NameCollision if two entries of @p connections map to the same ConnectionId
 *         (reset() is called before throwing).
 */
39void
40NetworkManager::configure(const std::string& session_name,
41 const std::vector<const confmodel::NetworkConnection*>& connections,
42 const confmodel::ConnectivityService* conn_svc,
44{
// Guard against re-configuration while connections from a previous configure() are still loaded.
45 if (!m_preconfigured_connections.empty()) {
46 throw AlreadyConfigured(ERS_HERE);
47 }
48
// Index every connection by its ConnectionId; a duplicate id is fatal.
49 for (auto& connection : connections) {
50 auto name = connection->UID();
51 TLOG_DEBUG(15) << "Adding connection " << name << " to connection map";
52 ConnectionId id(connection);
53 if (m_preconfigured_connections.count(id)) {
54 TLOG_DEBUG(15) << "Name collision for connection " << name << ", DT " << connection->get_data_type()
55 << " connection_map.count: " << m_preconfigured_connections.count(id);
// Tear down state built so far before reporting the collision.
56 reset();
57 throw NameCollision(ERS_HERE, connection->UID());
58 }
59 m_preconfigured_connections[id] = connection;
60 }
61
// Optional dynamic lookup/publication through the ConnectivityService.
62 if (conn_svc != nullptr) {
63
64 auto service = conn_svc->get_service();
65
66 auto connectionServer = conn_svc->get_host();
67 auto connectionPort = service->get_port();
68 auto config_client_interval = std::chrono::milliseconds(conn_svc->get_interval_ms());
69
70 TLOG_DEBUG(17) << "ConnectionServer host and port are " << connectionServer << ":" << connectionPort;
// An existing ConfigClient is kept; only the stored poll interval is refreshed.
71 if (m_config_client == nullptr) {
73 std::make_unique<ConfigClient>(connectionServer, std::to_string(connectionPort),session_name, config_client_interval);
74 }
75 m_config_client_interval = config_client_interval;
76 }
77
// Expose the sender/receiver OpMon links that register_monitorable_node() attaches plugins to.
78 opmgr.register_node("senders", m_sender_opmon_link);
79 opmgr.register_node("receivers", m_receiver_opmon_link);
80}
81
// NOTE(review): original lines 83 (presumably `NetworkManager::reset()`), 86-88, 92 and 103 are
// missing from this Doxygen listing; the comments below describe only the visible lines.
/**
 * @brief Tear down cached plugins and the ConnectivityService client.
 *
 * Visible effects:
 * - clears the sender and receiver plugin maps, each under its own mutex;
 * - if a ConfigClient exists, calls retract() on it (a FailedRetract is reported with
 *   ers::error and not rethrown);
 * - destroys the ConfigClient;
 * - replaces both OpMon links with freshly constructed OpMonLink objects.
 */
82void
84{
85 TLOG_DEBUG(5) << "reset() BEGIN";
89 }
// Subscriber map section (the statement on original line 92 is not shown in this listing).
90 {
91 std::lock_guard<std::mutex> lkk(m_subscriber_plugin_map_mutex);
93 }
94 {
95 std::lock_guard<std::mutex> lk(m_sender_plugin_map_mutex);
96 m_sender_plugins.clear();
97 }
98 {
99 std::lock_guard<std::mutex> lk(m_receiver_plugin_map_mutex);
100 m_receiver_plugins.clear();
101 }
102
// Best-effort retraction: a FailedRetract is logged, not propagated.
104 if (m_config_client != nullptr) {
105 try {
106 m_config_client->retract();
107 } catch (FailedRetract const& r) {
108 ers::error(r);
109 }
110 }
// Dropping the client lets a later configure() create a new one (it only creates when null).
111 m_config_client.reset(nullptr);
112
// Fresh links for a subsequent configure() to register with the OpMon manager.
113 m_sender_opmon_link = std::make_shared<dunedaq::opmonlib::OpMonLink>();
114 m_receiver_opmon_link = std::make_shared<dunedaq::opmonlib::OpMonLink>();
115 TLOG_DEBUG(5) << "reset() END";
116}
117
// NOTE(review): original lines 119 (the signature) and 122-124 are missing from this Doxygen listing.
/**
 * @brief Release cached plugins and ask the ConnectivityService client to retract.
 *
 * Visible effects:
 * - clears the subscriber, sender and receiver plugin maps, each under its own mutex;
 * - if a ConfigClient exists, calls retract() on it (a FailedRetract is reported with
 *   ers::error and not rethrown).
 *
 * The ConfigClient object itself is kept alive (m_config_client is not reset here).
 */
118void
120{
121 TLOG_DEBUG(5) << "shutdown() BEGIN";
125 }
126 {
127 std::lock_guard<std::mutex> lkk(m_subscriber_plugin_map_mutex);
128 m_subscriber_plugins.clear();
129 }
130 {
131 std::lock_guard<std::mutex> lk(m_sender_plugin_map_mutex);
132 m_sender_plugins.clear();
133 }
134 {
135 std::lock_guard<std::mutex> lk(m_receiver_plugin_map_mutex);
136 m_receiver_plugins.clear();
137 }
138
// Best-effort retraction: a FailedRetract is logged, not propagated.
139 if (m_config_client != nullptr) {
140 try {
141 m_config_client->retract();
142 } catch (FailedRetract const& r) {
143 ers::error(r);
144 }
145 }
146 TLOG_DEBUG(5) << "shutdown() END";
147}
148
149std::shared_ptr<ipm::Receiver>
151{
152 TLOG_DEBUG(9) << "Getting receiver for connection " << conn_id.uid;
153
154 std::lock_guard<std::mutex> lk(m_receiver_plugin_map_mutex);
155 if (!m_receiver_plugins.count(conn_id) || m_receiver_plugins.at(conn_id) == nullptr) {
156
157 auto response = get_connections(conn_id);
158
159 TLOG_DEBUG(9) << "Creating receiver for connection " << conn_id.uid;
160 auto receiver = create_receiver(response.connections, conn_id);
161
162 m_receiver_plugins[conn_id] = receiver;
163 }
164
165 return m_receiver_plugins[conn_id];
166}
167
168std::shared_ptr<ipm::Sender>
170{
171 TLOG_DEBUG(10) << "Getting sender for connection " << conn_id.uid;
172
173 std::lock_guard<std::mutex> lk(m_sender_plugin_map_mutex);
174
175 if (!m_sender_plugins.count(conn_id) || m_sender_plugins.at(conn_id) == nullptr) {
176 auto response = get_connections(conn_id, true);
177
178 TLOG_DEBUG(10) << "Creating sender for connection " << conn_id.uid;
179 auto sender = create_sender(response.connections[0]);
180 m_sender_plugins[conn_id] = sender;
181 }
182
183 if (m_sender_plugins.count(conn_id)) {
184 return m_sender_plugins[conn_id];
185 }
186
187 return nullptr;
188}
189
190void
192{
193 TLOG_DEBUG(10) << "Removing sender for connection " << conn_id.uid;
194
195 std::lock_guard<std::mutex> lk(m_sender_plugin_map_mutex);
196 m_sender_plugins.erase(conn_id);
197}
198
199
200bool
202{
203 auto response = get_connections(conn_id);
204 bool is_pubsub = response.connections[0].connection_type == ConnectionType::kPubSub;
205
206 // TLOG() << "Returning " << std::boolalpha << is_pubsub << " for request " << request;
207 return is_pubsub;
208}
209
211NetworkManager::get_connections(ConnectionId const& conn_id, bool restrict_single) const
212{
213 auto response = get_preconfigured_connections(conn_id);
214 if (restrict_single && response.connections.size() > 1) {
215 throw NameCollision(ERS_HERE, conn_id.uid);
216 }
217 if (m_config_client != nullptr) {
218 auto start_time = std::chrono::steady_clock::now();
219 while (
220 std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count() <
221 1000) {
222 try {
223 auto client_response = m_config_client->resolveConnection(conn_id, conn_id.session);
224 if (restrict_single && client_response.connections.size() > 1) {
225 throw NameCollision(ERS_HERE, conn_id.uid);
226 }
227
228 if (client_response.connections.size() > 0) {
229 response = client_response;
230 }
231 break;
232 } catch (FailedLookup const& lf) {
233 if (m_config_client->is_connected()) {
234 throw ConnectionNotFound(ERS_HERE, conn_id.uid, conn_id.data_type, lf);
235 }
236 usleep(1000);
237 }
238 }
239 }
240
241 if (response.connections.size() == 0) {
242 throw ConnectionNotFound(ERS_HERE, conn_id.uid, conn_id.data_type);
243 }
244
245 return response;
246}
247
250{
251 ConnectionResponse matching_connections;
252 for (auto& conn : m_preconfigured_connections) {
253 if (is_match(conn_id, conn.first)) {
254 matching_connections.connections.push_back(conn.second);
255 }
256 }
257
258 return matching_connections;
259}
260
261std::set<std::string>
262NetworkManager::get_datatypes(std::string const& uid) const
263{
264 std::set<std::string> output;
265 for (auto& conn : m_preconfigured_connections) {
266 if (conn.second->UID() == uid)
267 output.insert(conn.second->get_data_type());
268 }
269
270 return output;
271}
272
273std::vector<std::string>
274NetworkManager::get_pubsub_connection_strings(std::vector<ConnectionInfo> const& connections)
275{
276 std::vector<std::string> uris;
277 for (auto& conn : connections) {
278 // Check for case where both ends are in app and ConnectivityService hasn't received other end yet, or ConnectivityService was no longer available
279 if (conn.uri.find("*") != std::string::npos || conn.uri.find("0.0.0.0") != std::string::npos) {
280 continue;
281 }
282 uris.push_back(conn.uri);
283 }
284 return uris;
285}
286
// NOTE(review): original line 302 (the initializer of plugin_type) and lines 341-343 are missing
// from this Doxygen listing.
/**
 * @brief Build, connect and register an IPM receiver for @p connections.
 *
 * @param connections Resolved connections for @p conn_id. Taken by value because the first
 *                    entry's URI may be rewritten below before publishing.
 * @param conn_id     Key for the subscriber map and name of the OpMon node.
 * @return The connected receiver plugin. Returns nullptr if @p connections is empty, or if a
 *         pub/sub connection has no wildcard-free URI yet.
 * @throws OperationFailed if more than one connection is given for a non-pub/sub receiver.
 */
287std::shared_ptr<ipm::Receiver>
288NetworkManager::create_receiver(std::vector<ConnectionInfo> connections, ConnectionId const& conn_id)
289{
290 TLOG_DEBUG(12) << "START";
291 if (connections.size() == 0) {
292 return nullptr;
293 }
294
295 bool is_pubsub = connections[0].connection_type == ConnectionType::kPubSub;
296 if (connections.size() > 1 && !is_pubsub) {
297 throw OperationFailed(ERS_HERE,
298 "Trying to configure a kSendRecv receiver with multiple Connections is not allowed!");
299 }
300
301 auto plugin_type =
303
304 TLOG_DEBUG(12) << "Creating plugin of type " << plugin_type;
305 auto plugin = dunedaq::ipm::make_ipm_receiver(plugin_type);
306
// Pub/sub subscribers take a list of URIs (unresolved ones filtered out); send/recv takes one URI.
307 nlohmann::json config_json;
308 if (is_pubsub) {
309 std::vector<std::string> uris = get_pubsub_connection_strings(connections);
310 if (uris.size() == 0) {
311 return nullptr;
312 }
313 config_json["connection_strings"] = uris;
314 } else {
315 config_json["connection_string"] = connections[0].uri;
316 }
// The URI reported back by connect_for_receives is used below to resolve wildcard host/port.
317 auto newCs = plugin->connect_for_receives(config_json);
318 TLOG_DEBUG(12) << "Receiver reports connected to URI " << newCs;
319
320 // Replace with resolved if there are wildcards (host and/or port)
321 if (connections[0].uri.find("*") != std::string::npos || connections[0].uri.find("0.0.0.0") != std::string::npos) {
322 TLOG_DEBUG(14) << "Wildcard found in connection URI " << connections[0].uri << ", adjusting before publish";
323 auto newUri = utilities::parse_connection_string(newCs);
324 auto oldUri = utilities::parse_connection_string(connections[0].uri);
325
326 if (oldUri.port == "*")
327 oldUri.port = newUri.port;
328 if (oldUri.host == "*" || oldUri.host == "0.0.0.0")
329 oldUri.host = newUri.host;
330
331 connections[0].uri = oldUri.to_string();
332 TLOG_DEBUG(14) << "Connection URI is now " << connections[0].uri;
333 }
334
// Subscribers are remembered in m_subscriber_plugins; the subscriber update loop later
// re-resolves and reconnects each entry there.
335 if (is_pubsub) {
336 TLOG_DEBUG(12) << "Subscribing to topic " << connections[0].data_type << " after connect_for_receives";
// NOTE(review): the dynamic_pointer_cast result is not null-checked before subscribe().
337 auto subscriber = std::dynamic_pointer_cast<ipm::Subscriber>(plugin);
338 subscriber->subscribe(connections[0].data_type);
339 std::lock_guard<std::mutex> lkk(m_subscriber_plugin_map_mutex);
340 m_subscriber_plugins[conn_id] = subscriber;
344 }
345 }
346
// For send/recv connections the receiving side publishes its (resolved) endpoint;
// for pub/sub the publisher does so (see create_sender).
347 if (m_config_client != nullptr && !is_pubsub) {
348 m_config_client->publish(connections[0]);
349 }
350
351 register_monitorable_node(plugin, m_receiver_opmon_link, conn_id.uid, is_pubsub);
352
353 TLOG_DEBUG(12) << "END";
354 return plugin;
355}
356
// NOTE(review): original lines 358 (the signature) and 362 (the initializer of plugin_type) are
// missing from this Doxygen listing.
/**
 * @brief Build, connect and register an IPM sender for a single @p connection.
 *
 * @param connection Resolved connection. Taken by value because its URI may be rewritten
 *                   below before publishing.
 * @return The connected sender plugin. Returns nullptr for a non-pub/sub connection whose URI
 *         still contains a wildcard ("*" or "0.0.0.0").
 */
357std::shared_ptr<ipm::Sender>
359{
360 auto is_pubsub = connection.connection_type == ConnectionType::kPubSub;
361 auto plugin_type =
363
364 // Check for case where both ends are in app and ConnectivityService hasn't received other end yet
365 if (!is_pubsub &&
366 (connection.uri.find("*") != std::string::npos || connection.uri.find("0.0.0.0") != std::string::npos)) {
367 return nullptr;
368 }
369
370 TLOG_DEBUG(11) << "Creating sender plugin of type " << plugin_type;
371 auto plugin = dunedaq::ipm::make_ipm_sender(plugin_type);
372 TLOG_DEBUG(11) << "Connecting sender plugin to " << connection.uri;
373 auto newCs = plugin->connect_for_sends({ { "connection_string", connection.uri } });
374 TLOG_DEBUG(11) << "Sender Plugin connected, reports URI " << newCs;
375
376 // Replace with resolved if there are wildcards (host and/or port)
377 if (connection.uri.find("*") != std::string::npos || connection.uri.find("0.0.0.0") != std::string::npos) {
378 TLOG_DEBUG(13) << "Wildcard found in connection URI " << connection.uri << ", adjusting before publish";
379 auto newUri = utilities::parse_connection_string(newCs);
380 auto oldUri = utilities::parse_connection_string(connection.uri);
381
382 if (oldUri.port == "*")
383 oldUri.port = newUri.port;
384 if (oldUri.host == "*" || oldUri.host == "0.0.0.0")
385 oldUri.host = newUri.host;
386
387 connection.uri = oldUri.to_string();
388 TLOG_DEBUG(13) << "Connection URI is now " << connection.uri;
389 }
390
// For pub/sub the publishing side announces its (resolved) endpoint; for send/recv the
// receiver does so (see create_receiver).
391 if (m_config_client != nullptr && is_pubsub) {
392 m_config_client->publish(connection);
393 }
394
395 register_monitorable_node(plugin, m_sender_opmon_link, connection.uid, is_pubsub);
396
397 return plugin;
398}
399
// NOTE(review): original lines 401 (the signature) and 403 (the opening of the outer loop whose
// closing brace is "425 }") are missing from this Doxygen listing.
/**
 * @brief Periodically re-resolve and reconnect every registered subscriber.
 *
 * Each pass does the following under m_subscriber_plugin_map_mutex:
 * - looks up the connections of every entry in m_subscriber_plugins;
 * - drops unresolved URIs via get_pubsub_connection_strings();
 * - calls connect_for_receives() with the remaining connection strings.
 *
 * The pass then sleeps for m_config_client_interval. Presumably this runs on
 * m_subscriber_update_thread while m_subscriber_update_thread_running is true; TODO confirm
 * against the missing loop header.
 */
400void
402{
404 {
405 TLOG_DEBUG(14) << "Updating registered subscribers";
// NOTE(review): get_connections() may retry for up to ~1 s per subscriber while the ConfigClient
// is disconnected. This mutex is held throughout, which can also stall create_receiver().
406 std::lock_guard<std::mutex> lk(m_subscriber_plugin_map_mutex);
407 for (auto& subscriber_pair : m_subscriber_plugins) {
408 try {
409 auto response = get_connections(subscriber_pair.first, false);
410
411 nlohmann::json config_json;
412 std::vector<std::string> uris = get_pubsub_connection_strings(response.connections);
413 if (uris.size() == 0) {
414 TLOG_DEBUG(14) << "No valid connection strings found, is the Connectivity Service running?!";
415 continue;
416 }
417 config_json["connection_strings"] = uris;
418
419 subscriber_pair.second->connect_for_receives(config_json);
420 } catch (ers::Issue&) {
// Deliberately ignored: this subscriber is simply skipped on this pass.
421 }
422 }
423 }
424 std::this_thread::sleep_for(m_config_client_interval);
425 }
426}
427
428void
429NetworkManager::register_monitorable_node(std::shared_ptr<opmonlib::MonitorableObject> conn,
430 std::shared_ptr<opmonlib::OpMonLink> link,
431 const std::string& name,
432 bool /*is_pubsub*/)
433{
434
435 try {
436 link->register_node(name, conn);
437 } catch (const opmonlib::NonUniqueNodeName& err) {
438 bool success = false;
439 size_t counter = 1;
440 do {
441 auto fname = fmt::format("{}--{}", name, counter);
442 try {
443 link->register_node(fname, conn);
444 success = true;
445 } catch (const opmonlib::NonUniqueNodeName& err) {
446 ++counter;
447 }
448 } while (!success);
449 }
450}
451
452} // namespace dunedaq::iomanager
#define ERS_HERE
uint32_t get_interval_ms() const
Get "interval_ms" attribute value. Interval between publishes and polls of connectivity service information.
const dunedaq::confmodel::Service * get_service() const
Get "service" relationship value. Connectivity service Service definition.
const std::string & get_host() const
Get "host" attribute value. Connectivity service Host.
ConnectionResponse get_preconfigured_connections(ConnectionId const &conn_id) const
void configure(const std::string &session_name, const std::vector< const confmodel::NetworkConnection * > &connections, const confmodel::ConnectivityService *conn_svc, dunedaq::opmonlib::OpMonManager &)
std::atomic< bool > m_subscriber_update_thread_running
std::unordered_map< ConnectionId, std::shared_ptr< ipm::Receiver > > m_receiver_plugins
std::shared_ptr< ipm::Receiver > create_receiver(std::vector< ConnectionInfo > connections, ConnectionId const &conn_id)
std::unordered_map< ConnectionId, std::shared_ptr< ipm::Sender > > m_sender_plugins
std::unordered_map< ConnectionId, const confmodel::NetworkConnection * > m_preconfigured_connections
std::unique_ptr< std::thread > m_subscriber_update_thread
std::set< std::string > get_datatypes(std::string const &uid) const
static NetworkManager & get()
std::shared_ptr< dunedaq::opmonlib::OpMonLink > m_sender_opmon_link
std::shared_ptr< ipm::Sender > create_sender(ConnectionInfo connection)
std::shared_ptr< ipm::Sender > get_sender(ConnectionId const &conn_id)
std::unique_ptr< ConfigClient > m_config_client
static void register_monitorable_node(std::shared_ptr< opmonlib::MonitorableObject > conn, std::shared_ptr< opmonlib::OpMonLink > link, const std::string &name, bool is_pubsub)
ConnectionResponse get_connections(ConnectionId const &conn_id, bool restrict_single=false) const
bool is_pubsub_connection(ConnectionId const &conn_id) const
static std::unique_ptr< NetworkManager > s_instance
std::chrono::milliseconds m_config_client_interval
std::shared_ptr< ipm::Receiver > get_receiver(ConnectionId const &conn_id)
std::vector< std::string > get_pubsub_connection_strings(std::vector< ConnectionInfo > const &connections)
void remove_sender(ConnectionId const &conn_id)
std::unordered_map< ConnectionId, std::shared_ptr< ipm::Subscriber > > m_subscriber_plugins
std::shared_ptr< dunedaq::opmonlib::OpMonLink > m_receiver_opmon_link
void register_node(ElementId name, NewNodePtr)
Base class for any user-defined issue.
Definition Issue.hpp:69
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
bool is_match(ConnectionId const &search, ConnectionId const &check)
std::shared_ptr< Sender > make_ipm_sender(std::string const &plugin_name)
Definition Sender.hpp:119
std::string get_recommended_plugin_name(IpmPluginType type)
std::shared_ptr< Receiver > make_ipm_receiver(std::string const &plugin_name)
Definition Receiver.hpp:124
ZmqUri parse_connection_string(std::string connection_string)
Definition Resolver.cpp:72
Unsupported std::string uri Execution of command std::string error Failed to create CommandFacility uri
Definition Issues.hpp:77
Cannot add TPSet with start_time
void error(const Issue &issue)
Definition ers.hpp:81
std::vector< ConnectionInfo > connections