Line data Source code
1 : /**
2 : * @file SocketReaderModule.cpp Boost.Asio-based socket reader plugin for low-bandwidth devices
3 : *
4 : * This is part of the DUNE DAQ , copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "SocketReaderModule.hpp"
10 :
11 : #include "CreateSource.hpp"
12 :
13 : #include "appmodel/DataReaderModule.hpp"
14 : #include "appmodel/NetworkDetectorToDaqConnection.hpp"
15 : #include "appmodel/SocketDataSender.hpp"
16 : #include "appmodel/NWDetDataSender.hpp"
17 : #include "appmodel/SocketReaderConf.hpp"
18 : #include "appmodel/SocketReceiver.hpp"
19 : #include "confmodel/DetectorStream.hpp"
20 : #include "confmodel/GeoId.hpp"
21 : #include "confmodel/QueueWithSourceId.hpp"
22 :
23 : #include "datahandlinglibs/DataHandlingIssues.hpp"
24 :
25 : #include "asiolibs/opmon/SocketReaderModule.pb.h"
26 :
27 : #include <string>
28 : #include <vector>
29 : #include <memory>
30 : #include <utility>
31 :
32 : namespace dunedaq::asiolibs {
33 :
34 0 : SocketReaderModule::SocketReaderModule(const std::string& name)
35 : : DAQModule(name)
36 0 : , m_work_guard(boost::asio::make_work_guard(m_io_context))
37 : {
38 0 : register_command("conf", &SocketReaderModule::do_configure);
39 0 : register_command("start", &SocketReaderModule::do_start);
40 0 : register_command("stop_trigger_sources", &SocketReaderModule::do_stop);
41 0 : }
42 :
/**
 * @brief Split a string on a single-character delimiter.
 *
 * Runs of consecutive delimiters are skipped, so no empty tokens are produced.
 * Tokens are appended to @p out; existing elements of @p out are left in place.
 *
 * @param str   String to split.
 * @param delim Delimiter character.
 * @param out   Vector receiving the tokens, in order of appearance.
 */
