DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
ConfigClient.cpp
Go to the documentation of this file.
1
11
12#include "logging/Logging.hpp"
13
14#include <boost/beast/http.hpp>
15
16#include <chrono>
17#include <cstdlib>
18#include <iostream>
19#include <string>
20
21using tcp = net::ip::tcp; // from <boost/asio/ip/tcp.hpp>
22namespace http = beast::http; // from <boost/beast/http.hpp>
23using nlohmann::json;
24
25using namespace dunedaq::iomanager;
26
27ConfigClient::ConfigClient(const std::string& server,
28 const std::string& port,
29 const std::string& session_name,
30 std::chrono::milliseconds publish_interval)
31{
32 m_session = session_name;
33
34 tcp::resolver resolver(m_ioContext);
35 m_addr = resolver.resolve(server, port);
36 m_active = true;
37 m_thread = std::thread([this, publish_interval]() {
38 while (m_active) {
39 try {
40 publish();
41 m_connected = true;
42 TLOG_DEBUG(24) << "Automatic publish complete";
43 } catch (ers::Issue& ex) {
44 if (m_connected)
45 ers::error(ex);
46 else
47 TLOG() << ex;
48 } catch (std::exception& ex) {
49 auto ers_ex = PublishException(ERS_HERE, ex.what());
50 if (m_connected)
51 ers::error(ers_ex);
52 else
53 TLOG() << ers_ex;
54 }
55 std::this_thread::sleep_for(publish_interval);
56 }
57 retract();
58 if (!m_connected) {
59 ers::error(PublishException(ERS_HERE, "Publish thread was unable to publish to Connectivity Service!"));
60 }
61 });
62}
63
65{
66 m_active = false;
67 if (m_thread.joinable()) {
68 m_thread.join();
69 }
70 m_connected = false;
71 retract();
72}
73
76{
77 if (session == "") {
79 }
80 TLOG_DEBUG(25) << "Getting connections matching <" << query.uid_regex << "> in session " << session;
81 std::string target = "/getconnection/" + session;
82 http::request<http::string_body> req{ http::verb::post, target, 11 };
83 req.set(http::field::content_type, "application/json");
84 nlohmann::json jquery = query;
85 req.body() = jquery.dump();
86 req.prepare_payload();
87
88 http::response<http::string_body> response;
89 boost::beast::tcp_stream stream(m_ioContext);
90 beast::error_code ec;
91 try {
92 stream.connect(m_addr);
93 http::write(stream, req);
94
95 boost::beast::flat_buffer buffer;
96 http::read(stream, buffer, response);
97
98 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
99 TLOG_DEBUG(25) << "get " << target << " response: " << response;
100
101 if (response.result_int() != 200) {
102 throw(FailedLookup(ERS_HERE, query.uid_regex, target, std::string(response.reason())));
103 }
104 } catch (ers::Issue const&) {
105 m_connected = false;
106 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
107 throw;
108 } catch (std::exception const& ex) {
109 m_connected = false;
110 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
111 ers::error(FailedLookup(ERS_HERE, query.uid_regex, target, ex.what()));
112 return ConnectionResponse();
113 }
114 m_connected = true;
115 json result = json::parse(response.body());
116 TLOG_DEBUG(25) << result.dump();
118 for (auto item : result) {
119 res.connections.emplace_back(item.get<ConnectionInfo>());
120 }
121 return res;
122}
123
124void
126{
127 {
128 std::lock_guard<std::mutex> lock(m_mutex);
129 TLOG_DEBUG(26) << "Adding connection with UID " << connection.uid << " and URI " << connection.uri
130 << " to publish list";
131
132 m_registered_connections.insert(connection);
133 }
134}
135
136void
137ConfigClient::publish(const std::vector<ConnectionRegistration>& connections)
138{
139 {
140 std::lock_guard<std::mutex> lock(m_mutex);
141 for (auto& entry : connections) {
142 TLOG_DEBUG(26) << "Adding connection with UID " << entry.uid << " and URI " << entry.uri << " to publish list";
143
144 m_registered_connections.insert(entry);
145 }
146 }
147}
148
149void
151{
152 json content{ { "partition", m_session } };
153 json connections = json::array();
154 {
155 std::lock_guard<std::mutex> lock(m_mutex);
156 for (auto& entry : m_registered_connections) {
157 json item = entry;
158 connections.push_back(item);
159 }
160 if (connections.size() == 0) {
161 return;
162 }
163 }
164 content["connections"] = connections;
165 http::request<http::string_body> req{ http::verb::post, "/publish", 11 };
166 req.set(http::field::content_type, "application/json");
167 req.body() = content.dump();
168 req.prepare_payload();
169
170 boost::beast::tcp_stream stream(m_ioContext);
171 beast::error_code ec;
172 try {
173 stream.connect(m_addr);
174 http::write(stream, req);
175
176 http::response<http::string_body> response;
177 boost::beast::flat_buffer buffer;
178 http::read(stream, buffer, response);
179 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
180 if (response.result_int() != 200) {
181 throw(FailedPublish(ERS_HERE, std::string(response.reason())));
182 }
183 } catch (ers::Issue const&) {
184 m_connected = false;
185 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
186 throw;
187 } catch (std::exception const& ex) {
188 m_connected = false;
189 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
190 throw(FailedPublish(ERS_HERE, ex.what(), ex));
191 }
192 m_connected = true;
193}
194
195void
197{
198 TLOG_DEBUG(1) << "retract() called, getting connection information";
199 json connections = json::array();
200 {
201 std::lock_guard<std::mutex> lock(m_mutex);
202 for (auto& con : m_registered_connections) {
203 json item;
204 item["connection_id"] = con.uid;
205 item["data_type"] = con.data_type;
206 connections.push_back(item);
207 }
209 }
210 if (connections.size() > 0) {
211 TLOG_DEBUG(1) << "retract(): Retracting " << connections.size() << " connections";
212 http::request<http::string_body> req{ http::verb::post, "/retract", 11 };
213 req.set(http::field::content_type, "application/json");
214 json body{ { "partition", m_session } };
215 body["connections"] = connections;
216 req.body() = body.dump();
217 req.prepare_payload();
218
219 boost::beast::tcp_stream stream(m_ioContext);
220 beast::error_code ec;
221 try {
222 stream.connect(m_addr);
223 http::write(stream, req);
224 http::response<http::string_body> response;
225 boost::beast::flat_buffer buffer;
226 http::read(stream, buffer, response);
227 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
228 if (response.result_int() != 200) {
229 throw(FailedRetract(ERS_HERE, "connection Id vector", std::string(response.reason())));
230 }
231 } catch (ers::Issue const&) {
232 m_connected = false;
233 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
234 throw;
235 } catch (std::exception const& ex) {
236 m_connected = false;
237 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
238 ers::error(FailedRetract(ERS_HERE, "connection Id vector", ex.what()));
239 }
240 m_connected = true;
241 }
242}
243
244void
246{
247 retract(std::vector<ConnectionId>{ connectionId });
248}
249
250void
251ConfigClient::retract(const std::vector<ConnectionId>& connectionIds)
252{
253 http::request<http::string_body> req{ http::verb::post, "/retract", 11 };
254 req.set(http::field::content_type, "application/json");
255
256 json connections = json::array();
257 {
258 std::lock_guard<std::mutex> lock(m_mutex);
259 for (auto& con : connectionIds) {
260 auto reg_it = m_registered_connections.begin();
261 for (; reg_it != m_registered_connections.end(); ++reg_it) {
262 if (con.uid == reg_it->uid && con.data_type == reg_it->data_type)
263 break;
264 }
265 if (reg_it == m_registered_connections.end()) {
267 FailedRetract(ERS_HERE, con.uid + " of type " + con.data_type, "not in registered connections list"));
268 } else {
269 json item;
270 item["connection_id"] = con.uid;
271 item["data_type"] = con.data_type;
272 connections.push_back(item);
273 m_registered_connections.erase(reg_it);
274 }
275 }
276 }
277 json body{ { "partition", m_session } };
278 body["connections"] = connections;
279 req.body() = body.dump();
280 req.prepare_payload();
281
282 boost::beast::tcp_stream stream(m_ioContext);
283 beast::error_code ec;
284 try {
285 stream.connect(m_addr);
286 http::write(stream, req);
287 http::response<http::string_body> response;
288 boost::beast::flat_buffer buffer;
289 http::read(stream, buffer, response);
290 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
291 if (response.result_int() != 200) {
292 throw(FailedRetract(ERS_HERE, "connection Id vector", std::string(response.reason())));
293 }
294 } catch (ers::Issue const&) {
295 m_connected = false;
296 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
297 throw;
298 } catch (std::exception const& ex) {
299 m_connected = false;
300 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
301 ers::error(FailedRetract(ERS_HERE, "connection Id vector", ex.what()));
302 }
303 m_connected = true;
304}
net::ip::tcp tcp
#define ERS_HERE
net::ip::basic_resolver< net::ip::tcp >::results_type m_addr
std::set< ConnectionRegistration > m_registered_connections
ConnectionResponse resolveConnection(const ConnectionRequest &query, std::string session="")
ConfigClient(const std::string &server, const std::string &port, const std::string &session_name, std::chrono::milliseconds publish_interval)
Base class for any user define issue.
Definition Issue.hpp:69
const char * what() const noexcept
General cause of the issue.
Definition Issue.hpp:133
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
void error(const Issue &issue)
Definition ers.hpp:81
std::vector< ConnectionInfo > connections