DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::iomanager::ConfigClient Class Reference

#include <ConfigClient.hpp>

Public Member Functions

 ConfigClient (const std::string &server, const std::string &port, const std::string &session_name, std::chrono::milliseconds publish_interval)
 
 ~ConfigClient ()
 
ConnectionResponse resolve_connection (const ConnectionRequest &query, std::string session="")
 
void publish (ConnectionRegistration const &connection)
 
void publish (std::vector< ConnectionRegistration > const &connections)
 
void retract (const ConnectionId &connectionId)
 
void retract (const std::vector< ConnectionId > &connectionIds)
 
void retract ()
 
bool is_connected ()
 

Private Member Functions

void publish ()
 

Private Attributes

std::string m_session
 
net::io_context m_io_context
 
net::ip::basic_resolver< net::ip::tcp >::results_type m_addr
 
std::mutex m_mutex
 
std::set< ConnectionRegistration > m_registered_connections
 
std::thread m_thread
 
bool m_active
 
std::atomic< bool > m_connected { false }
 

Detailed Description

Definition at line 34 of file ConfigClient.hpp.

Constructor & Destructor Documentation

◆ ConfigClient()

ConfigClient::ConfigClient ( const std::string & server,
const std::string & port,
const std::string & session_name,
std::chrono::milliseconds publish_interval )

Constructor: Starts a thread that publishes all known connection information once a second

Parameters
server: Name/address of the connection server to publish to
port: Port on the connection server to connect to
session_name: Name of the current Session
publish_interval: Time to wait between connection republish (keep-alive)

Definition at line 37 of file ConfigClient.cpp.

41{
42 m_session = session_name;
43
44 tcp::resolver resolver(m_io_context);
45 m_addr = resolver.resolve(server, port);
46 m_active = true;
47 m_thread = std::thread([this, publish_interval]() {
48 while (m_active) {
49 try {
50 publish();
51 m_connected = true;
52 TLOG_DEBUG(TLVL_PUBLISH) << "Automatic publish complete";
53 } catch (ers::Issue& ex) {
54 if (m_connected)
55 ers::error(ex);
56 else
57 TLOG() << ex;
58 } catch (std::exception& ex) {
59 auto ers_ex = PublishException(ERS_HERE, ex.what());
60 if (m_connected)
61 ers::error(ers_ex);
62 else
63 TLOG() << ers_ex;
64 }
65 std::this_thread::sleep_for(publish_interval);
66 }
67 retract();
68 if (!m_connected) {
69 ers::error(PublishException(ERS_HERE, "Publish thread was unable to publish to Connectivity Service!"));
70 }
71 });
72}
@ TLVL_PUBLISH
#define ERS_HERE
net::ip::basic_resolver< net::ip::tcp >::results_type m_addr
Base class for any user define issue.
Definition Issue.hpp:69
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
void error(const Issue &issue)
Definition ers.hpp:81

◆ ~ConfigClient()

ConfigClient::~ConfigClient ( )

Destructor: stops the publishing thread and retracts all published information

Definition at line 74 of file ConfigClient.cpp.

75{
76 m_active = false;
77 if (m_thread.joinable()) {
78 m_thread.join();
79 }
80 m_connected = false;
81 retract();
82}

Member Function Documentation

◆ is_connected()

bool dunedaq::iomanager::ConfigClient::is_connected ( )
inline

Definition at line 86 of file ConfigClient.hpp.

86{ return m_connected.load(); }

◆ publish() [1/3]

void ConfigClient::publish ( )
private

Definition at line 161 of file ConfigClient.cpp.

162{
163 json content{ { "partition", m_session } };
164 json connections = json::array();
165 {
166 std::lock_guard<std::mutex> lock(m_mutex);
167 for (auto& entry : m_registered_connections) {
168 json item = entry;
169 connections.push_back(item);
170 }
171 if (connections.size() == 0) {
172 return;
173 }
174 }
175 content["connections"] = connections;
176 http::request<http::string_body> req{ http::verb::post, "/publish", HTTP_V1_1 };
177 req.set(http::field::content_type, "application/json");
178 req.body() = content.dump();
179 req.prepare_payload();
180
181 boost::beast::tcp_stream stream(m_io_context);
182 beast::error_code ec;
183 try {
184 stream.connect(m_addr);
185 http::write(stream, req);
186
187 http::response<http::string_body> response;
188 boost::beast::flat_buffer buffer;
189 http::read(stream, buffer, response);
190 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
191 if (response.result_int() != 200) {
192 throw(FailedPublish(ERS_HERE, std::string(response.reason())));
193 }
194 } catch (ers::Issue const&) {
195 m_connected = false;
196 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
197 throw;
198 } catch (std::exception const& ex) {
199 m_connected = false;
200 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
201 throw(FailedPublish(ERS_HERE, ex.what(), ex));
202 }
203 m_connected = true;
204}
static constexpr int HTTP_V1_1
std::set< ConnectionRegistration > m_registered_connections