inline void
tokenize(std::string const& str, const char delim, std::vector<std::string>& out)
{
  std::size_t token_begin = str.find_first_not_of(delim, 0);
  while (token_begin != std::string::npos) {
    // token_end is npos for the last token; substr() then takes the remainder.
    const std::size_t token_end = str.find(delim, token_begin);
    out.push_back(str.substr(token_begin, token_end - token_begin));
    token_begin = str.find_first_not_of(delim, token_end);
  }
}
53 :
54 : void
55 0 : SocketReaderModule::init(const std::shared_ptr<appfwk::ConfigurationManager> mcfg)
56 : {
57 0 : m_cfg = mcfg;
58 0 : auto* mdal = m_cfg->get_dal<appmodel::DataReaderModule>(get_name());
59 0 : auto* module_conf = mdal->get_configuration()->cast<appmodel::SocketReaderConf>();
60 :
61 0 : const auto local_ip = module_conf->get_local_ip();
62 :
63 0 : m_socket_type = string_to_socket_type(module_conf->get_socket_type());
64 0 : if (m_socket_type != SocketType::TCP && m_socket_type != SocketType::UDP) {
65 0 : throw std::invalid_argument("Error: Only TCP and UDP are allowed!");
66 : }
67 :
68 0 : std::vector<const appmodel::NetworkDetectorToDaqConnection*> d2d_conns;
69 0 : for (auto* connection : mdal->get_connections()) {
70 :
71 0 : if (connection->is_disabled(*(m_cfg->get_session()))) {
72 0 : continue;
73 : }
74 :
75 0 : auto net_connection = connection->cast<appmodel::NetworkDetectorToDaqConnection>();
76 0 : if (net_connection == nullptr) {
77 0 : throw dunedaq::datahandlinglibs::InitializationError(
78 0 : ERS_HERE,
79 0 : fmt::format("Found connection {} of type {} while expecting type NetworkDetectorToDaqConnection",
80 0 : connection->UID(),
81 0 : connection->class_name()));
82 : }
83 0 : d2d_conns.push_back(net_connection);
84 : }
85 :
86 0 : for (auto* d2d_conn : d2d_conns) {
87 0 : for (auto* sender : d2d_conn->get_net_senders()) {
88 0 : auto* socket_sender = sender->cast<appmodel::SocketDataSender>();
89 :
90 0 : if (!socket_sender) {
91 0 : throw dunedaq::datahandlinglibs::InitializationError(
92 0 : ERS_HERE,
93 0 : fmt::format("Found {} of type {} in connection {} while expecting type SocketDataSender",
94 0 : sender->UID(),
95 0 : sender->class_name(),
96 0 : d2d_conn->UID()));
97 : }
98 :
99 0 : if (socket_sender->is_disabled(*(m_cfg->get_session()))) {
100 0 : continue;
101 : }
102 :
103 0 : if (socket_sender->get_streams().size() > 1) {
104 0 : dunedaq::datahandlinglibs::GenericConfigurationError err(ERS_HERE,
105 0 : "Multiple streams currently are not supported!");
106 0 : ers::fatal(err);
107 0 : throw err;
108 0 : }
109 :
110 0 : for (auto* det_stream : socket_sender->get_streams()) {
111 0 : m_reader_configs.emplace_back(
112 0 : local_ip, socket_sender->get_port(), det_stream->get_source_id(), std::make_shared<SocketStats>());
113 : }
114 : }
115 : }
116 :
117 0 : m_readers.reserve(m_reader_configs.size());
118 0 : if (m_socket_type == SocketType::TCP) {
119 0 : for (std::size_t i = 0; i < m_reader_configs.size(); ++i) {
120 0 : m_readers.emplace_back(TCPReader());
121 : }
122 : } else {
123 0 : for (std::size_t i = 0; i < m_reader_configs.size(); ++i) {
124 0 : m_readers.emplace_back(UDPReader());
125 : }
126 : }
127 :
128 0 : auto callback_confs = mdal->get_raw_data_callbacks();
129 0 : for (auto* callback_conf : callback_confs) {
130 :
131 0 : auto ptr = m_sources[callback_conf->get_source_id()] = createSourceModel(callback_conf);
132 0 : register_node(callback_conf->UID(), ptr);
133 0 : }
134 0 : }
135 :
136 : SocketReaderModule::SocketType
137 0 : SocketReaderModule::string_to_socket_type(const std::string& socket_type) const
138 : {
139 0 : if (socket_type == "TCP") {
140 : return SocketReaderModule::SocketType::TCP;
141 0 : } else if (socket_type == "UDP") {
142 0 : return SocketReaderModule::SocketType::UDP;
143 : }
144 : return SocketReaderModule::SocketType::INVALID;
145 : }
146 :
147 : void
148 0 : SocketReaderModule::do_configure(const CommandData_t&)
149 : {
150 0 : for (std::size_t i = 0; i < m_readers.size(); ++i) {
151 0 : const auto reader_config = m_reader_configs[i];
152 0 : std::visit([this, reader_config](auto& reader) { reader.configure(m_io_context, reader_config); }, m_readers[i]);
153 0 : }
154 0 : }
155 :
156 : void
157 0 : SocketReaderModule::do_start(const CommandData_t&)
158 : {
159 : // Setup callbacks on all sourcemodels
160 0 : for (auto& [sourceid, source] : m_sources) {
161 0 : source->acquire_callback();
162 : }
163 :
164 0 : m_io_thread = std::jthread([this] { m_io_context.run(); });
165 :
166 0 : for (auto& reader : m_readers) {
167 0 : boost::asio::co_spawn(m_io_context,
168 0 : std::visit([this](auto& reader) { return reader.start(m_sources); }, reader),
169 : boost::asio::detached);
170 : }
171 0 : }
172 :
173 : void
174 0 : SocketReaderModule::do_stop(const CommandData_t&)
175 : {
176 0 : for (auto& reader : m_readers) {
177 0 : std::visit([](auto& reader) { reader.stop(); }, reader);
178 : }
179 :
180 0 : m_work_guard.reset();
181 0 : }
182 :
183 : void
184 0 : SocketReaderModule::generate_opmon_data()
185 : {
186 0 : for (const auto& reader_config : m_reader_configs) {
187 0 : opmon::SocketReaderStats stats;
188 0 : stats.set_packets_received(reader_config.socket_stats->packets_received.load());
189 0 : stats.set_bytes_received(reader_config.socket_stats->bytes_received.load());
190 0 : publish(std::move(stats), { { "socket-reader", std::to_string(reader_config.local_port) } });
191 0 : }
192 0 : }
193 :
194 : void
195 0 : SocketReaderModule::TCPReader::configure(boost::asio::io_context& io_context, const ReaderConfig& reader_config)
196 : {
197 0 : m_source_id = reader_config.source_id;
198 0 : m_socket_stats = reader_config.socket_stats;
199 :
200 0 : m_socket = std::make_unique<boost::asio::ip::tcp::socket>(io_context);
201 :
202 0 : boost::asio::ip::tcp::acceptor acceptor(
203 : io_context,
204 0 : boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(reader_config.local_ip),
205 0 : reader_config.local_port));
206 :
207 0 : TLOG() << "Waiting for TCP connection at " << reader_config.local_ip << ":" << reader_config.local_port;
208 :
209 0 : acceptor.accept(*m_socket);
210 :
211 0 : TLOG() << "Established TCP connection from " << m_socket->remote_endpoint().address() << ":"
212 0 : << m_socket->remote_endpoint().port();
213 0 : }
214 :
215 : boost::asio::awaitable<void>
216 0 : SocketReaderModule::TCPReader::start(const sid_to_source_map_t& sources)
217 : {
218 : // FIXME (DTE): Just pass the relevant source instead of all sources
219 : const auto src_it = sources.find(m_source_id);
220 : if (src_it == sources.end()) {
221 0 : TLOG() << "Unexpected source ID! (" << m_source_id << ")";
222 : co_return;
223 : }
224 :
225 : const auto buffer_size = src_it->second->get_target_payload_size();
226 : std::vector<char> buffer(buffer_size);
227 :
228 : while (m_socket->is_open()) {
229 : const auto bytes_received =
230 : co_await boost::asio::async_read(*m_socket,
231 : boost::asio::buffer(buffer),
232 : boost::asio::use_awaitable);
233 : ++m_socket_stats->packets_received;
234 : m_socket_stats->bytes_received.fetch_add(bytes_received);
235 : src_it->second->handle_payload(buffer.data(), bytes_received);
236 : }
237 0 : }
238 :
239 : void
240 0 : SocketReaderModule::TCPReader::stop()
241 : {
242 0 : m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both);
243 0 : m_socket->close();
244 0 : TLOG() << "Shutdown TCP connection";
245 0 : }
246 :
247 : void
248 0 : SocketReaderModule::UDPReader::configure(boost::asio::io_context& io_context, const ReaderConfig& reader_config)
249 : {
250 0 : m_source_id = reader_config.source_id;
251 0 : m_socket_stats = reader_config.socket_stats;
252 :
253 0 : const auto receiver_endpoint = boost::asio::ip::udp::endpoint(
254 0 : boost::asio::ip::address::from_string(reader_config.local_ip), reader_config.local_port);
255 0 : m_socket = std::make_unique<boost::asio::ip::udp::socket>(io_context, receiver_endpoint);
256 :
257 0 : TLOG() << "Created UDP socket on " << reader_config.local_ip << ":" << reader_config.local_port;
258 0 : }
259 :
260 : boost::asio::awaitable<void>
261 0 : SocketReaderModule::UDPReader::start(const sid_to_source_map_t& sources)
262 : {
263 : const auto src_it = sources.find(m_source_id);
264 : if (src_it == sources.end()) {
265 0 : TLOG() << "Unexpected source ID! (" << m_source_id << ")";
266 : co_return;
267 : }
268 :
269 : const auto buffer_size = src_it->second->get_target_payload_size();
270 : std::vector<char> buffer(buffer_size);
271 : boost::asio::ip::udp::endpoint sender_endpoint;
272 :
273 : while (m_socket->is_open()) {
274 : std::size_t bytes_received = co_await m_socket->async_receive_from(
275 : boost::asio::buffer(buffer), sender_endpoint, boost::asio::use_awaitable);
276 :
277 : ++m_socket_stats->packets_received;
278 : m_socket_stats->bytes_received.fetch_add(bytes_received);
279 :
280 : if (bytes_received == buffer_size) [[likely]] {
281 : src_it->second->handle_payload(buffer.data(), bytes_received);
282 : } else {
283 0 : TLOG() << "Payload is smaller than " << buffer_size << " (" << bytes_received << ")";
284 : }
285 : }
286 0 : }
287 :
288 : void
289 0 : SocketReaderModule::UDPReader::stop()
290 : {
291 0 : m_socket->close();
292 0 : TLOG() << "Closed UDP socket";
293 0 : }
294 :
295 : } // namespace dunedaq::asiolibs
296 :
// Module registration macro for SocketReaderModule, placed outside the dunedaq::asiolibs namespace.
// NOTE(review): presumably generates the plugin factory entry point used to load this module - confirm against the macro definition.
297 0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::asiolibs::SocketReaderModule)
|