Line data Source code
1 : /**
2 : * @file RestCommandFacility.cpp
3 : *
4 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 : #include "restcmd/RestEndpoint.hpp"
9 :
10 : #include "cmdlib/CommandFacility.hpp"
11 : #include "cmdlib/Issues.hpp"
12 :
13 : #include "confmodel/ConnectivityService.hpp"
14 : #include "iomanager/network/ConfigClient.hpp"
15 : #include "iomanager/network/ConfigClientStructs.hpp"
16 : #include "utilities/get_ips.hpp"
17 :
18 : #include <cetlib/BasicPluginFactory.h>
19 : #include "logging/Logging.hpp"
20 : // #include <tbb/concurrent_queue.h>
21 : #include <folly/Uri.h>
22 :
23 : #include <chrono>
24 : #include <fstream>
25 : #include <functional>
26 : #include <limits.h>
27 : #include <map>
28 : #include <memory>
29 : #include <string>
30 : #include <unistd.h>
31 :
32 : using namespace dunedaq::cmdlib;
33 : using namespace dunedaq::restcmd;
34 :
35 : class restCommandFacility : public CommandFacility
36 : {
37 :
38 : public:
39 : // friend backend for calling functions on CF
40 : friend class RestEndpoint;
41 :
42 0 : explicit restCommandFacility(std::string uri,
43 : std::string session_name,
44 : const dunedaq::confmodel::ConnectivityService* connectivity_service)
45 0 : : CommandFacility(uri)
46 0 : , m_session_name(session_name)
47 : {
48 :
49 :
50 0 : folly::Uri furi(uri);
51 :
52 0 : std::string hostname = furi.hostname();
53 0 : int port = furi.port();
54 :
55 0 : if (connectivity_service != nullptr) {
56 0 : auto connectivity_service_port = std::to_string(connectivity_service->get_service()->get_port());
57 0 : m_connectivity_client = std::make_unique<dunedaq::iomanager::ConfigClient>(
58 0 : connectivity_service->get_host(),
59 : connectivity_service_port,
60 : m_session_name,
61 0 : std::chrono::milliseconds(connectivity_service->get_interval_ms()));
62 0 : }
63 :
64 0 : if (port == 0 && connectivity_service == nullptr) {
65 0 : throw dunedaq::cmdlib::MalformedUri(
66 0 : ERS_HERE, "Can't bind the REST API to port 0 without connectivity service", std::to_string(port));
67 : }
68 :
69 0 : try { // to setup backend
70 0 : command_executor_ = std::bind(&inherited::execute_command, this, std::placeholders::_1, std::placeholders::_2);
71 0 : rest_endpoint_ = std::make_unique<dunedaq::restcmd::RestEndpoint>(hostname, port, command_executor_);
72 0 : rest_endpoint_->init(1); // 1 thread
73 0 : TLOG() << std::format("Endpoint open on host: {} port: {}", hostname, port);
74 :
75 0 : } catch (const std::exception& ex) {
76 0 : ers::error(dunedaq::cmdlib::CommandFacilityInitialization(ERS_HERE, ex.what()));
77 0 : }
78 :
79 : // Store hostname for connectivity service registration
80 0 : m_hostname = hostname;
81 0 : }
82 :
83 0 : void run(std::atomic<bool>& end_marker)
84 : {
85 :
86 0 : char* app_name_c = std::getenv("DUNEDAQ_APPLICATION_NAME");
87 0 : if (!app_name_c)
88 0 : throw dunedaq::restcmd::EnvVarNotFound(ERS_HERE, "DUNEDAQ_APPLICATION_NAME");
89 0 : std::string app_name = std::string(app_name_c);
90 :
91 : // Start endpoint
92 0 : try {
93 0 : rest_endpoint_->start();
94 :
95 0 : if (m_connectivity_client) {
96 0 : int port = rest_endpoint_->getPort();
97 :
98 0 : auto ips = dunedaq::utilities::get_hostname_ips(m_hostname);
99 :
100 0 : if (ips.size() == 0)
101 0 : throw dunedaq::cmdlib::CommandFacilityInitialization(ERS_HERE, "Could not resolve hostname to IP address");
102 :
103 0 : TLOG() << "Registering the control endpoint (" << app_name
104 0 : << "_control) on the connectivity service: " << ips[0] << ":" << port;
105 0 : dunedaq::iomanager::ConnectionRegistration cr;
106 0 : cr.uid = app_name + "_control";
107 0 : cr.data_type = "RunControlMessage";
108 0 : cr.uri = "rest://" + ips[0] + ":" + std::to_string(port);
109 0 : cr.connection_type = dunedaq::iomanager::ConnectionType::kSendRecv;
110 0 : m_connectivity_client->publish(cr);
111 0 : }
112 0 : } catch (const std::exception& ex) {
113 0 : ers::fatal(dunedaq::cmdlib::RunLoopTerminated(ERS_HERE, ex.what()));
114 0 : }
115 :
116 : // Wait until marked
117 0 : while (end_marker) {
118 0 : std::this_thread::sleep_for(std::chrono::seconds(1));
119 : }
120 :
121 : // Shutdown
122 0 : rest_endpoint_->shutdown();
123 0 : if (m_connectivity_client) {
124 0 : dunedaq::iomanager::ConnectionId ci{ app_name + "_control", "RunControlMessage", m_session_name };
125 0 : m_connectivity_client->retract(ci);
126 0 : }
127 0 : }
128 :
129 : protected:
130 : typedef CommandFacility inherited;
131 :
132 : // Implementation of completionHandler interface
133 0 : void completion_callback(const cmdobj_t& cmd, cmd::CommandReply& meta)
134 : {
135 0 : rest_endpoint_->handleResponseCommand(cmd, meta);
136 0 : }
137 :
138 : private:
139 : // Manager, HTTP REST Endpoint and backend resources
140 : mutable std::unique_ptr<RestEndpoint> rest_endpoint_;
141 :
142 : typedef std::function<void(const cmdobj_t&, cmd::CommandReply)> RequestCallback;
143 : RequestCallback command_executor_;
144 :
145 : std::string m_session_name;
146 : std::string m_hostname;
147 : std::unique_ptr<dunedaq::iomanager::ConfigClient> m_connectivity_client;
148 : };
149 :
150 : extern "C"
151 : {
152 0 : std::shared_ptr<dunedaq::cmdlib::CommandFacility> make(std::string uri,
153 : std::string session_name, const dunedaq::confmodel::ConnectivityService* connectivity_service)
154 : {
155 :
156 0 : return std::shared_ptr<dunedaq::cmdlib::CommandFacility>(
157 0 : new restCommandFacility(uri, session_name, connectivity_service));
158 : }
159 : }
|