Line data Source code
1 : /**
2 : * @file NetworkManager.cpp NetworkManager class implementation
3 : *
4 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "iomanager/network/NetworkManager.hpp"
10 : #include "iomanager/SchemaUtils.hpp"
11 :
12 : #include "ipm/PluginInfo.hpp"
13 : #include "logging/Logging.hpp"
14 : #include "utilities/ZmqUri.hpp"
15 :
16 : #include "confmodel/PhysicalHost.hpp"
17 : #include "confmodel/Service.hpp"
18 :
19 : #include <map>
20 : #include <memory>
21 : #include <set>
22 : #include <string>
23 : #include <vector>
24 :
25 : #include <fmt/format.h>
26 :
27 : namespace dunedaq::iomanager {
28 :
29 : std::unique_ptr<NetworkManager> NetworkManager::s_instance = nullptr;
30 :
31 : NetworkManager&
32 2358 : NetworkManager::get()
33 : {
34 2358 : if (!s_instance) {
35 10 : s_instance.reset(new NetworkManager());
36 : }
37 2358 : return *s_instance;
38 : }
39 :
40 : void
41 57 : NetworkManager::configure(const std::string& session_name,
42 : const std::vector<const confmodel::NetworkConnection*>& connections,
43 : const confmodel::ConnectivityService* conn_svc,
44 : dunedaq::opmonlib::OpMonManager& opmgr)
45 : {
46 57 : if (!m_preconfigured_connections.empty()) {
47 1 : throw AlreadyConfigured(ERS_HERE);
48 : }
49 :
50 200 : for (auto& connection : connections) {
51 144 : auto name = connection->UID();
52 144 : TLOG_DEBUG(15) << "Adding connection " << name << " to connection map";
53 144 : ConnectionId id(connection);
54 144 : if (m_preconfigured_connections.count(id)) {
55 0 : TLOG_DEBUG(15) << "Name collision for connection " << name << ", DT " << connection->get_data_type()
56 0 : << " connection_map.count: " << m_preconfigured_connections.count(id);
57 0 : reset();
58 0 : throw NameCollision(ERS_HERE, connection->UID());
59 : }
60 144 : m_preconfigured_connections[id] = connection;
61 144 : }
62 :
63 56 : if (conn_svc != nullptr) {
64 :
65 0 : auto service = conn_svc->get_service();
66 :
67 0 : auto connectionServer = conn_svc->get_host();
68 0 : auto connectionPort = service->get_port();
69 0 : auto config_client_interval = std::chrono::milliseconds(conn_svc->get_interval_ms());
70 :
71 0 : TLOG_DEBUG(17) << "ConnectionServer host and port are " << connectionServer << ":" << connectionPort;
72 0 : if (m_config_client == nullptr) {
73 0 : m_config_client = std::make_unique<ConfigClient>(
74 0 : connectionServer, std::to_string(connectionPort), session_name, config_client_interval);
75 : }
76 0 : m_config_client_interval = config_client_interval;
77 0 : }
78 :
79 56 : opmgr.register_node("senders", m_sender_opmon_link);
80 56 : opmgr.register_node("receivers", m_receiver_opmon_link);
81 56 : }
82 :
83 : void
84 79 : NetworkManager::reset()
85 : {
86 79 : TLOG_DEBUG(5) << "reset() BEGIN";
87 79 : m_subscriber_update_thread_running = false;
88 79 : if (m_subscriber_update_thread && m_subscriber_update_thread->joinable()) {
89 0 : m_subscriber_update_thread->join();
90 : }
91 79 : {
92 79 : std::lock_guard<std::mutex> lkk(m_subscriber_plugin_map_mutex);
93 79 : m_subscriber_plugins.clear();
94 79 : }
95 79 : {
96 79 : std::lock_guard<std::mutex> lk(m_sender_plugin_map_mutex);
97 79 : m_sender_plugins.clear();
98 79 : }
99 79 : {
100 79 : std::lock_guard<std::mutex> lk(m_receiver_plugin_map_mutex);
101 79 : m_receiver_plugins.clear();
102 79 : }
103 :
104 79 : m_preconfigured_connections.clear();
105 79 : if (m_config_client != nullptr) {
106 0 : try {
107 0 : m_config_client->retract();
108 0 : } catch (FailedRetract const& r) {
109 0 : ers::error(r);
110 0 : }
111 : }
112 79 : m_config_client.reset(nullptr);
113 :
114 79 : m_sender_opmon_link = std::make_shared<dunedaq::opmonlib::OpMonLink>();
115 79 : m_receiver_opmon_link = std::make_shared<dunedaq::opmonlib::OpMonLink>();
116 79 : TLOG_DEBUG(5) << "reset() END";
117 79 : }
118 :
119 : void
120 0 : NetworkManager::shutdown()
121 : {
122 0 : TLOG_DEBUG(5) << "shutdown() BEGIN";
123 0 : m_subscriber_update_thread_running = false;
124 0 : if (m_subscriber_update_thread && m_subscriber_update_thread->joinable()) {
125 0 : m_subscriber_update_thread->join();
126 : }
127 0 : {
128 0 : std::lock_guard<std::mutex> lkk(m_subscriber_plugin_map_mutex);
129 0 : m_subscriber_plugins.clear();
130 0 : }
131 0 : {
132 0 : std::lock_guard<std::mutex> lk(m_sender_plugin_map_mutex);
133 0 : m_sender_plugins.clear();
134 0 : }
135 0 : {
136 0 : std::lock_guard<std::mutex> lk(m_receiver_plugin_map_mutex);
137 0 : m_receiver_plugins.clear();
138 0 : }
139 :
140 0 : if (m_config_client != nullptr) {
141 0 : try {
142 0 : m_config_client->retract();
143 0 : } catch (FailedRetract const& r) {
144 0 : ers::error(r);
145 0 : }
146 : }
147 0 : TLOG_DEBUG(5) << "shutdown() END";
148 0 : }
149 :
150 : std::shared_ptr<ipm::Receiver>
151 226 : NetworkManager::get_receiver(ConnectionId const& conn_id)
152 : {
153 226 : TLOG_DEBUG(9) << "Getting receiver for connection " << conn_id.uid;
154 :
155 226 : std::lock_guard<std::mutex> lk(m_receiver_plugin_map_mutex);
156 226 : if (!m_receiver_plugins.count(conn_id) || m_receiver_plugins.at(conn_id) == nullptr) {
157 :
158 226 : auto response = get_connections(conn_id);
159 :
160 28 : TLOG_DEBUG(9) << "Creating receiver for connection " << conn_id.uid;
161 28 : auto receiver = create_receiver(response.connections, conn_id);
162 :
163 28 : m_receiver_plugins[conn_id] = receiver;
164 28 : }
165 :
166 56 : return m_receiver_plugins[conn_id];
167 226 : }
168 :
169 : std::shared_ptr<ipm::Sender>
170 1955 : NetworkManager::get_sender(ConnectionId const& conn_id)
171 : {
172 1955 : TLOG_DEBUG(10) << "Getting sender for connection " << conn_id.uid;
173 :
174 1955 : std::lock_guard<std::mutex> lk(m_sender_plugin_map_mutex);
175 :
176 1955 : if (!m_sender_plugins.count(conn_id) || m_sender_plugins.at(conn_id) == nullptr) {
177 1955 : auto response = get_connections(conn_id, true);
178 :
179 28 : TLOG_DEBUG(10) << "Creating sender for connection " << conn_id.uid;
180 28 : auto sender = create_sender(response.connections[0]);
181 28 : m_sender_plugins[conn_id] = sender;
182 28 : }
183 :
184 28 : if (m_sender_plugins.count(conn_id)) {
185 28 : return m_sender_plugins[conn_id];
186 : }
187 :
188 0 : return nullptr;
189 1955 : }
190 :
191 : void
192 1 : NetworkManager::remove_sender(ConnectionId const& conn_id)
193 : {
194 1 : TLOG_DEBUG(10) << "Removing sender for connection " << conn_id.uid;
195 :
196 1 : std::lock_guard<std::mutex> lk(m_sender_plugin_map_mutex);
197 1 : m_sender_plugins.erase(conn_id);
198 1 : }
199 :
200 : bool
201 35 : NetworkManager::is_pubsub_connection(ConnectionId const& conn_id) const
202 : {
203 35 : auto response = get_connections(conn_id);
204 34 : bool is_pubsub = response.connections[0].connection_type == ConnectionType::kPubSub;
205 :
206 : // TLOG() << "Returning " << std::boolalpha << is_pubsub << " for request " << request;
207 68 : return is_pubsub;
208 34 : }
209 :
210 : ConnectionResponse
211 2216 : NetworkManager::get_connections(ConnectionId const& conn_id, bool restrict_single) const
212 : {
213 2216 : auto response = get_preconfigured_connections(conn_id);
214 2216 : if (restrict_single && response.connections.size() > 1) {
215 0 : throw NameCollision(ERS_HERE, conn_id.uid);
216 : }
217 2216 : if (m_config_client != nullptr) {
218 0 : auto start_time = std::chrono::steady_clock::now();
219 0 : while (
220 0 : std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count() <
221 : 1000) {
222 0 : try {
223 0 : auto client_response = m_config_client->resolve_connection(conn_id, conn_id.session);
224 0 : if (restrict_single && client_response.connections.size() > 1) {
225 0 : throw NameCollision(ERS_HERE, conn_id.uid);
226 : }
227 :
228 0 : if (client_response.connections.size() > 0) {
229 0 : response = client_response;
230 : }
231 0 : break;
232 0 : } catch (FailedLookup const& lf) {
233 0 : if (m_config_client->is_connected()) {
234 0 : throw ConnectionNotFound(ERS_HERE, conn_id.uid, conn_id.data_type, lf);
235 : }
236 0 : usleep(1000);
237 0 : }
238 : }
239 : }
240 :
241 2216 : if (response.connections.size() == 0) {
242 2126 : throw ConnectionNotFound(ERS_HERE, conn_id.uid, conn_id.data_type);
243 : }
244 :
245 90 : return response;
246 2126 : }
247 :
248 : ConnectionResponse
249 2222 : NetworkManager::get_preconfigured_connections(ConnectionId const& conn_id) const
250 : {
251 2222 : ConnectionResponse matching_connections;
252 7615 : for (auto& conn : m_preconfigured_connections) {
253 5393 : if (is_match(conn_id, conn.first)) {
254 103 : matching_connections.connections.emplace_back(conn.second);
255 : }
256 : }
257 :
258 2222 : return matching_connections;
259 0 : }
260 :
261 : std::set<std::string>
262 7 : NetworkManager::get_datatypes(std::string const& uid) const
263 : {
264 7 : std::set<std::string> output;
265 39 : for (auto& conn : m_preconfigured_connections) {
266 32 : if (conn.second->UID() == uid)
267 4 : output.insert(conn.second->get_data_type());
268 : }
269 :
270 7 : return output;
271 0 : }
272 :
273 : std::vector<std::string>
274 7 : NetworkManager::get_pubsub_connection_strings(std::vector<ConnectionInfo> const& connections)
275 : {
276 7 : std::vector<std::string> uris;
277 18 : for (auto& conn : connections) {
278 : // Check for case where both ends are in app and ConnectivityService hasn't received other end yet, or
279 : // ConnectivityService was no longer available
280 11 : if (conn.uri.find("*") != std::string::npos || conn.uri.find("0.0.0.0") != std::string::npos) {
281 0 : continue;
282 : }
283 11 : uris.push_back(conn.uri);
284 : }
285 7 : return uris;
286 0 : }
287 :
288 : std::shared_ptr<ipm::Receiver>
289 28 : NetworkManager::create_receiver(std::vector<ConnectionInfo> connections, ConnectionId const& conn_id)
290 : {
291 28 : TLOG_DEBUG(12) << "START";
292 28 : if (connections.size() == 0) {
293 0 : return nullptr;
294 : }
295 :
296 28 : bool is_pubsub = connections[0].connection_type == ConnectionType::kPubSub;
297 28 : if (connections.size() > 1 && !is_pubsub) {
298 0 : throw OperationFailed(ERS_HERE,
299 0 : "Trying to configure a kSendRecv receiver with multiple Connections is not allowed!");
300 : }
301 :
302 28 : auto plugin_type =
303 49 : ipm::get_recommended_plugin_name(is_pubsub ? ipm::IpmPluginType::Subscriber : ipm::IpmPluginType::Receiver);
304 :
305 28 : TLOG_DEBUG(12) << "Creating plugin of type " << plugin_type;
306 28 : auto plugin = dunedaq::ipm::make_ipm_receiver(plugin_type);
307 :
308 28 : nlohmann::json config_json;
309 28 : if (is_pubsub) {
310 7 : std::vector<std::string> uris = get_pubsub_connection_strings(connections);
311 7 : if (uris.size() == 0) {
312 0 : return nullptr;
313 : }
314 7 : config_json["connection_strings"] = uris;
315 7 : } else {
316 21 : config_json["connection_string"] = connections[0].uri;
317 : }
318 28 : auto newCs = plugin->connect_for_receives(config_json);
319 28 : TLOG_DEBUG(12) << "Receiver reports connected to URI " << newCs;
320 :
321 : // Replace with resolved if there are wildcards (host and/or port)
322 28 : if (connections[0].uri.find("*") != std::string::npos || connections[0].uri.find("0.0.0.0") != std::string::npos) {
323 0 : TLOG_DEBUG(14) << "Wildcard found in connection URI " << connections[0].uri << ", adjusting before publish";
324 0 : utilities::ZmqUri newUri(newCs);
325 0 : utilities::ZmqUri oldUri(connections[0].uri);
326 :
327 0 : if (oldUri.port == "*")
328 0 : oldUri.port = newUri.port;
329 0 : if (oldUri.host == "*" || oldUri.host == "0.0.0.0")
330 0 : oldUri.host = newUri.host;
331 :
332 0 : connections[0].uri = oldUri.to_string();
333 0 : TLOG_DEBUG(14) << "Connection URI is now " << connections[0].uri;
334 0 : }
335 :
336 28 : if (is_pubsub) {
337 7 : TLOG_DEBUG(12) << "Subscribing to topic " << connections[0].data_type << " after connect_for_receives";
338 7 : auto subscriber = std::dynamic_pointer_cast<ipm::Subscriber>(plugin);
339 7 : subscriber->subscribe(connections[0].data_type);
340 7 : std::lock_guard<std::mutex> lkk(m_subscriber_plugin_map_mutex);
341 7 : m_subscriber_plugins[conn_id] = subscriber;
342 7 : if (!m_subscriber_update_thread_running && m_config_client != nullptr) {
343 0 : m_subscriber_update_thread_running = true;
344 0 : m_subscriber_update_thread = std::make_unique<std::thread>(&NetworkManager::update_subscribers, this);
345 : }
346 7 : }
347 :
348 28 : if (m_config_client != nullptr && !is_pubsub) {
349 0 : m_config_client->publish(connections[0]);
350 : }
351 :
352 28 : register_monitorable_node(plugin, m_receiver_opmon_link, conn_id.uid, is_pubsub);
353 :
354 28 : TLOG_DEBUG(12) << "END";
355 28 : return plugin;
356 28 : }
357 :
358 : std::shared_ptr<ipm::Sender>
359 28 : NetworkManager::create_sender(ConnectionInfo connection)
360 : {
361 28 : auto is_pubsub = connection.connection_type == ConnectionType::kPubSub;
362 28 : auto plugin_type =
363 50 : ipm::get_recommended_plugin_name(is_pubsub ? ipm::IpmPluginType::Publisher : ipm::IpmPluginType::Sender);
364 :
365 : // Check for case where both ends are in app and ConnectivityService hasn't received other end yet
366 28 : if (!is_pubsub &&
367 22 : (connection.uri.find("*") != std::string::npos || connection.uri.find("0.0.0.0") != std::string::npos)) {
368 0 : return nullptr;
369 : }
370 :
371 28 : TLOG_DEBUG(11) << "Creating sender plugin of type " << plugin_type;
372 28 : auto plugin = dunedaq::ipm::make_ipm_sender(plugin_type);
373 28 : TLOG_DEBUG(11) << "Connecting sender plugin to " << connection.uri;
374 28 : auto newCs =
375 196 : plugin->connect_for_sends({ { "connection_string", connection.uri }, { "capacity", connection.capacity } });
376 28 : TLOG_DEBUG(11) << "Sender Plugin connected, reports URI " << newCs;
377 :
378 : // Replace with resolved if there are wildcards (host and/or port)
379 28 : if (connection.uri.find("*") != std::string::npos || connection.uri.find("0.0.0.0") != std::string::npos) {
380 0 : TLOG_DEBUG(13) << "Wildcard found in connection URI " << connection.uri << ", adjusting before publish";
381 0 : utilities::ZmqUri newUri(newCs);
382 0 : utilities::ZmqUri oldUri(connection.uri);
383 :
384 0 : if (oldUri.port == "*")
385 0 : oldUri.port = newUri.port;
386 0 : if (oldUri.host == "*" || oldUri.host == "0.0.0.0")
387 0 : oldUri.host = newUri.host;
388 :
389 0 : connection.uri = oldUri.to_string();
390 0 : TLOG_DEBUG(13) << "Connection URI is now " << connection.uri;
391 0 : }
392 :
393 28 : if (m_config_client != nullptr && is_pubsub) {
394 0 : m_config_client->publish(connection);
395 : }
396 :
397 28 : register_monitorable_node(plugin, m_sender_opmon_link, connection.uid, is_pubsub);
398 :
399 28 : return plugin;
400 196 : }
401 :
402 : void
403 0 : NetworkManager::update_subscribers()
404 : {
405 0 : while (m_subscriber_update_thread_running.load()) {
406 0 : {
407 0 : TLOG_DEBUG(14) << "Updating registered subscribers";
408 0 : std::lock_guard<std::mutex> lk(m_subscriber_plugin_map_mutex);
409 0 : for (auto& subscriber_pair : m_subscriber_plugins) {
410 0 : try {
411 0 : auto response = get_connections(subscriber_pair.first, false);
412 :
413 0 : nlohmann::json config_json;
414 0 : std::vector<std::string> uris = get_pubsub_connection_strings(response.connections);
415 0 : if (uris.size() == 0) {
416 0 : TLOG_DEBUG(14) << "No valid connection strings found, is the Connectivity Service running?!";
417 0 : continue;
418 0 : }
419 0 : config_json["connection_strings"] = uris;
420 :
421 0 : subscriber_pair.second->connect_for_receives(config_json);
422 0 : } catch (ers::Issue&) {
423 0 : }
424 : }
425 0 : }
426 0 : std::this_thread::sleep_for(m_config_client_interval);
427 : }
428 0 : }
429 :
430 : void
431 56 : NetworkManager::register_monitorable_node(std::shared_ptr<opmonlib::MonitorableObject> conn,
432 : std::shared_ptr<opmonlib::OpMonLink> link,
433 : const std::string& name,
434 : bool /*is_pubsub*/)
435 : {
436 :
437 56 : try {
438 58 : link->register_node(name, conn);
439 2 : } catch (const opmonlib::NonUniqueNodeName& err) {
440 2 : bool success = false;
441 2 : size_t counter = 1;
442 2 : do {
443 2 : auto fname = fmt::format("{}--{}", name, counter);
444 2 : try {
445 2 : link->register_node(fname, conn);
446 2 : success = true;
447 0 : } catch (const opmonlib::NonUniqueNodeName& err) {
448 0 : ++counter;
449 0 : }
450 2 : } while (!success);
451 2 : }
452 56 : }
453 :
454 : } // namespace dunedaq::iomanager
|