LCOV - code coverage report
Current view: top level - iomanager/src/network - ConfigClient.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 224 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 20 0

            Line data    Source code
       1              : /**
       2              :  * @file ConfigClient.cpp
       3              :  *
       4              :  * This is part of the DUNE DAQ Application Framework, copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
       9              : #include "iomanager/network/ConfigClient.hpp"
      10              : #include "iomanager/network/NetworkIssues.hpp"
      11              : 
      12              : #include "logging/Logging.hpp"
      13              : 
      14              : #include <boost/beast/http.hpp>
      15              : 
      16              : #include <chrono>
      17              : #include <cstdlib>
      18              : #include <iostream>
      19              : #include <string>
      20              : #include <vector>
      21              : 
      22              : using tcp = net::ip::tcp;     // from <boost/asio/ip/tcp.hpp>
      23              : namespace http = beast::http; // from <boost/beast/http.hpp>
      24              : using nlohmann::json;
      25              : 
      26              : using namespace dunedaq::iomanager;
      27              : 
      28              : static constexpr int HTTP_V1_1 = 11;
      29              : 
      30              : enum
      31              : {
      32              :   TLVL_PUBLISH = 20,
      33              :   TLVL_RETRACT = 25,
      34              :   TLVL_RESOLVE = 30
      35              : };
      36              : 
      37            0 : ConfigClient::ConfigClient(const std::string& server,
      38              :                            const std::string& port,
      39              :                            const std::string& session_name,
      40            0 :                            std::chrono::milliseconds publish_interval)
      41              : {
      42            0 :   m_session = session_name;
      43              : 
      44            0 :   tcp::resolver resolver(m_io_context);
      45            0 :   m_addr = resolver.resolve(server, port);
      46            0 :   m_active = true;
      47            0 :   m_thread = std::thread([this, publish_interval]() {
      48            0 :     while (m_active) {
      49            0 :       try {
      50            0 :         publish();
      51            0 :         m_connected = true;
      52            0 :         TLOG_DEBUG(TLVL_PUBLISH) << "Automatic publish complete";
      53            0 :       } catch (ers::Issue& ex) {
      54            0 :         if (m_connected)
      55            0 :           ers::error(ex);
      56              :         else
      57            0 :           TLOG() << ex;
      58            0 :       } catch (std::exception& ex) {
      59            0 :         auto ers_ex = PublishException(ERS_HERE, ex.what());
      60            0 :         if (m_connected)
      61            0 :           ers::error(ers_ex);
      62              :         else
      63            0 :           TLOG() << ers_ex;
      64            0 :       }
      65            0 :       std::this_thread::sleep_for(publish_interval);
      66              :     }
      67            0 :     retract();
      68            0 :     if (!m_connected) {
      69            0 :       ers::error(PublishException(ERS_HERE, "Publish thread was unable to publish to Connectivity Service!"));
      70              :     }
      71            0 :   });
      72            0 : }
      73              : 
      74            0 : ConfigClient::~ConfigClient()
      75              : {
      76            0 :   m_active = false;
      77            0 :   if (m_thread.joinable()) {
      78            0 :     m_thread.join();
      79              :   }
      80            0 :   m_connected = false;
      81            0 :   retract();
      82            0 : }
      83              : 
      84              : ConnectionResponse
      85            0 : ConfigClient::resolve_connection(const ConnectionRequest& query, std::string session)
      86              : {
      87            0 :   if (session == "") {
      88            0 :     session = m_session;
      89              :   }
      90            0 :   TLOG_DEBUG(TLVL_RESOLVE) << "Getting connections matching <" << query.uid_regex << "> in session " << session;
      91            0 :   std::string target = "/getconnection/" + session;
      92            0 :   http::request<http::string_body> req{ http::verb::post, target, HTTP_V1_1 };
      93            0 :   req.set(http::field::content_type, "application/json");
      94            0 :   nlohmann::json jquery = query;
      95            0 :   req.body() = jquery.dump();
      96            0 :   req.prepare_payload();
      97              : 
      98            0 :   http::response<http::string_body> response;
      99            0 :   boost::beast::tcp_stream stream(m_io_context);
     100            0 :   beast::error_code ec;
     101            0 :   try {
     102            0 :     stream.connect(m_addr);
     103            0 :     http::write(stream, req);
     104              : 
     105            0 :     boost::beast::flat_buffer buffer;
     106            0 :     http::read(stream, buffer, response);
     107              : 
     108            0 :     stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
     109            0 :     TLOG_DEBUG(TLVL_RESOLVE) << "get " << target << " response: " << response;
     110              : 
     111            0 :     if (response.result_int() != 200) {
     112            0 :       throw(FailedLookup(ERS_HERE, query.uid_regex, target, std::string(response.reason())));
     113              :     }
     114            0 :   } catch (ers::Issue const&) {
     115            0 :     m_connected = false;
     116            0 :     stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
     117            0 :     throw;
     118            0 :   } catch (std::exception const& ex) {
     119            0 :     m_connected = false;
     120            0 :     stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
     121            0 :     ers::error(FailedLookup(ERS_HERE, query.uid_regex, target, ex.what()));
     122            0 :     return ConnectionResponse();
     123            0 :   }
     124            0 :   m_connected = true;
     125            0 :   json result = json::parse(response.body());
     126            0 :   TLOG_DEBUG(TLVL_RESOLVE) << result.dump();
     127            0 :   ConnectionResponse res;
     128            0 :   for (auto const& item : result) {
     129            0 :     res.connections.emplace_back(item.get<ConnectionInfo>());
     130              :   }
     131            0 :   return res;
     132            0 : }
     133              : 
     134              : void
     135            0 : ConfigClient::publish(ConnectionRegistration const& connection)
     136              : {
     137            0 :   {
     138            0 :     std::lock_guard<std::mutex> lock(m_mutex);
     139            0 :     TLOG_DEBUG(TLVL_PUBLISH) << "Adding connection with UID " << connection.uid << " and URI " << connection.uri
     140            0 :                              << " to publish list";
     141              : 
     142            0 :     m_registered_connections.insert(connection);
     143            0 :   }
     144            0 : }
     145              : 
     146              : void
     147            0 : ConfigClient::publish(const std::vector<ConnectionRegistration>& connections)
     148              : {
     149            0 :   {
     150            0 :     std::lock_guard<std::mutex> lock(m_mutex);
     151            0 :     for (auto& entry : connections) {
     152            0 :       TLOG_DEBUG(TLVL_PUBLISH) << "Adding connection with UID " << entry.uid << " and URI " << entry.uri
     153            0 :                                << " to publish list";
     154              : 
     155            0 :       m_registered_connections.insert(entry);
     156              :     }
     157            0 :   }
     158            0 : }
     159              : 
     160              : void
     161            0 : ConfigClient::publish()
     162              : {
     163            0 :   json content{ { "partition", m_session } };
     164            0 :   json connections = json::array();
     165            0 :   {
     166            0 :     std::lock_guard<std::mutex> lock(m_mutex);
     167            0 :     for (auto& entry : m_registered_connections) {
     168            0 :       json item = entry;
     169            0 :       connections.push_back(item);
     170            0 :     }
     171            0 :     if (connections.size() == 0) {
     172            0 :       return;
     173              :     }
     174            0 :   }
     175            0 :   content["connections"] = connections;
     176            0 :   http::request<http::string_body> req{ http::verb::post, "/publish", HTTP_V1_1 };
     177            0 :   req.set(http::field::content_type, "application/json");
     178            0 :   req.body() = content.dump();
     179            0 :   req.prepare_payload();
     180              : 
     181            0 :   boost::beast::tcp_stream stream(m_io_context);
     182            0 :   beast::error_code ec;
     183            0 :   try {
     184            0 :     stream.connect(m_addr);
     185            0 :     http::write(stream, req);
     186              : 
     187            0 :     http::response<http::string_body> response;
     188            0 :     boost::beast::flat_buffer buffer;
     189            0 :     http::read(stream, buffer, response);
     190            0 :     stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
     191            0 :     if (response.result_int() != 200) {
     192            0 :       throw(FailedPublish(ERS_HERE, std::string(response.reason())));
     193              :     }
     194            0 :   } catch (ers::Issue const&) {
     195            0 :     m_connected = false;
     196            0 :     stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
     197            0 :     throw;
     198            0 :   } catch (std::exception const& ex) {
     199            0 :     m_connected = false;
     200            0 :     stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
     201            0 :     throw(FailedPublish(ERS_HERE, ex.what(), ex));
     202            0 :   }
     203            0 :   m_connected = true;
     204            0 : }
     205              : 
     206              : void
     207            0 : ConfigClient::retract()
     208              : {
     209            0 :   TLOG_DEBUG(TLVL_RETRACT) << "retract() called, getting connection information";
     210            0 :   json connections = json::array();
     211            0 :   {
     212            0 :     std::lock_guard<std::mutex> lock(m_mutex);
     213            0 :     for (auto& con : m_registered_connections) {
     214            0 :       json item;
     215            0 :       item["connection_id"] = con.uid;
     216            0 :       item["data_type"] = con.data_type;
     217            0 :       connections.push_back(item);
     218            0 :     }
     219            0 :     m_registered_connections.clear();
     220            0 :   }
     221            0 :   if (connections.size() > 0) {
     222            0 :     TLOG_DEBUG(TLVL_RETRACT) << "retract(): Retracting " << connections.size() << " connections";
     223            0 :     http::request<http::string_body> req{ http::verb::post, "/retract", HTTP_V1_1 };
     224            0 :     req.set(http::field::content_type, "application/json");
     225            0 :     json body{ { "partition", m_session } };
     226            0 :     body["connections"] = connections;
     227            0 :     req.body() = body.dump();
     228            0 :     req.prepare_payload();
     229              : 
     230            0 :     boost::beast::tcp_stream stream(m_io_context);
     231            0 :     beast::error_code ec;
     232            0 :     try {
     233            0 :       stream.connect(m_addr);
     234            0 :       http::write(stream, req);
     235            0 :       http::response<http::string_body> response;
     236            0 :       boost::beast::flat_buffer buffer;
     237            0 :       http::read(stream, buffer, response);
     238            0 :       stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
     239            0 :       if (response.result_int() != 200) {
     240            0 :         throw(FailedRetract(ERS_HERE, "connection Id vector", std::string(response.reason())));
     241              :       }
     242            0 :     } catch (ers::Issue const&) {
     243            0 :       m_connected = false;
     244            0 :       stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
     245            0 :       throw;
     246            0 :     } catch (std::exception const& ex) {
     247            0 :       m_connected = false;
     248            0 :       stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
     249            0 :       ers::error(FailedRetract(ERS_HERE, "connection Id vector", ex.what()));
     250            0 :     }
     251            0 :     m_connected = true;
     252            0 :   }
     253            0 : }
     254              : 
     255              : void
     256            0 : ConfigClient::retract(const ConnectionId& connectionId)
     257              : {
     258            0 :   retract(std::vector<ConnectionId>{ connectionId });
     259            0 : }
     260              : 
     261              : void
     262            0 : ConfigClient::retract(const std::vector<ConnectionId>& connectionIds)
     263              : {
     264            0 :   http::request<http::string_body> req{ http::verb::post, "/retract", HTTP_V1_1 };
     265            0 :   req.set(http::field::content_type, "application/json");
     266              : 
     267            0 :   json connections = json::array();
     268            0 :   {
     269            0 :     std::lock_guard<std::mutex> lock(m_mutex);
     270            0 :     for (auto& con : connectionIds) {
     271            0 :       auto reg_it = m_registered_connections.begin();
     272            0 :       for (; reg_it != m_registered_connections.end(); ++reg_it) {
     273            0 :         if (con.uid == reg_it->uid && con.data_type == reg_it->data_type)
     274              :           break;
     275              :       }
     276            0 :       if (reg_it == m_registered_connections.end()) {
     277            0 :         ers::error(
     278            0 :           FailedRetract(ERS_HERE, con.uid + " of type " + con.data_type, "not in registered connections list"));
     279              :       } else {
     280            0 :         json item;
     281            0 :         item["connection_id"] = con.uid;
     282            0 :         item["data_type"] = con.data_type;
     283            0 :         connections.push_back(item);
     284            0 :         m_registered_connections.erase(reg_it);
     285            0 :       }
     286              :     }
     287            0 :   }
     288            0 :   json body{ { "partition", m_session } };
     289            0 :   body["connections"] = connections;
     290            0 :   req.body() = body.dump();
     291            0 :   req.prepare_payload();
     292              : 
     293            0 :   boost::beast::tcp_stream stream(m_io_context);
     294            0 :   beast::error_code ec;
     295            0 :   try {
     296            0 :     stream.connect(m_addr);
     297            0 :     http::write(stream, req);
     298            0 :     http::response<http::string_body> response;
     299            0 :     boost::beast::flat_buffer buffer;
     300            0 :     http::read(stream, buffer, response);
     301            0 :     stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
     302            0 :     if (response.result_int() != 200) {
     303            0 :       throw(FailedRetract(ERS_HERE, "connection Id vector", std::string(response.reason())));
     304              :     }
     305            0 :   } catch (ers::Issue const&) {
     306            0 :     m_connected = false;
     307            0 :     stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
     308            0 :     throw;
     309            0 :   } catch (std::exception const& ex) {
     310            0 :     m_connected = false;
     311            0 :     stream.socket().shutdown(tcp::socket::shutdown_both, ec); // NOLINT
     312            0 :     ers::error(FailedRetract(ERS_HERE, "connection Id vector", ex.what()));
     313            0 :   }
     314            0 :   m_connected = true;
     315            0 : }
        

Generated by: LCOV version 2.0-1