DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::iomanager::ConfigClient Class Reference

#include <ConfigClient.hpp>

Public Member Functions

 ConfigClient (const std::string &server, const std::string &port, const std::string &session_name, std::chrono::milliseconds publish_interval)
 
 ~ConfigClient ()
 
ConnectionResponse resolveConnection (const ConnectionRequest &query, std::string session="")
 
void publish (ConnectionRegistration const &connection)
 
void publish (std::vector< ConnectionRegistration > const &connections)
 
void retract (const ConnectionId &connectionId)
 
void retract (const std::vector< ConnectionId > &connectionIds)
 
void retract ()
 
bool is_connected ()
 

Private Member Functions

void publish ()
 

Private Attributes

std::string m_session
 
net::io_context m_ioContext
 
net::ip::basic_resolver< net::ip::tcp >::results_type m_addr
 
std::mutex m_mutex
 
std::set< ConnectionRegistration > m_registered_connections
 
std::thread m_thread
 
bool m_active
 
std::atomic< bool > m_connected { false }
 

Detailed Description

Definition at line 34 of file ConfigClient.hpp.

Constructor & Destructor Documentation

◆ ConfigClient()

ConfigClient::ConfigClient ( const std::string & server,
const std::string & port,
const std::string & session_name,
std::chrono::milliseconds publish_interval )

Constructor: Starts a thread that republishes all known connection information, waiting publish_interval between publishes

Parameters
server: Name/address of the connection server to publish to
port: Port on the connection server to connect to
session_name: Name of the current Session
publish_interval: Time to wait between connection republish (keep-alive)

Definition at line 27 of file ConfigClient.cpp.

31{
32 m_session = session_name;
33
34 tcp::resolver resolver(m_ioContext);
35 m_addr = resolver.resolve(server, port);
36 m_active = true;
37 m_thread = std::thread([this, publish_interval]() {
38 while (m_active) {
39 try {
40 publish();
41 m_connected = true;
42 TLOG_DEBUG(24) << "Automatic publish complete";
43 } catch (ers::Issue& ex) {
44 if (m_connected)
45 ers::error(ex);
46 else
47 TLOG() << ex;
48 } catch (std::exception& ex) {
49 auto ers_ex = PublishException(ERS_HERE, ex.what());
50 if (m_connected)
51 ers::error(ers_ex);
52 else
53 TLOG() << ers_ex;
54 }
55 std::this_thread::sleep_for(publish_interval);
56 }
57 retract();
58 if (!m_connected) {
59 ers::error(PublishException(ERS_HERE, "Publish thread was unable to publish to Connectivity Service!"));
60 }
61 });
62}
#define ERS_HERE
net::ip::basic_resolver< net::ip::tcp >::results_type m_addr
Base class for any user-defined issue.
Definition Issue.hpp:69
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
void error(const Issue &issue)
Definition ers.hpp:81

◆ ~ConfigClient()

ConfigClient::~ConfigClient ( )

Destructor: stops the publishing thread and retracts all published information

Definition at line 64 of file ConfigClient.cpp.

65{
66 m_active = false;
67 if (m_thread.joinable()) {
68 m_thread.join();
69 }
70 m_connected = false;
71 retract();
72}

Member Function Documentation

◆ is_connected()

bool dunedaq::iomanager::ConfigClient::is_connected ( )
inline

Definition at line 97 of file ConfigClient.hpp.

97{ return m_connected.load(); }

◆ publish() [1/3]

void ConfigClient::publish ( )
private

Definition at line 150 of file ConfigClient.cpp.

151{
152 json content{ { "partition", m_session } };
153 json connections = json::array();
154 {
155 std::lock_guard<std::mutex> lock(m_mutex);
156 for (auto& entry : m_registered_connections) {
157 json item = entry;
158 connections.push_back(item);
159 }
160 if (connections.size() == 0) {
161 return;
162 }
163 }
164 content["connections"] = connections;
165 http::request<http::string_body> req{ http::verb::post, "/publish", 11 };
166 req.set(http::field::content_type, "application/json");
167 req.body() = content.dump();
168 req.prepare_payload();
169
170 boost::beast::tcp_stream stream(m_ioContext);
171 beast::error_code ec;
172 try {
173 stream.connect(m_addr);
174 http::write(stream, req);
175
176 http::response<http::string_body> response;
177 boost::beast::flat_buffer buffer;
178 http::read(stream, buffer, response);
179 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
180 if (response.result_int() != 200) {
181 throw(FailedPublish(ERS_HERE, std::string(response.reason())));
182 }
183 } catch (ers::Issue const&) {
184 m_connected = false;
185 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
186 throw;
187 } catch (std::exception const& ex) {
188 m_connected = false;
189 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
190 throw(FailedPublish(ERS_HERE, ex.what(), ex));
191 }
192 m_connected = true;
193}
std::set< ConnectionRegistration > m_registered_connections

