91 std::string target =
"/getconnection/" +
session;
92 http::request<http::string_body> req{ http::verb::post, target,
HTTP_V1_1 };
93 req.set(http::field::content_type,
"application/json");
94 nlohmann::json jquery = query;
95 req.body() = jquery.dump();
96 req.prepare_payload();
98 http::response<http::string_body> response;
100 beast::error_code ec;
103 http::write(stream, req);
105 boost::beast::flat_buffer buffer;
106 http::read(stream, buffer, response);
108 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
111 if (response.result_int() != 200) {
112 throw(FailedLookup(
ERS_HERE, query.
uid_regex, target, std::string(response.reason())));
116 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
118 }
catch (std::exception
const& ex) {
120 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
125 json result = json::parse(response.body());
128 for (
auto const& item : result) {
163 json content{ {
"partition",
m_session } };
164 json connections = json::array();
166 std::lock_guard<std::mutex> lock(
m_mutex);
169 connections.push_back(item);
171 if (connections.size() == 0) {
175 content[
"connections"] = connections;
176 http::request<http::string_body> req{ http::verb::post,
"/publish",
HTTP_V1_1 };
177 req.set(http::field::content_type,
"application/json");
178 req.body() = content.dump();
179 req.prepare_payload();
182 beast::error_code ec;
185 http::write(stream, req);
187 http::response<http::string_body> response;
188 boost::beast::flat_buffer buffer;
189 http::read(stream, buffer, response);
190 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
191 if (response.result_int() != 200) {
192 throw(FailedPublish(
ERS_HERE, std::string(response.reason())));
196 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
198 }
catch (std::exception
const& ex) {
200 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
201 throw(FailedPublish(
ERS_HERE, ex.what(), ex));
210 json connections = json::array();
212 std::lock_guard<std::mutex> lock(
m_mutex);
215 item[
"connection_id"] = con.uid;
216 item[
"data_type"] = con.data_type;
217 connections.push_back(item);
221 if (connections.size() > 0) {
223 http::request<http::string_body> req{ http::verb::post,
"/retract",
HTTP_V1_1 };
224 req.set(http::field::content_type,
"application/json");
226 body[
"connections"] = connections;
227 req.body() = body.dump();
228 req.prepare_payload();
231 beast::error_code ec;
234 http::write(stream, req);
235 http::response<http::string_body> response;
236 boost::beast::flat_buffer buffer;
237 http::read(stream, buffer, response);
238 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
239 if (response.result_int() != 200) {
240 throw(FailedRetract(
ERS_HERE,
"connection Id vector", std::string(response.reason())));
244 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
246 }
catch (std::exception
const& ex) {
248 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
264 http::request<http::string_body> req{ http::verb::post,
"/retract",
HTTP_V1_1 };
265 req.set(http::field::content_type,
"application/json");
267 json connections = json::array();
269 std::lock_guard<std::mutex> lock(
m_mutex);
270 for (
auto& con : connectionIds) {
273 if (con.uid == reg_it->uid && con.data_type == reg_it->data_type)
278 FailedRetract(
ERS_HERE, con.uid +
" of type " + con.data_type,
"not in registered connections list"));
281 item[
"connection_id"] = con.uid;
282 item[
"data_type"] = con.data_type;
283 connections.push_back(item);
289 body[
"connections"] = connections;
290 req.body() = body.dump();
291 req.prepare_payload();
294 beast::error_code ec;
297 http::write(stream, req);
298 http::response<http::string_body> response;
299 boost::beast::flat_buffer buffer;
300 http::read(stream, buffer, response);
301 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
302 if (response.result_int() != 200) {
303 throw(FailedRetract(
ERS_HERE,
"connection Id vector", std::string(response.reason())));
307 stream.socket().shutdown(tcp::socket::shutdown_both, ec);
309 }
catch (std::exception
const& ex) {
311 stream.socket().shutdown(tcp::socket::shutdown_both, ec);