DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
ConfigClient.cpp
Go to the documentation of this file.
1
11
12#include "logging/Logging.hpp"
13
14#include <boost/beast/http.hpp>
15
16#include <chrono>
17#include <cstdlib>
18#include <iostream>
19#include <string>
20#include <vector>
21
22using tcp = net::ip::tcp; // from <boost/asio/ip/tcp.hpp>
23namespace http = beast::http; // from <boost/beast/http.hpp>
24using nlohmann::json;
25
26using namespace dunedaq::iomanager;
27
28static constexpr int HTTP_V1_1 = 11;
29
30enum
31{
34 TLVL_RESOLVE = 30
35};
36
37ConfigClient::ConfigClient(const std::string& server,
38 const std::string& port,
39 const std::string& session_name,
40 std::chrono::milliseconds publish_interval)
41{
42 m_session = session_name;
43
44 tcp::resolver resolver(m_io_context);
45 m_addr = resolver.resolve(server, port);
46 m_active = true;
47 m_thread = std::thread([this, publish_interval]() {
48 while (m_active) {
49 try {
50 publish();
51 m_connected = true;
52 TLOG_DEBUG(TLVL_PUBLISH) << "Automatic publish complete";
53 } catch (ers::Issue& ex) {
54 if (m_connected)
55 ers::error(ex);
56 else
57 TLOG() << ex;
58 } catch (std::exception& ex) {
59 auto ers_ex = PublishException(ERS_HERE, ex.what());
60 if (m_connected)
61 ers::error(ers_ex);
62 else
63 TLOG() << ers_ex;
64 }
65 std::this_thread::sleep_for(publish_interval);
66 }
67 retract();
68 if (!m_connected) {
69 ers::error(PublishException(ERS_HERE, "Publish thread was unable to publish to Connectivity Service!"));
70 }
71 });
72}
73
75{
76 m_active = false;
77 if (m_thread.joinable()) {
78 m_thread.join();
79 }
80 m_connected = false;
81 retract();
82}
83
86{
87 if (session == "") {
89 }
90 TLOG_DEBUG(TLVL_RESOLVE) << "Getting connections matching <" << query.uid_regex << "> in session " << session;
91 std::string target = "/getconnection/" + session;
92 http::request<http::string_body> req{ http::verb::post, target, HTTP_V1_1 };
93 req.set(http::field::content_type, "application/json");
94 nlohmann::json jquery = query;
95 req.body() = jquery.dump();
96 req.prepare_payload();
97
98 http::response<http::string_body> response;
99 boost::beast::tcp_stream stream(m_io_context);
100 beast::error_code ec;
101 try {
102 stream.connect(m_addr);
103 http::write(stream, req);
104
105 boost::beast::flat_buffer buffer;
106 http::read(stream, buffer, response);
107
108 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
109 TLOG_DEBUG(TLVL_RESOLVE) << "get " << target << " response: " << response;
110
111 if (response.result_int() != 200) {
112 throw(FailedLookup(ERS_HERE, query.uid_regex, target, std::string(response.reason())));
113 }
114 } catch (ers::Issue const&) {
115 m_connected = false;
116 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
117 throw;
118 } catch (std::exception const& ex) {
119 m_connected = false;
120 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
121 ers::error(FailedLookup(ERS_HERE, query.uid_regex, target, ex.what()));
122 return ConnectionResponse();
123 }
124 m_connected = true;
125 json result = json::parse(response.body());
126 TLOG_DEBUG(TLVL_RESOLVE) << result.dump();
128 for (auto const& item : result) {
129 res.connections.emplace_back(item.get<ConnectionInfo>());
130 }
131 return res;
132}
133
134void
136{
137 {
138 std::lock_guard<std::mutex> lock(m_mutex);
139 TLOG_DEBUG(TLVL_PUBLISH) << "Adding connection with UID " << connection.uid << " and URI " << connection.uri
140 << " to publish list";
141
142 m_registered_connections.insert(connection);
143 }
144}
145
146void
147ConfigClient::publish(const std::vector<ConnectionRegistration>& connections)
148{
149 {
150 std::lock_guard<std::mutex> lock(m_mutex);
151 for (auto& entry : connections) {
152 TLOG_DEBUG(TLVL_PUBLISH) << "Adding connection with UID " << entry.uid << " and URI " << entry.uri
153 << " to publish list";
154
155 m_registered_connections.insert(entry);
156 }
157 }
158}
159
160void
162{
163 json content{ { "partition", m_session } };
164 json connections = json::array();
165 {
166 std::lock_guard<std::mutex> lock(m_mutex);
167 for (auto& entry : m_registered_connections) {
168 json item = entry;
169 connections.push_back(item);
170 }
171 if (connections.size() == 0) {
172 return;
173 }
174 }
175 content["connections"] = connections;
176 http::request<http::string_body> req{ http::verb::post, "/publish", HTTP_V1_1 };
177 req.set(http::field::content_type, "application/json");
178 req.body() = content.dump();
179 req.prepare_payload();
180
181 boost::beast::tcp_stream stream(m_io_context);
182 beast::error_code ec;
183 try {
184 stream.connect(m_addr);
185 http::write(stream, req);
186
187 http::response<http::string_body> response;
188 boost::beast::flat_buffer buffer;
189 http::read(stream, buffer, response);
190 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
191 if (response.result_int() != 200) {
192 throw(FailedPublish(ERS_HERE, std::string(response.reason())));
193 }
194 } catch (ers::Issue const&) {
195 m_connected = false;
196 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
197 throw;
198 } catch (std::exception const& ex) {
199 m_connected = false;
200 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
201 throw(FailedPublish(ERS_HERE, ex.what(), ex));
202 }
203 m_connected = true;
204}
205
206void
208{
209 TLOG_DEBUG(TLVL_RETRACT) << "retract() called, getting connection information";
210 json connections = json::array();
211 {
212 std::lock_guard<std::mutex> lock(m_mutex);
213 for (auto& con : m_registered_connections) {
214 json item;
215 item["connection_id"] = con.uid;
216 item["data_type"] = con.data_type;
217 connections.push_back(item);
218 }
220 }
221 if (connections.size() > 0) {
222 TLOG_DEBUG(TLVL_RETRACT) << "retract(): Retracting " << connections.size() << " connections";
223 http::request<http::string_body> req{ http::verb::post, "/retract", HTTP_V1_1 };
224 req.set(http::field::content_type, "application/json");
225 json body{ { "partition", m_session } };
226 body["connections"] = connections;
227 req.body() = body.dump();
228 req.prepare_payload();
229
230 boost::beast::tcp_stream stream(m_io_context);
231 beast::error_code ec;
232 try {
233 stream.connect(m_addr);
234 http::write(stream, req);
235 http::response<http::string_body> response;
236 boost::beast::flat_buffer buffer;
237 http::read(stream, buffer, response);
238 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
239 if (response.result_int() != 200) {
240 throw(FailedRetract(ERS_HERE, "connection Id vector", std::string(response.reason())));
241 }
242 } catch (ers::Issue const&) {
243 m_connected = false;
244 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
245 throw;
246 } catch (std::exception const& ex) {
247 m_connected = false;
248 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
249 ers::error(FailedRetract(ERS_HERE, "connection Id vector", ex.what()));
250 }
251 m_connected = true;
252 }
253}
254
255void
257{
258 retract(std::vector<ConnectionId>{ connectionId });
259}
260
261void
262ConfigClient::retract(const std::vector<ConnectionId>& connectionIds)
263{
264 http::request<http::string_body> req{ http::verb::post, "/retract", HTTP_V1_1 };
265 req.set(http::field::content_type, "application/json");
266
267 json connections = json::array();
268 {
269 std::lock_guard<std::mutex> lock(m_mutex);
270 for (auto& con : connectionIds) {
271 auto reg_it = m_registered_connections.begin();
272 for (; reg_it != m_registered_connections.end(); ++reg_it) {
273 if (con.uid == reg_it->uid && con.data_type == reg_it->data_type)
274 break;
275 }
276 if (reg_it == m_registered_connections.end()) {
278 FailedRetract(ERS_HERE, con.uid + " of type " + con.data_type, "not in registered connections list"));
279 } else {
280 json item;
281 item["connection_id"] = con.uid;
282 item["data_type"] = con.data_type;
283 connections.push_back(item);
284 m_registered_connections.erase(reg_it);
285 }
286 }
287 }
288 json body{ { "partition", m_session } };
289 body["connections"] = connections;
290 req.body() = body.dump();
291 req.prepare_payload();
292
293 boost::beast::tcp_stream stream(m_io_context);
294 beast::error_code ec;
295 try {
296 stream.connect(m_addr);
297 http::write(stream, req);
298 http::response<http::string_body> response;
299 boost::beast::flat_buffer buffer;
300 http::read(stream, buffer, response);
301 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
302 if (response.result_int() != 200) {
303 throw(FailedRetract(ERS_HERE, "connection Id vector", std::string(response.reason())));
304 }
305 } catch (ers::Issue const&) {
306 m_connected = false;
307 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
308 throw;
309 } catch (std::exception const& ex) {
310 m_connected = false;
311 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
312 ers::error(FailedRetract(ERS_HERE, "connection Id vector", ex.what()));
313 }
314 m_connected = true;
315}
static constexpr int HTTP_V1_1
net::ip::tcp tcp
@ TLVL_RETRACT
@ TLVL_RESOLVE
@ TLVL_PUBLISH
#define ERS_HERE
ConnectionResponse resolve_connection(const ConnectionRequest &query, std::string session="")
net::ip::basic_resolver< net::ip::tcp >::results_type m_addr
std::set< ConnectionRegistration > m_registered_connections
ConfigClient(const std::string &server, const std::string &port, const std::string &session_name, std::chrono::milliseconds publish_interval)
Base class for any user define issue.
Definition Issue.hpp:69
const char * what() const noexcept
General cause of the issue.
Definition Issue.hpp:133
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
void error(const Issue &issue)
Definition ers.hpp:81
std::vector< ConnectionInfo > connections