Line data Source code
1 : /**
2 : * @file SocketWriterModule.cpp Boost.Asio-based socket writer plugin for low-bandwidth devices
3 : *
4 : * This is part of the DUNE DAQ , copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "SocketWriterModule.hpp"
10 :
11 : #include "CreateGenericReceiver.hpp"
12 :
13 : #include "appfwk/ConfigurationManager.hpp"
14 : #include "appmodel/SocketDataSender.hpp"
15 : #include "appmodel/NWDetDataSender.hpp"
16 : #include "appmodel/SocketReceiver.hpp"
17 : #include "appmodel/SocketWriterConf.hpp"
18 : #include "confmodel/DetectorStream.hpp"
19 : #include "appmodel/NetworkDetectorToDaqConnection.hpp"
20 : #include "confmodel/QueueWithSourceId.hpp"
21 :
22 : #include "datahandlinglibs/DataHandlingIssues.hpp"
23 : #include "datahandlinglibs/DataMoveCallbackRegistry.hpp"
24 :
25 : #include "asiolibs/opmon/SocketWriterModule.pb.h"
26 :
27 : #include "asiolibs/AsioIssues.hpp"
28 :
29 : #include <string>
30 : #include <memory>
31 : #include <vector>
32 : #include <utility>
33 :
34 : namespace dunedaq::asiolibs {
35 :
36 0 : SocketWriterModule::SocketWriterModule(const std::string& name)
37 : : DAQModule(name)
38 0 : , m_work_guard(boost::asio::make_work_guard(m_io_context))
39 : {
40 0 : register_command("conf", &SocketWriterModule::do_configure);
41 0 : register_command("start", &SocketWriterModule::do_start);
42 0 : register_command("stop_trigger_sources", &SocketWriterModule::do_stop);
43 0 : }
44 :
45 : void
46 0 : SocketWriterModule::init(const std::shared_ptr<appfwk::ConfigurationManager> mcfg)
47 : {
48 0 : m_cfg = mcfg;
49 0 : auto* mdal = m_cfg->get_dal<appmodel::SocketDataWriterModule>(get_name());
50 0 : auto* module_conf = mdal->get_configuration()->cast<appmodel::SocketWriterConf>();
51 :
52 0 : m_callback_conf = mdal->get_raw_data_callback();
53 0 : const auto remote_ip = module_conf->get_remote_ip();
54 :
55 0 : m_socket_type = string_to_socket_type(module_conf->get_socket_type());
56 0 : if (m_socket_type != SocketType::TCP && m_socket_type != SocketType::UDP) {
57 0 : throw std::invalid_argument("Error: Only TCP and UDP are allowed!");
58 : }
59 :
60 0 : for (auto* d2d_conn : mdal->get_connections()) {
61 0 : if (d2d_conn->is_disabled(*(m_cfg->get_session()))) {
62 0 : continue;
63 : }
64 :
65 0 : for (auto* nw_sender : d2d_conn->get_net_senders()) {
66 :
67 0 : if (nw_sender->is_disabled(*(m_cfg->get_session()))) {
68 0 : continue;
69 : }
70 :
71 0 : if (nw_sender->get_streams().size() > 1) {
72 0 : dunedaq::datahandlinglibs::GenericConfigurationError err(ERS_HERE,
73 0 : "Multiple streams currently are not supported!");
74 0 : ers::fatal(err);
75 0 : throw err;
76 0 : }
77 0 : const auto* socket_sender = nw_sender->cast<appmodel::SocketDataSender>();
78 0 : if (socket_sender == nullptr) {
79 0 : throw dunedaq::datahandlinglibs::InitializationError(
80 0 : ERS_HERE,
81 0 : fmt::format("Found {} of type {} in connection {} while expecting type SocketDetDataSender",
82 0 : nw_sender->class_name(),
83 0 : nw_sender->UID(),
84 0 : d2d_conn->UID()));
85 : }
86 0 : m_writer_configs.emplace_back(remote_ip, socket_sender->get_port(), std::make_shared<SocketStats>());
87 : }
88 : }
89 :
90 0 : m_writers.reserve(m_writer_configs.size());
91 0 : if (m_socket_type == SocketType::TCP) {
92 0 : for (std::size_t i = 0; i < m_writer_configs.size(); ++i) {
93 0 : m_writers.emplace_back(TCPWriter());
94 : }
95 : } else {
96 0 : for (std::size_t i = 0; i < m_writer_configs.size(); ++i) {
97 0 : m_writers.emplace_back(UDPWriter());
98 : }
99 : }
100 :
101 : // Raw input connection sensibility check
102 0 : if (m_callback_conf == nullptr) {
103 0 : TLOG() << "No callback configuration given!";
104 : //ers::error(ConfigurationError(ERS_HERE, m_sourceid, No callback configuration given!"));
105 : }
106 0 : }
107 :
108 : SocketWriterModule::SocketType
109 0 : SocketWriterModule::string_to_socket_type(const std::string& socket_type) const
110 : {
111 0 : if (socket_type == "TCP") {
112 : return SocketWriterModule::SocketType::TCP;
113 0 : } else if (socket_type == "UDP") {
114 0 : return SocketWriterModule::SocketType::UDP;
115 : }
116 : return SocketWriterModule::SocketType::INVALID;
117 : }
118 :
119 : void
120 0 : SocketWriterModule::consume_payload(GenericReceiverConcept::TypeErasedPayload payload)
121 : {
122 0 : for (auto& writer : m_writers) {
123 0 : std::visit([this, payload](auto& w) mutable { // lets payload to be moved
124 0 : boost::asio::co_spawn(m_io_context, w.start(std::move(payload)), boost::asio::detached);
125 0 : }, writer);
126 : }
127 0 : }
128 :
129 : void
130 0 : SocketWriterModule::do_configure(const CommandData_t&)
131 : {
132 : // Configure and register consume callback
133 0 : m_consume_callback = std::bind(&SocketWriterModule::consume_payload, this, std::placeholders::_1);
134 :
135 : // Register callback
136 0 : auto dmcbr = datahandlinglibs::DataMoveCallbackRegistry::get();
137 0 : dmcbr->register_callback<GenericReceiverConcept::TypeErasedPayload>(m_callback_conf, m_consume_callback);
138 :
139 0 : for (std::size_t i = 0; i < m_writers.size(); ++i) {
140 0 : const auto& writer_config = m_writer_configs[i];
141 0 : std::visit([this, &writer_config](auto& writer) { writer.configure(m_io_context, writer_config); }, m_writers[i]);
142 : }
143 0 : }
144 :
145 : void
146 0 : SocketWriterModule::do_start(const CommandData_t&)
147 : {
148 0 : for (const auto& writer_config : m_writer_configs) {
149 : // Reset opmon variables
150 0 : writer_config.socket_stats->sum_payloads = 0;
151 0 : writer_config.socket_stats->num_payloads = 0;
152 0 : writer_config.socket_stats->sum_bytes = 0;
153 0 : writer_config.socket_stats->rawq_timeout_count = 0;
154 0 : writer_config.socket_stats->stats_packet_count = 0;
155 : }
156 :
157 0 : m_t0 = std::chrono::steady_clock::now();
158 :
159 0 : m_io_thread = std::jthread([this] { m_io_context.run(); });
160 0 : }
161 :
162 : void
163 0 : SocketWriterModule::do_stop(const CommandData_t&)
164 : {
165 0 : for (auto& writer : m_writers) {
166 0 : std::visit([](auto& writer) { writer.stop(); }, writer);
167 : }
168 :
169 0 : m_work_guard.reset();
170 0 : }
171 :
172 : void
173 0 : SocketWriterModule::generate_opmon_data()
174 : {
175 0 : for (const auto& writer_config : m_writer_configs) {
176 0 : opmon::SocketWriterStats stats;
177 0 : stats.set_sum_payloads(writer_config.socket_stats->sum_payloads.load());
178 0 : stats.set_num_payloads(writer_config.socket_stats->num_payloads.exchange(0));
179 0 : stats.set_sum_bytes(writer_config.socket_stats->sum_bytes.load());
180 0 : stats.set_num_data_input_timeouts(writer_config.socket_stats->rawq_timeout_count.exchange(0));
181 :
182 0 : auto now = std::chrono::steady_clock::now();
183 0 : int new_packets = writer_config.socket_stats->stats_packet_count.exchange(0);
184 0 : double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
185 0 : m_t0 = now;
186 :
187 0 : stats.set_rate_payloads_consumed(new_packets / seconds / 1000.);
188 :
189 0 : publish(std::move(stats), { { "socket-writer", std::to_string(writer_config.remote_port) } });
190 0 : }
191 0 : }
192 :
193 : void
194 0 : SocketWriterModule::TCPWriter::configure(boost::asio::io_context& io_context, const WriterConfig& writer_config)
195 : {
196 0 : m_socket_stats = writer_config.socket_stats;
197 :
198 0 : while (true) {
199 0 : try {
200 0 : m_socket = std::make_unique<boost::asio::ip::tcp::socket>(io_context);
201 :
202 0 : m_socket->connect(boost::asio::ip::tcp::endpoint(
203 0 : boost::asio::ip::address::from_string(writer_config.remote_ip), writer_config.remote_port));
204 0 : break;
205 0 : } catch (const boost::system::system_error& e) {
206 0 : TLOG() << "Connection failed: " << e.what() << ". Retrying in 1 second...";
207 0 : std::this_thread::sleep_for(std::chrono::seconds(1));
208 0 : }
209 : }
210 :
211 0 : TLOG() << "Established TCP connection to " << writer_config.remote_ip << ":" << writer_config.remote_port;
212 0 : }
213 :
214 : boost::asio::awaitable<void>
215 0 : SocketWriterModule::TCPWriter::start(GenericReceiverConcept::TypeErasedPayload payload) // TODO (DTE): Rename
216 : {
217 : const auto bytes_sent =
218 : co_await boost::asio::async_write(*m_socket, boost::asio::buffer(payload.data, payload.size), boost::asio::use_awaitable);
219 : ++m_socket_stats->num_payloads;
220 : ++m_socket_stats->sum_payloads;
221 : m_socket_stats->sum_bytes.fetch_add(bytes_sent);
222 : ++m_socket_stats->stats_packet_count;
223 0 : }
224 :
225 : void
226 0 : SocketWriterModule::TCPWriter::stop()
227 : {
228 0 : m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both);
229 0 : m_socket->close();
230 0 : TLOG() << "Shutdown TCP connection";
231 0 : }
232 :
233 : void
234 0 : SocketWriterModule::UDPWriter::configure(boost::asio::io_context& io_context, const WriterConfig& writer_config)
235 : {
236 0 : m_writer_config = writer_config;
237 :
238 : // Let the OS pick an available local IP and port for sending packets
239 0 : const boost::asio::ip::udp::endpoint sender_endpoint(boost::asio::ip::udp::v4(), 0);
240 :
241 0 : m_socket = std::make_unique<boost::asio::ip::udp::socket>(io_context, sender_endpoint);
242 :
243 0 : TLOG() << "Created UDP socket on " << m_socket->local_endpoint().address() << ":"
244 0 : << m_socket->local_endpoint().port();
245 0 : }
246 :
247 : boost::asio::awaitable<void>
248 0 : SocketWriterModule::UDPWriter::start(GenericReceiverConcept::TypeErasedPayload payload)
249 : {
250 : boost::asio::ip::udp::endpoint receiver_endpoint(boost::asio::ip::address::from_string(m_writer_config.remote_ip),
251 : m_writer_config.remote_port);
252 : const auto bytes_sent = co_await m_socket->async_send_to(
253 : boost::asio::buffer(payload.data, payload.size), receiver_endpoint, boost::asio::use_awaitable);
254 : ++m_writer_config.socket_stats->num_payloads;
255 : ++m_writer_config.socket_stats->sum_payloads;
256 : m_writer_config.socket_stats->sum_bytes.fetch_add(bytes_sent);
257 : ++m_writer_config.socket_stats->stats_packet_count;
258 0 : }
259 :
260 : void
261 0 : SocketWriterModule::UDPWriter::stop()
262 : {
263 0 : m_socket->close();
264 0 : TLOG() << "Closed UDP socket";
265 0 : }
266 :
267 : } // namespace dunedaq::asiolibs
268 :
269 0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::asiolibs::SocketWriterModule)
|