Line data Source code
1 : /**
2 : * @file ConfigClient.cpp
3 : *
4 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "iomanager/network/ConfigClient.hpp"
10 : #include "iomanager/network/NetworkIssues.hpp"
11 :
12 : #include "logging/Logging.hpp"
13 :
14 : #include <boost/beast/http.hpp>
15 :
16 : #include <chrono>
17 : #include <cstdlib>
18 : #include <iostream>
19 : #include <string>
20 : #include <vector>
21 :
22 : using tcp = net::ip::tcp; // from <boost/asio/ip/tcp.hpp>
23 : namespace http = beast::http; // from <boost/beast/http.hpp>
24 : using nlohmann::json;
25 :
26 : using namespace dunedaq::iomanager;
27 :
namespace {

/// HTTP version passed as the third argument of every http::request built in this file (11 == HTTP/1.1).
constexpr int HTTP_V1_1 = 11;

/// Debug levels handed to TLOG_DEBUG by the publish, retract and resolve code paths of this file.
enum
{
  TLVL_PUBLISH = 20,
  TLVL_RETRACT = 25,
  TLVL_RESOLVE = 30
};

} // namespace
36 :
37 0 : ConfigClient::ConfigClient(const std::string& server,
38 : const std::string& port,
39 : const std::string& session_name,
40 0 : std::chrono::milliseconds publish_interval)
41 : {
42 0 : m_session = session_name;
43 :
44 0 : tcp::resolver resolver(m_io_context);
45 0 : m_addr = resolver.resolve(server, port);
46 0 : m_active = true;
47 0 : m_thread = std::thread([this, publish_interval]() {
48 0 : while (m_active) {
49 0 : try {
50 0 : publish();
51 0 : m_connected = true;
52 0 : TLOG_DEBUG(TLVL_PUBLISH) << "Automatic publish complete";
53 0 : } catch (ers::Issue& ex) {
54 0 : if (m_connected)
55 0 : ers::error(ex);
56 : else
57 0 : TLOG() << ex;
58 0 : } catch (std::exception& ex) {
59 0 : auto ers_ex = PublishException(ERS_HERE, ex.what());
60 0 : if (m_connected)
61 0 : ers::error(ers_ex);
62 : else
63 0 : TLOG() << ers_ex;
64 0 : }
65 0 : std::this_thread::sleep_for(publish_interval);
66 : }
67 0 : retract();
68 0 : if (!m_connected) {
69 0 : ers::error(PublishException(ERS_HERE, "Publish thread was unable to publish to Connectivity Service!"));
70 : }
71 0 : });
72 0 : }
73 :
74 0 : ConfigClient::~ConfigClient()
75 : {
76 0 : m_active = false;
77 0 : if (m_thread.joinable()) {
78 0 : m_thread.join();
79 : }
80 0 : m_connected = false;
81 0 : retract();
82 0 : }
83 :
84 : ConnectionResponse
85 0 : ConfigClient::resolve_connection(const ConnectionRequest& query, std::string session)
86 : {
87 0 : if (session == "") {
88 0 : session = m_session;
89 : }
90 0 : TLOG_DEBUG(TLVL_RESOLVE) << "Getting connections matching <" << query.uid_regex << "> in session " << session;
91 0 : std::string target = "/getconnection/" + session;
92 0 : http::request<http::string_body> req{ http::verb::post, target, HTTP_V1_1 };
93 0 : req.set(http::field::content_type, "application/json");
94 0 : nlohmann::json jquery = query;
95 0 : req.body() = jquery.dump();
96 0 : req.prepare_payload();
97 :
98 0 : http::response<http::string_body> response;
99 0 : boost::beast::tcp_stream stream(m_io_context);
100 0 : beast::error_code ec;
101 0 : try {
102 0 : stream.connect(m_addr);
103 0 : http::write(stream, req);
104 :
105 0 : boost::beast::flat_buffer buffer;
106 0 : http::read(stream, buffer, response);
107 :
108 0 : stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
109 0 : TLOG_DEBUG(TLVL_RESOLVE) << "get " << target << " response: " << response;
110 :
111 0 : if (response.result_int() != 200) {
112 0 : throw(FailedLookup(ERS_HERE, query.uid_regex, target, std::string(response.reason())));
113 : }
114 0 : } catch (ers::Issue const&) {
115 0 : m_connected = false;
116 0 : stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
117 0 : throw;
118 0 : } catch (std::exception const& ex) {
119 0 : m_connected = false;
120 0 : stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
121 0 : ers::error(FailedLookup(ERS_HERE, query.uid_regex, target, ex.what()));
122 0 : return ConnectionResponse();
123 0 : }
124 0 : m_connected = true;
125 0 : json result = json::parse(response.body());
126 0 : TLOG_DEBUG(TLVL_RESOLVE) << result.dump();
127 0 : ConnectionResponse res;
128 0 : for (auto const& item : result) {
129 0 : res.connections.emplace_back(item.get<ConnectionInfo>());
130 : }
131 0 : return res;
132 0 : }
133 :
134 : void
135 0 : ConfigClient::publish(ConnectionRegistration const& connection)
136 : {
137 0 : {
138 0 : std::lock_guard<std::mutex> lock(m_mutex);
139 0 : TLOG_DEBUG(TLVL_PUBLISH) << "Adding connection with UID " << connection.uid << " and URI " << connection.uri
140 0 : << " to publish list";
141 :
142 0 : m_registered_connections.insert(connection);
143 0 : }
144 0 : }
145 :
146 : void
147 0 : ConfigClient::publish(const std::vector<ConnectionRegistration>& connections)
148 : {
149 0 : {
150 0 : std::lock_guard<std::mutex> lock(m_mutex);
151 0 : for (auto& entry : connections) {
152 0 : TLOG_DEBUG(TLVL_PUBLISH) << "Adding connection with UID " << entry.uid << " and URI " << entry.uri
153 0 : << " to publish list";
154 :
155 0 : m_registered_connections.insert(entry);
156 : }
157 0 : }
158 0 : }
159 :
160 : void
161 0 : ConfigClient::publish()
162 : {
163 0 : json content{ { "partition", m_session } };
164 0 : json connections = json::array();
165 0 : {
166 0 : std::lock_guard<std::mutex> lock(m_mutex);
167 0 : for (auto& entry : m_registered_connections) {
168 0 : json item = entry;
169 0 : connections.push_back(item);
170 0 : }
171 0 : if (connections.size() == 0) {
172 0 : return;
173 : }
174 0 : }
175 0 : content["connections"] = connections;
176 0 : http::request<http::string_body> req{ http::verb::post, "/publish", HTTP_V1_1 };
177 0 : req.set(http::field::content_type, "application/json");
178 0 : req.body() = content.dump();
179 0 : req.prepare_payload();
180 :
181 0 : boost::beast::tcp_stream stream(m_io_context);
182 0 : beast::error_code ec;
183 0 : try {
184 0 : stream.connect(m_addr);
185 0 : http::write(stream, req);
186 :
187 0 : http::response<http::string_body> response;
188 0 : boost::beast::flat_buffer buffer;
189 0 : http::read(stream, buffer, response);
190 0 : stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
191 0 : if (response.result_int() != 200) {
192 0 : throw(FailedPublish(ERS_HERE, std::string(response.reason())));
193 : }
194 0 : } catch (ers::Issue const&) {
195 0 : m_connected = false;
196 0 : stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
197 0 : throw;
198 0 : } catch (std::exception const& ex) {
199 0 : m_connected = false;
200 0 : stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
201 0 : throw(FailedPublish(ERS_HERE, ex.what(), ex));
202 0 : }
203 0 : m_connected = true;
204 0 : }
205 :
206 : void
207 0 : ConfigClient::retract()
208 : {
209 0 : TLOG_DEBUG(TLVL_RETRACT) << "retract() called, getting connection information";
210 0 : json connections = json::array();
211 0 : {
212 0 : std::lock_guard<std::mutex> lock(m_mutex);
213 0 : for (auto& con : m_registered_connections) {
214 0 : json item;
215 0 : item["connection_id"] = con.uid;
216 0 : item["data_type"] = con.data_type;
217 0 : connections.push_back(item);
218 0 : }
219 0 : m_registered_connections.clear();
220 0 : }
221 0 : if (connections.size() > 0) {
222 0 : TLOG_DEBUG(TLVL_RETRACT) << "retract(): Retracting " << connections.size() << " connections";
223 0 : http::request<http::string_body> req{ http::verb::post, "/retract", HTTP_V1_1 };
224 0 : req.set(http::field::content_type, "application/json");
225 0 : json body{ { "partition", m_session } };
226 0 : body["connections"] = connections;
227 0 : req.body() = body.dump();
228 0 : req.prepare_payload();
229 :
230 0 : boost::beast::tcp_stream stream(m_io_context);
231 0 : beast::error_code ec;
232 0 : try {
233 0 : stream.connect(m_addr);
234 0 : http::write(stream, req);
235 0 : http::response<http::string_body> response;
236 0 : boost::beast::flat_buffer buffer;
237 0 : http::read(stream, buffer, response);
238 0 : stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
239 0 : if (response.result_int() != 200) {
240 0 : throw(FailedRetract(ERS_HERE, "connection Id vector", std::string(response.reason())));
241 : }
242 0 : } catch (ers::Issue const&) {
243 0 : m_connected = false;
244 0 : stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
245 0 : throw;
246 0 : } catch (std::exception const& ex) {
247 0 : m_connected = false;
248 0 : stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
249 0 : ers::error(FailedRetract(ERS_HERE, "connection Id vector", ex.what()));
250 0 : }
251 0 : m_connected = true;
252 0 : }
253 0 : }
254 :
255 : void
256 0 : ConfigClient::retract(const ConnectionId& connectionId)
257 : {
258 0 : retract(std::vector<ConnectionId>{ connectionId });
259 0 : }
260 :
261 : void
262 0 : ConfigClient::retract(const std::vector<ConnectionId>& connectionIds)
263 : {
264 0 : http::request<http::string_body> req{ http::verb::post, "/retract", HTTP_V1_1 };
265 0 : req.set(http::field::content_type, "application/json");
266 :
267 0 : json connections = json::array();
268 0 : {
269 0 : std::lock_guard<std::mutex> lock(m_mutex);
270 0 : for (auto& con : connectionIds) {
271 0 : auto reg_it = m_registered_connections.begin();
272 0 : for (; reg_it != m_registered_connections.end(); ++reg_it) {
273 0 : if (con.uid == reg_it->uid && con.data_type == reg_it->data_type)
274 : break;
275 : }
276 0 : if (reg_it == m_registered_connections.end()) {
277 0 : ers::error(
278 0 : FailedRetract(ERS_HERE, con.uid + " of type " + con.data_type, "not in registered connections list"));
279 : } else {
280 0 : json item;
281 0 : item["connection_id"] = con.uid;
282 0 : item["data_type"] = con.data_type;
283 0 : connections.push_back(item);
284 0 : m_registered_connections.erase(reg_it);
285 0 : }
286 : }
287 0 : }
288 0 : json body{ { "partition", m_session } };
289 0 : body["connections"] = connections;
290 0 : req.body() = body.dump();
291 0 : req.prepare_payload();
292 :
293 0 : boost::beast::tcp_stream stream(m_io_context);
294 0 : beast::error_code ec;
295 0 : try {
296 0 : stream.connect(m_addr);
297 0 : http::write(stream, req);
298 0 : http::response<http::string_body> response;
299 0 : boost::beast::flat_buffer buffer;
300 0 : http::read(stream, buffer, response);
301 0 : stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
302 0 : if (response.result_int() != 200) {
303 0 : throw(FailedRetract(ERS_HERE, "connection Id vector", std::string(response.reason())));
304 : }
305 0 : } catch (ers::Issue const&) {
306 0 : m_connected = false;
307 0 : stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
308 0 : throw;
309 0 : } catch (std::exception const& ex) {
310 0 : m_connected = false;
311 0 : stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
312 0 : ers::error(FailedRetract(ERS_HERE, "connection Id vector", ex.what()));
313 0 : }
314 0 : m_connected = true;
315 0 : }
|