◆ publish() [2/3]

void ConfigClient::publish ( ConnectionRegistration const & connection)

Publish information for a single connection

Parameters
connectionId: The connection Id to be published
uri: The uri corresponding to the connection id

Definition at line 125 of file ConfigClient.cpp.

126{
127 {
128 std::lock_guard<std::mutex> lock(m_mutex);
129 TLOG_DEBUG(26) << "Adding connection with UID " << connection.uid << " and URI " << connection.uri
130 << " to publish list";
131
132 m_registered_connections.insert(connection);
133 }
134}

◆ publish() [3/3]

void ConfigClient::publish ( std::vector< ConnectionRegistration > const & connections)

Publish information for multiple connections

Parameters
connectionId: A vector of connection Ids to be published
uri: A vector of uris corresponding to the connection ids. This vector must be the same length as the connection id vector

Definition at line 137 of file ConfigClient.cpp.

138{
139 {
140 std::lock_guard<std::mutex> lock(m_mutex);
141 for (auto& entry : connections) {
142 TLOG_DEBUG(26) << "Adding connection with UID " << entry.uid << " and URI " << entry.uri << " to publish list";
143
144 m_registered_connections.insert(entry);
145 }
146 }
147}

◆ resolveConnection()

ConnectionResponse ConfigClient::resolveConnection ( const ConnectionRequest & query,
std::string session = "" )

Look up a connection in the connection server and return a list of uris that correspond to connection ids that match

Parameters
query: Query string to send to the server. Query is a regular expression that can match with multiple connection ids
session: The session that the requested connection is part of

Definition at line 75 of file ConfigClient.cpp.

76{
77 if (session == "") {
79 }
80 TLOG_DEBUG(25) << "Getting connections matching <" << query.uid_regex << "> in session " << session;
81 std::string target = "/getconnection/" + session;
82 http::request<http::string_body> req{ http::verb::post, target, 11 };
83 req.set(http::field::content_type, "application/json");
84 nlohmann::json jquery = query;
85 req.body() = jquery.dump();
86 req.prepare_payload();
87
88 http::response<http::string_body> response;
89 boost::beast::tcp_stream stream(m_ioContext);
90 beast::error_code ec;
91 try {
92 stream.connect(m_addr);
93 http::write(stream, req);
94
95 boost::beast::flat_buffer buffer;
96 http::read(stream, buffer, response);
97
98 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
99 TLOG_DEBUG(25) << "get " << target << " response: " << response;
100
101 if (response.result_int() != 200) {
102 throw(FailedLookup(ERS_HERE, query.uid_regex, target, std::string(response.reason())));
103 }
104 } catch (ers::Issue const&) {
105 m_connected = false;
106 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
107 throw;
108 } catch (std::exception const& ex) {
109 m_connected = false;
110 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
111 ers::error(FailedLookup(ERS_HERE, query.uid_regex, target, ex.what()));
112 return ConnectionResponse();
113 }
114 m_connected = true;
115 json result = json::parse(response.body());
116 TLOG_DEBUG(25) << result.dump();
118 for (auto item : result) {
119 res.connections.emplace_back(item.get<ConnectionInfo>());
120 }
121 return res;
122}
std::vector< ConnectionInfo > connections

◆ retract() [1/3]

void ConfigClient::retract ( )

Retract all connection information that we have published

Definition at line 196 of file ConfigClient.cpp.

