// Fragment of a connection lookup (original lines 81-118; the enclosing function
// header, the `try {` and several intermediate lines are not part of this excerpt).
// It POSTs the JSON form of `query` to "/getconnection/<session>" and parses the reply.
// The request target is the fixed "/getconnection/" prefix followed by `session`.
81 std::string target =
"/getconnection/" +
session;
// POST request; the trailing 11 is Beast's encoding of HTTP version 1.1.
82 http::request<http::string_body> req{ http::verb::post, target, 11 };
83 req.set(http::field::content_type,
"application/json");
// Convert `query` to JSON and use its serialized text as the request body.
84 nlohmann::json jquery = query;
85 req.body() = jquery.dump();
// Let Beast fill in the payload-related headers for the body set above.
86 req.prepare_payload();
88 http::response<http::string_body> response;
// Send the request, then read the complete response through a scratch buffer.
// `stream` is set up on lines that are not visible here.
93 http::write(stream, req);
95 boost::beast::flat_buffer buffer;
96 http::read(stream, buffer, response);
// Error-code overload: a failed shutdown is recorded in `ec` (declared on a line
// not shown) instead of being thrown.
98 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
99 TLOG_DEBUG(25) <<
"get " << target <<
" response: " << response;
// Any status other than 200 is reported as FailedLookup carrying the query's
// uid_regex, the target and the server's reason phrase.
101 if (response.result_int() != 200) {
102 throw(FailedLookup(
ERS_HERE, query.
uid_regex, target, std::string(response.reason())));
// NOTE(review): the socket was already shut down at original line 98; the lines
// between 102 and 106 are not visible, so confirm whether this second shutdown is
// reachable on the same path and still needed.
106 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
108 }
catch (std::exception
const& ex) {
// Handler for std::exception: shut the socket down; the remainder of the handler
// (original lines 111-114) is not visible in this excerpt.
110 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
// Parse the response body as JSON and walk its elements.
115 json result = json::parse(response.body());
// NOTE(review): `auto item` copies each JSON element per iteration; the loop body
// is not visible, so check whether a const reference would suffice.
118 for (
auto item : result) {
// Fragment of the publish path (original lines 152-190; the function header, the
// `try {` and the loop that produces `item` are not part of this excerpt).
// It POSTs a JSON document with "partition" and "connections" keys to "/publish".
// Start the payload with the "partition" key set from m_session.
152 json content{ {
"partition",
m_session } };
153 json connections = json::array();
// m_mutex is held while the connections array is filled; which member data it
// guards is on lines not shown here.
155 std::lock_guard<std::mutex> lock(
m_mutex);
158 connections.push_back(item);
// Empty-array case; the body of this branch (original lines 161-163) is not
// visible — presumably an early exit, TODO confirm.
160 if (connections.size() == 0) {
164 content[
"connections"] = connections;
// POST request to "/publish"; 11 is Beast's encoding of HTTP version 1.1.
165 http::request<http::string_body> req{ http::verb::post,
"/publish", 11 };
166 req.set(http::field::content_type,
"application/json");
167 req.body() = content.dump();
// Let Beast fill in the payload-related headers for the body set above.
168 req.prepare_payload();
// Receives the result of the socket shutdown calls below.
171 beast::error_code ec;
// Send the request and read the full response; `stream` is set up on lines that
// are not visible here.
174 http::write(stream, req);
176 http::response<http::string_body> response;
177 boost::beast::flat_buffer buffer;
178 http::read(stream, buffer, response);
179 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
// Any status other than 200 is reported as FailedPublish with the reason phrase.
180 if (response.result_int() != 200) {
181 throw(FailedPublish(
ERS_HERE, std::string(response.reason())));
// NOTE(review): the socket was already shut down at original line 179; the lines
// between 181 and 185 are not visible, so confirm this second shutdown is intended.
185 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
187 }
catch (std::exception
const& ex) {
// Handler for std::exception: shut the socket down, then raise FailedPublish built
// from ex.what() with `ex` passed along as the final argument.
189 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
190 throw(FailedPublish(
ERS_HERE, ex.what(), ex));
// Fragment of the argument-less retract path (original lines 198-237; the function
// header, the loop header and the declarations of `item`, `con` and `body` are not
// part of this excerpt). It POSTs the collected connections to "/retract".
198 TLOG_DEBUG(1) <<
"retract() called, getting connection information";
199 json connections = json::array();
// m_mutex is held while the array is filled; the container being iterated is on
// lines not shown here.
201 std::lock_guard<std::mutex> lock(
m_mutex);
// Each entry carries the connection's uid and data_type.
204 item[
"connection_id"] = con.uid;
205 item[
"data_type"] = con.data_type;
206 connections.push_back(item);
// The request is only issued when at least one connection was collected.
210 if (connections.size() > 0) {
211 TLOG_DEBUG(1) <<
"retract(): Retracting " << connections.size() <<
" connections";
// POST request to "/retract"; 11 is Beast's encoding of HTTP version 1.1.
212 http::request<http::string_body> req{ http::verb::post,
"/retract", 11 };
213 req.set(http::field::content_type,
"application/json");
// `body` is declared on original line 214 (not visible); presumably a json object
// that also carries other keys — TODO confirm.
215 body[
"connections"] = connections;
216 req.body() = body.dump();
// Let Beast fill in the payload-related headers for the body set above.
217 req.prepare_payload();
// Receives the result of the socket shutdown calls below.
220 beast::error_code ec;
// Send the request and read the full response; `stream` is set up on lines that
// are not visible here.
223 http::write(stream, req);
224 http::response<http::string_body> response;
225 boost::beast::flat_buffer buffer;
226 http::read(stream, buffer, response);
227 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
// Any status other than 200 is reported as FailedRetract with a fixed description
// and the server's reason phrase.
228 if (response.result_int() != 200) {
229 throw(FailedRetract(
ERS_HERE,
"connection Id vector", std::string(response.reason())));
// NOTE(review): the socket was already shut down at original line 227; the lines
// between 229 and 233 are not visible, so confirm this second shutdown is intended.
233 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
235 }
catch (std::exception
const& ex) {
// Handler for std::exception: shut the socket down; the rest of the handler is not
// visible in this excerpt.
237 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
// Fragment of the retract path that takes an explicit `connectionIds` collection
// (original lines 253-300; the function header and several intermediate lines are
// not part of this excerpt). It POSTs the matching connections to "/retract".
// POST request to "/retract"; 11 is Beast's encoding of HTTP version 1.1.
253 http::request<http::string_body> req{ http::verb::post,
"/retract", 11 };
254 req.set(http::field::content_type,
"application/json");
256 json connections = json::array();
// m_mutex is held while the requested ids are compared with the registered ones.
258 std::lock_guard<std::mutex> lock(
m_mutex);
259 for (
auto& con : connectionIds) {
// A requested id matches a registered entry when both uid and data_type are equal;
// `reg_it` is declared on a line that is not visible here.
262 if (con.uid == reg_it->uid && con.data_type == reg_it->data_type)
// NOTE(review): the line that opens this expression (original 266) is not visible,
// so whether this FailedRetract is thrown or only reported cannot be told from here.
267 FailedRetract(
ERS_HERE, con.uid +
" of type " + con.data_type,
"not in registered connections list"));
// Each entry carries the connection's uid and data_type.
270 item[
"connection_id"] = con.uid;
271 item[
"data_type"] = con.data_type;
272 connections.push_back(item);
// `body` is declared on a line that is not visible; presumably a json object that
// also carries other keys — TODO confirm.
278 body[
"connections"] = connections;
279 req.body() = body.dump();
// Let Beast fill in the payload-related headers for the body set above.
280 req.prepare_payload();
// Receives the result of the socket shutdown calls below.
283 beast::error_code ec;
// Send the request and read the full response; `stream` is set up on lines that
// are not visible here.
286 http::write(stream, req);
287 http::response<http::string_body> response;
288 boost::beast::flat_buffer buffer;
289 http::read(stream, buffer, response);
290 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
// Any status other than 200 is reported as FailedRetract with a fixed description
// and the server's reason phrase.
291 if (response.result_int() != 200) {
292 throw(FailedRetract(
ERS_HERE,
"connection Id vector", std::string(response.reason())));
// NOTE(review): the socket was already shut down at original line 290; the lines
// between 292 and 296 are not visible, so confirm this second shutdown is intended.
296 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
298 }
catch (std::exception
const& ex) {
// Handler for std::exception: shut the socket down; the rest of the handler is not
// visible in this excerpt.
300 stream.socket().shutdown(tcp::socket::shutdown_both, ec);