◆ publish() [2/3]

void ConfigClient::publish ( ConnectionRegistration const & connection)

Definition at line 135 of file ConfigClient.cpp.

136{
137 {
138 std::lock_guard<std::mutex> lock(m_mutex);
139 TLOG_DEBUG(TLVL_PUBLISH) << "Adding connection with UID " << connection.uid << " and URI " << connection.uri
140 << " to publish list";
141
142 m_registered_connections.insert(connection);
143 }
144}

◆ publish() [3/3]

void ConfigClient::publish ( std::vector< ConnectionRegistration > const & connections)

Definition at line 147 of file ConfigClient.cpp.

148{
149 {
150 std::lock_guard<std::mutex> lock(m_mutex);
151 for (auto& entry : connections) {
152 TLOG_DEBUG(TLVL_PUBLISH) << "Adding connection with UID " << entry.uid << " and URI " << entry.uri
153 << " to publish list";
154
155 m_registered_connections.insert(entry);
156 }
157 }
158}

◆ resolve_connection()

ConnectionResponse ConfigClient::resolve_connection ( const ConnectionRequest & query,
std::string session = "" )

Look up a connection in the connection server and return a list of uris that correspond to connection ids that match

Parameters
query: Query string to send to the server. Query is a regular expression that can match with multiple connection ids
session: The session that the requested connection is part of

Definition at line 85 of file ConfigClient.cpp.

86{
87 if (session == "") {
89 }
90 TLOG_DEBUG(TLVL_RESOLVE) << "Getting connections matching <" << query.uid_regex << "> in session " << session;
91 std::string target = "/getconnection/" + session;
92 http::request<http::string_body> req{ http::verb::post, target, HTTP_V1_1 };
93 req.set(http::field::content_type, "application/json");
94 nlohmann::json jquery = query;
95 req.body() = jquery.dump();
96 req.prepare_payload();
97
98 http::response<http::string_body> response;
99 boost::beast::tcp_stream stream(m_io_context);
100 beast::error_code ec;
101 try {
102 stream.connect(m_addr);
103 http::write(stream, req);
104
105 boost::beast::flat_buffer buffer;
106 http::read(stream, buffer, response);
107
108 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
109 TLOG_DEBUG(TLVL_RESOLVE) << "get " << target << " response: " << response;
110
111 if (response.result_int() != 200) {
112 throw(FailedLookup(ERS_HERE, query.uid_regex, target, std::string(response.reason())));
113 }
114 } catch (ers::Issue const&) {
115 m_connected = false;
116 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
117 throw;
118 } catch (std::exception const& ex) {
119 m_connected = false;
120 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
121 ers::error(FailedLookup(ERS_HERE, query.uid_regex, target, ex.what()));
122 return ConnectionResponse();
123 }
124 m_connected = true;
125 json result = json::parse(response.body());
126 TLOG_DEBUG(TLVL_RESOLVE) << result.dump();
128 for (auto const& item : result) {
129 res.connections.emplace_back(item.get<ConnectionInfo>());
130 }
131 return res;
132}
@ TLVL_RESOLVE
std::vector< ConnectionInfo > connections

◆ retract() [1/3]

void ConfigClient::retract ( )

Retract all connection information that we have published

Definition at line 207 of file ConfigClient.cpp.

208{
209 TLOG_DEBUG(TLVL_RETRACT) << "retract() called, getting connection information";
210 json connections = json::array();
211 {
212 std::lock_guard<std::mutex> lock(m_mutex);
213 for (auto& con : m_registered_connections) {
214 json item;
215 item["connection_id"] = con.uid;
216 item["data_type"] = con.data_type;
217 connections.push_back(item);
218 }
220 }
221 if (connections.size() > 0) {
222 TLOG_DEBUG(TLVL_RETRACT) << "retract(): Retracting " << connections.size() << " connections";
223 http::request<http::string_body> req{ http::verb::post, "/retract", HTTP_V1_1 };
224 req.set(http::field::content_type, "application/json");
225 json body{ { "partition", m_session } };
226 body["connections"] = connections;
227 req.body() = body.dump();
228 req.prepare_payload();
229
230 boost::beast::tcp_stream stream(m_io_context);
231 beast::error_code ec;
232 try {
233 stream.connect(m_addr);
234 http::write(stream, req);
235 http::response<http::string_body> response;
236 boost::beast::flat_buffer buffer;
237 http::read(stream, buffer, response);
238 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
239 if (response.result_int() != 200) {
240 throw(FailedRetract(ERS_HERE, "connection Id vector", std::string(response.reason())));
241 }
242 } catch (ers::Issue const&) {
243 m_connected = false;
244 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
245 throw;
246 } catch (std::exception const& ex) {
247 m_connected = false;
248 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
249 ers::error(FailedRetract(ERS_HERE, "connection Id vector", ex.what()));
250 }
251 m_connected = true;
252 }
253}
@ TLVL_RETRACT

◆ retract() [2/3]

void ConfigClient::retract ( const ConnectionId & connectionId)

Definition at line 256 of file ConfigClient.cpp.

257{
258 retract(std::vector<ConnectionId>{ connectionId });
259}

◆ retract() [3/3]

void ConfigClient::retract ( const std::vector< ConnectionId > & connectionIds)

Retract multiple published connections

Parameters
connectionIds: A vector of previously published connection Ids to be retracted

Definition at line 262 of file ConfigClient.cpp.

263{
264 http::request<http::string_body> req{ http::verb::post, "/retract", HTTP_V1_1 };
265 req.set(http::field::content_type, "application/json");
266
267 json connections = json::array();
268 {
269 std::lock_guard<std::mutex> lock(m_mutex);
270 for (auto& con : connectionIds) {
271 auto reg_it = m_registered_connections.begin();
272 for (; reg_it != m_registered_connections.end(); ++reg_it) {
273 if (con.uid == reg_it->uid && con.data_type == reg_it->data_type)
274 break;
275 }
276 if (reg_it == m_registered_connections.end()) {
278 FailedRetract(ERS_HERE, con.uid + " of type " + con.data_type, "not in registered connections list"));
279 } else {
280 json item;
281 item["connection_id"] = con.uid;
282 item["data_type"] = con.data_type;
283 connections.push_back(item);
284 m_registered_connections.erase(reg_it);
285 }
286 }
287 }
288 json body{ { "partition", m_session } };
289 body["connections"] = connections;
290 req.body() = body.dump();
291 req.prepare_payload();
292
293 boost::beast::tcp_stream stream(m_io_context);
294 beast::error_code ec;
295 try {
296 stream.connect(m_addr);
297 http::write(stream, req);
298 http::response<http::string_body> response;
299 boost::beast::flat_buffer buffer;
300 http::read(stream, buffer, response);
301 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
302 if (response.result_int() != 200) {
303 throw(FailedRetract(ERS_HERE, "connection Id vector", std::string(response.reason())));
304 }
305 } catch (ers::Issue const&) {
306 m_connected = false;
307 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
308 throw;
309 } catch (std::exception const& ex) {
310 m_connected = false;
311 stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
312 ers::error(FailedRetract(ERS_HERE, "connection Id vector", ex.what()));
313 }
314 m_connected = true;
315}

Member Data Documentation

◆ m_active

bool dunedaq::iomanager::ConfigClient::m_active
private

Definition at line 97 of file ConfigClient.hpp.

◆ m_addr

net::ip::basic_resolver<net::ip::tcp>::results_type dunedaq::iomanager::ConfigClient::m_addr
private

Definition at line 92 of file ConfigClient.hpp.

◆ m_connected

std::atomic<bool> dunedaq::iomanager::ConfigClient::m_connected { false }
private

Definition at line 98 of file ConfigClient.hpp.

98{ false };

◆ m_io_context

net::io_context dunedaq::iomanager::ConfigClient::m_io_context
private

Definition at line 91 of file ConfigClient.hpp.

◆ m_mutex

std::mutex dunedaq::iomanager::ConfigClient::m_mutex
private

Definition at line 94 of file ConfigClient.hpp.

◆ m_registered_connections

std::set<ConnectionRegistration> dunedaq::iomanager::ConfigClient::m_registered_connections
private

Definition at line 95 of file ConfigClient.hpp.

◆ m_session

std::string dunedaq::iomanager::ConfigClient::m_session
private

Definition at line 90 of file ConfigClient.hpp.

◆ m_thread

std::thread dunedaq::iomanager::ConfigClient::m_thread
private

Definition at line 96 of file ConfigClient.hpp.


The documentation for this class was generated from the following files: