Line data Source code
1 : /**
2 : * @file SocketWriterModule.cpp Boost.Asio-based socket writer plugin for low-bandwidth devices
3 : *
4 : * This is part of the DUNE DAQ , copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "SocketWriterModule.hpp"
10 :
11 : #include "CreateGenericReceiver.hpp"
12 :
13 : #include "appfwk/ConfigurationManager.hpp"
14 : #include "appmodel/SocketDataSender.hpp"
15 : #include "appmodel/NWDetDataSender.hpp"
16 : #include "appmodel/SocketReceiver.hpp"
17 : #include "appmodel/SocketWriterConf.hpp"
18 : #include "confmodel/DetectorStream.hpp"
19 : #include "appmodel/NetworkDetectorToDaqConnection.hpp"
20 : #include "confmodel/QueueWithSourceId.hpp"
21 :
22 : #include "datahandlinglibs/DataHandlingIssues.hpp"
23 : #include "datahandlinglibs/DataMoveCallbackRegistry.hpp"
24 :
25 : #include "asiolibs/opmon/SocketWriterModule.pb.h"
26 :
27 : #include "asiolibs/AsioIssues.hpp"
28 :
29 : #include <string>
30 : #include <memory>
31 : #include <vector>
32 : #include <utility>
33 :
34 : namespace dunedaq::asiolibs {
35 :
36 0 : SocketWriterModule::SocketWriterModule(const std::string& name)
37 : : DAQModule(name)
38 0 : , m_work_guard(boost::asio::make_work_guard(m_io_context))
39 : {
40 0 : register_command("conf", &SocketWriterModule::do_configure);
41 0 : register_command("start", &SocketWriterModule::do_start);
42 0 : register_command("stop_trigger_sources", &SocketWriterModule::do_stop);
43 0 : }
44 :
45 : void
46 0 : SocketWriterModule::get_dal_inputs(const dunedaq::appmodel::SocketDataWriterModule* mdal)
47 : {
48 0 : if (mdal->get_inputs().empty()) {
49 0 : auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE,
50 0 : "No inputs defined for socket writer in configuration.");
51 0 : ers::fatal(err);
52 0 : throw err;
53 0 : }
54 :
55 0 : for (auto* input : mdal->get_inputs()) {
56 0 : m_raw_data_receiver_connection_name = input->UID();
57 : // Parse for prefix
58 0 : std::string conn_name = input->UID();
59 0 : const char delim = '_';
60 0 : std::vector<std::string> words;
61 0 : std::size_t start;
62 0 : std::size_t end = 0;
63 0 : while ((start = conn_name.find_first_not_of(delim, end)) != std::string::npos) {
64 0 : end = conn_name.find(delim, start);
65 0 : words.push_back(conn_name.substr(start, end - start));
66 : }
67 :
68 0 : TLOG_DEBUG() << "Initialize connection based on uid: "
69 0 : << m_raw_data_receiver_connection_name << " front word: " << words.front();
70 :
71 0 : std::string cb_prefix("cb");
72 0 : if (words.front() == cb_prefix) {
73 0 : m_callback_mode = true;
74 : }
75 :
76 0 : if (!m_callback_mode) {
77 0 : const auto recv_timeout_ms = input->get_recv_timeout_ms();
78 0 : if (recv_timeout_ms == 0) {
79 0 : ers::warning(InvalidRawReceiverTimeout(ERS_HERE, m_raw_receiver_timeout_ms.count()));
80 : } else {
81 0 : m_raw_receiver_timeout_ms = std::chrono::milliseconds(recv_timeout_ms);
82 : }
83 : }
84 :
85 0 : auto* queue = input->cast<confmodel::QueueWithSourceId>();
86 0 : if (queue == nullptr) {
87 0 : auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE, "Inputs are not of type QueueWithGeoId.");
88 0 : ers::fatal(err);
89 0 : throw err;
90 0 : }
91 :
92 0 : m_raw_data_receiver = createGenericReceiver(queue->UID(), m_raw_data_receiver_connection_name); // FIXME (DTE): Overwriting doesn't make sense
93 0 : }
94 0 : }
95 :
96 : void
97 0 : SocketWriterModule::init(const std::shared_ptr<appfwk::ConfigurationManager> mcfg)
98 : {
99 0 : m_cfg = mcfg;
100 0 : auto* mdal = m_cfg->get_dal<appmodel::SocketDataWriterModule>(get_name());
101 0 : auto* module_conf = mdal->get_configuration()->cast<appmodel::SocketWriterConf>();
102 :
103 0 : const auto remote_ip = module_conf->get_remote_ip();
104 :
105 0 : m_socket_type = string_to_socket_type(module_conf->get_socket_type());
106 0 : if (m_socket_type != SocketType::TCP && m_socket_type != SocketType::UDP) {
107 0 : throw std::invalid_argument("Error: Only TCP and UDP are allowed!");
108 : }
109 :
110 0 : for (auto* d2d_conn : mdal->get_connections()) {
111 0 : if (d2d_conn->is_disabled(*(m_cfg->get_session()))) {
112 0 : continue;
113 : }
114 :
115 0 : for (auto* nw_sender : d2d_conn->get_net_senders()) {
116 :
117 0 : if (nw_sender->is_disabled(*(m_cfg->get_session()))) {
118 0 : continue;
119 : }
120 :
121 0 : if (nw_sender->get_streams().size() > 1) {
122 0 : dunedaq::datahandlinglibs::GenericConfigurationError err(ERS_HERE,
123 0 : "Multiple streams currently are not supported!");
124 0 : ers::fatal(err);
125 0 : throw err;
126 0 : }
127 0 : const auto* socket_sender = nw_sender->cast<appmodel::SocketDataSender>();
128 0 : if (socket_sender == nullptr) {
129 0 : throw dunedaq::datahandlinglibs::InitializationError(
130 0 : ERS_HERE,
131 0 : fmt::format("Found {} of type {} in connection {} while expecting type SocketDetDataSender",
132 0 : nw_sender->class_name(),
133 0 : nw_sender->UID(),
134 0 : d2d_conn->UID()));
135 : }
136 0 : m_writer_configs.emplace_back(remote_ip, socket_sender->get_port(), std::make_shared<SocketStats>());
137 : }
138 : }
139 :
140 0 : m_writers.reserve(m_writer_configs.size());
141 0 : if (m_socket_type == SocketType::TCP) {
142 0 : for (std::size_t i = 0; i < m_writer_configs.size(); ++i) {
143 0 : m_writers.emplace_back(TCPWriter());
144 : }
145 : } else {
146 0 : for (std::size_t i = 0; i < m_writer_configs.size(); ++i) {
147 0 : m_writers.emplace_back(UDPWriter());
148 : }
149 : }
150 :
151 0 : get_dal_inputs(mdal);
152 :
153 : // Raw input connection sensibility check
154 0 : if (!m_callback_mode && m_raw_data_receiver == nullptr) {
155 0 : TLOG() << "Non callback mode, and receiver is unset!";
156 : //ers::error(ConfigurationError(ERS_HERE, m_sourceid, "Non callback mode, and receiver is unset!"));
157 : }
158 0 : }
159 :
160 : SocketWriterModule::SocketType
161 0 : SocketWriterModule::string_to_socket_type(const std::string& socket_type) const
162 : {
163 0 : if (socket_type == "TCP") {
164 : return SocketWriterModule::SocketType::TCP;
165 0 : } else if (socket_type == "UDP") {
166 0 : return SocketWriterModule::SocketType::UDP;
167 : }
168 : return SocketWriterModule::SocketType::INVALID;
169 : }
170 :
171 : void
172 0 : SocketWriterModule::run_consume()
173 : {
174 0 : TLOG() << "Consumer thread started..."; // TODO (DTE): Make debug logs
175 :
176 0 : while (m_run_marker.load()) {
177 : // Try to acquire data
178 :
179 0 : if (auto opt_payload = m_raw_data_receiver->try_receive(m_raw_receiver_timeout_ms)) {
180 0 : consume_payload(std::move(*opt_payload));
181 : } else {
182 0 : for (const auto& writer_config : m_writer_configs) {
183 0 : ++writer_config.socket_stats->rawq_timeout_count;
184 : }
185 0 : }
186 : }
187 :
188 0 : TLOG() << "Consumer thread joins... ";
189 0 : }
190 :
191 : void
192 0 : SocketWriterModule::consume_payload(GenericReceiverConcept::TypeErasedPayload payload)
193 : {
194 0 : for (auto& writer : m_writers) {
195 0 : std::visit([this, payload](auto& w) mutable { // lets payload to be moved
196 0 : boost::asio::co_spawn(m_io_context, w.start(std::move(payload)), boost::asio::detached);
197 0 : }, writer);
198 : }
199 0 : }
200 :
201 : void
202 0 : SocketWriterModule::do_configure(const CommandData_t&)
203 : {
204 : // Register callbacks if operating in that mode.
205 0 : if (m_callback_mode) {
206 : // Configure and register consume callback
207 0 : m_consume_callback = std::bind(&SocketWriterModule::consume_payload, this, std::placeholders::_1);
208 :
209 : // Register callback
210 0 : auto dmcbr = datahandlinglibs::DataMoveCallbackRegistry::get();
211 0 : dmcbr->register_callback<GenericReceiverConcept::TypeErasedPayload>(m_raw_data_receiver_connection_name, m_consume_callback);
212 0 : }
213 :
214 0 : for (std::size_t i = 0; i < m_writers.size(); ++i) {
215 0 : const auto& writer_config = m_writer_configs[i];
216 0 : std::visit([this, &writer_config](auto& writer) { writer.configure(m_io_context, writer_config); }, m_writers[i]);
217 : }
218 0 : }
219 :
220 : void
221 0 : SocketWriterModule::do_start(const CommandData_t&)
222 : {
223 0 : for (const auto& writer_config : m_writer_configs) {
224 : // Reset opmon variables
225 0 : writer_config.socket_stats->sum_payloads = 0;
226 0 : writer_config.socket_stats->num_payloads = 0;
227 0 : writer_config.socket_stats->sum_bytes = 0;
228 0 : writer_config.socket_stats->rawq_timeout_count = 0;
229 0 : writer_config.socket_stats->stats_packet_count = 0;
230 : }
231 :
232 0 : m_t0 = std::chrono::high_resolution_clock::now();
233 :
234 0 : if (!m_callback_mode) {
235 0 : m_run_marker.store(true);
236 0 : m_consumer_thread.set_work(&SocketWriterModule::run_consume, this);
237 : }
238 :
239 0 : m_io_thread = std::jthread([this] { m_io_context.run(); });
240 0 : }
241 :
242 : void
243 0 : SocketWriterModule::do_stop(const CommandData_t&)
244 : {
245 0 : if (!m_callback_mode) {
246 0 : m_run_marker.store(false);
247 0 : while (!m_consumer_thread.get_readiness()) {
248 0 : std::this_thread::sleep_for(std::chrono::milliseconds(10));
249 : }
250 : }
251 :
252 0 : for (auto& writer : m_writers) {
253 0 : std::visit([](auto& writer) { writer.stop(); }, writer);
254 : }
255 :
256 0 : m_work_guard.reset();
257 0 : }
258 :
259 : void
260 0 : SocketWriterModule::generate_opmon_data()
261 : {
262 0 : for (const auto& writer_config : m_writer_configs) {
263 0 : opmon::SocketWriterStats stats;
264 0 : stats.set_sum_payloads(writer_config.socket_stats->sum_payloads.load());
265 0 : stats.set_num_payloads(writer_config.socket_stats->num_payloads.exchange(0));
266 0 : stats.set_sum_bytes(writer_config.socket_stats->sum_bytes.load());
267 0 : stats.set_num_data_input_timeouts(writer_config.socket_stats->rawq_timeout_count.exchange(0));
268 :
269 0 : auto now = std::chrono::high_resolution_clock::now();
270 0 : int new_packets = writer_config.socket_stats->stats_packet_count.exchange(0);
271 0 : double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
272 0 : m_t0 = now;
273 :
274 0 : stats.set_rate_payloads_consumed(new_packets / seconds / 1000.);
275 :
276 0 : publish(std::move(stats), { { "socket-writer", std::to_string(writer_config.remote_port) } });
277 0 : }
278 0 : }
279 :
280 : void
281 0 : SocketWriterModule::TCPWriter::configure(boost::asio::io_context& io_context, const WriterConfig& writer_config)
282 : {
283 0 : m_socket_stats = writer_config.socket_stats;
284 :
285 0 : while (true) {
286 0 : try {
287 0 : m_socket = std::make_unique<boost::asio::ip::tcp::socket>(io_context);
288 :
289 0 : m_socket->connect(boost::asio::ip::tcp::endpoint(
290 0 : boost::asio::ip::address::from_string(writer_config.remote_ip), writer_config.remote_port));
291 0 : break;
292 0 : } catch (const boost::system::system_error& e) {
293 0 : TLOG() << "Connection failed: " << e.what() << ". Retrying in 1 second...";
294 0 : std::this_thread::sleep_for(std::chrono::seconds(1));
295 0 : }
296 : }
297 :
298 0 : TLOG() << "Established TCP connection to " << writer_config.remote_ip << ":" << writer_config.remote_port;
299 0 : }
300 :
301 : boost::asio::awaitable<void>
302 0 : SocketWriterModule::TCPWriter::start(GenericReceiverConcept::TypeErasedPayload payload) // TODO (DTE): Rename
303 : {
304 : const auto bytes_sent =
305 : co_await boost::asio::async_write(*m_socket, boost::asio::buffer(payload.data, payload.size), boost::asio::use_awaitable);
306 : ++m_socket_stats->num_payloads;
307 : ++m_socket_stats->sum_payloads;
308 : m_socket_stats->sum_bytes.fetch_add(bytes_sent);
309 : ++m_socket_stats->stats_packet_count;
310 0 : }
311 :
312 : void
313 0 : SocketWriterModule::TCPWriter::stop()
314 : {
315 0 : m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both);
316 0 : m_socket->close();
317 0 : TLOG() << "Shutdown TCP connection";
318 0 : }
319 :
320 : void
321 0 : SocketWriterModule::UDPWriter::configure(boost::asio::io_context& io_context, const WriterConfig& writer_config)
322 : {
323 0 : m_writer_config = writer_config;
324 :
325 : // Let the OS pick an available local IP and port for sending packets
326 0 : const boost::asio::ip::udp::endpoint sender_endpoint(boost::asio::ip::udp::v4(), 0);
327 :
328 0 : m_socket = std::make_unique<boost::asio::ip::udp::socket>(io_context, sender_endpoint);
329 :
330 0 : TLOG() << "Created UDP socket on " << m_socket->local_endpoint().address() << ":"
331 0 : << m_socket->local_endpoint().port();
332 0 : }
333 :
334 : boost::asio::awaitable<void>
335 0 : SocketWriterModule::UDPWriter::start(GenericReceiverConcept::TypeErasedPayload payload)
336 : {
337 : boost::asio::ip::udp::endpoint receiver_endpoint(boost::asio::ip::address::from_string(m_writer_config.remote_ip),
338 : m_writer_config.remote_port);
339 : const auto bytes_sent = co_await m_socket->async_send_to(
340 : boost::asio::buffer(payload.data, payload.size), receiver_endpoint, boost::asio::use_awaitable);
341 : ++m_writer_config.socket_stats->num_payloads;
342 : ++m_writer_config.socket_stats->sum_payloads;
343 : m_writer_config.socket_stats->sum_bytes.fetch_add(bytes_sent);
344 : ++m_writer_config.socket_stats->stats_packet_count;
345 0 : }
346 :
347 : void
348 0 : SocketWriterModule::UDPWriter::stop()
349 : {
350 0 : m_socket->close();
351 0 : TLOG() << "Closed UDP socket";
352 0 : }
353 :
354 : } // namespace dunedaq::asiolibs
355 :
// Module definition macro invoked for SocketWriterModule. The macro itself is defined outside
// this file; presumably it exposes the class to the DAQ application framework's plugin
// loader -- confirm against the macro definition.
356 0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::asiolibs::SocketWriterModule)
|