197{
198 TLOG_DEBUG(1) << "retract() called, getting connection information";
199 json connections = json::array();
200 {
201 std::lock_guard<std::mutex> lock(m_mutex);
202 for (auto& con : m_registered_connections) {
203 json item;
204 item["connection_id"] = con.uid;
205 item["data_type"] = con.data_type;
206 connections.push_back(item);
207 }
209 }
210 if (connections.size() > 0) {
211 TLOG_DEBUG(1) << "retract(): Retracting " << connections.size() << " connections";
212 http::request<http::string_body> req{ http::verb::post, "/retract", 11 };
213 req.set(http::field::content_type, "application/json");
214 json body{ { "partition", m_session } };
215 body["connections"] = connections;
216 req.body() = body.dump();
217 req.prepare_payload();
218
219 boost::beast::tcp_stream stream(m_ioContext);
220 beast::error_code ec;
221 try {
222 stream.connect(m_addr);
223 http::write(stream, req);
224 http::response<http::string_body> response;
225 boost::beast::flat_buffer buffer;
226 http::read(stream, buffer, response);
227 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
228 if (response.result_int() != 200) {
229 throw(FailedRetract(ERS_HERE, "connection Id vector", std::string(response.reason())));
230 }
231 } catch (ers::Issue const&) {
232 m_connected = false;
233 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
234 throw;
235 } catch (std::exception const& ex) {
236 m_connected = false;
237 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
238 ers::error(FailedRetract(ERS_HERE, "connection Id vector", ex.what()));
239 }
240 m_connected = true;
241 }
242}

◆ retract() [2/3]

void ConfigClient::retract ( const ConnectionId & connectionId)

Retract a single published connection

Definition at line 245 of file ConfigClient.cpp.

246{
247 retract(std::vector<ConnectionId>{ connectionId });
248}

◆ retract() [3/3]

void ConfigClient::retract ( const std::vector< ConnectionId > & connectionIds)

Retract multiple published connections

Parameters
connectionId: A vector of previously published connection Ids to be retracted

Definition at line 251 of file ConfigClient.cpp.

252{
253 http::request<http::string_body> req{ http::verb::post, "/retract", 11 };
254 req.set(http::field::content_type, "application/json");
255
256 json connections = json::array();
257 {
258 std::lock_guard<std::mutex> lock(m_mutex);
259 for (auto& con : connectionIds) {
260 auto reg_it = m_registered_connections.begin();
261 for (; reg_it != m_registered_connections.end(); ++reg_it) {
262 if (con.uid == reg_it->uid && con.data_type == reg_it->data_type)
263 break;
264 }
265 if (reg_it == m_registered_connections.end()) {
267 FailedRetract(ERS_HERE, con.uid + " of type " + con.data_type, "not in registered connections list"));
268 } else {
269 json item;
270 item["connection_id"] = con.uid;
271 item["data_type"] = con.data_type;
272 connections.push_back(item);
273 m_registered_connections.erase(reg_it);
274 }
275 }
276 }
277 json body{ { "partition", m_session } };
278 body["connections"] = connections;
279 req.body() = body.dump();
280 req.prepare_payload();
281
282 boost::beast::tcp_stream stream(m_ioContext);
283 beast::error_code ec;
284 try {
285 stream.connect(m_addr);
286 http::write(stream, req);
287 http::response<http::string_body> response;
288 boost::beast::flat_buffer buffer;
289 http::read(stream, buffer, response);
290 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
291 if (response.result_int() != 200) {
292 throw(FailedRetract(ERS_HERE, "connection Id vector", std::string(response.reason())));
293 }
294 } catch (ers::Issue const&) {
295 m_connected = false;
296 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
297 throw;
298 } catch (std::exception const& ex) {
299 m_connected = false;
300 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
301 ers::error(FailedRetract(ERS_HERE, "connection Id vector", ex.what()));
302 }
303 m_connected = true;
304}

Member Data Documentation

◆ m_active

bool dunedaq::iomanager::ConfigClient::m_active
private

Definition at line 108 of file ConfigClient.hpp.

◆ m_addr

net::ip::basic_resolver<net::ip::tcp>::results_type dunedaq::iomanager::ConfigClient::m_addr
private

Definition at line 103 of file ConfigClient.hpp.

◆ m_connected

std::atomic<bool> dunedaq::iomanager::ConfigClient::m_connected { false }
private

Definition at line 109 of file ConfigClient.hpp.

109{ false };

◆ m_ioContext

net::io_context dunedaq::iomanager::ConfigClient::m_ioContext
private

Definition at line 102 of file ConfigClient.hpp.

◆ m_mutex

std::mutex dunedaq::iomanager::ConfigClient::m_mutex
private

Definition at line 105 of file ConfigClient.hpp.

◆ m_registered_connections

std::set<ConnectionRegistration> dunedaq::iomanager::ConfigClient::m_registered_connections
private

Definition at line 106 of file ConfigClient.hpp.

◆ m_session

std::string dunedaq::iomanager::ConfigClient::m_session
private

Definition at line 101 of file ConfigClient.hpp.

◆ m_thread

std::thread dunedaq::iomanager::ConfigClient::m_thread
private

Definition at line 107 of file ConfigClient.hpp.


The documentation for this class was generated from the following files: