Line data Source code
1 : /**
2 : * @file SocketReaderModule.cpp Boost.Asio-based socket reader plugin for low-bandwidth devices
3 : *
4 : * This is part of the DUNE DAQ, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "SocketReaderModule.hpp"
10 :
11 : #include "CreateSource.hpp"
12 :
13 : #include "appmodel/DataReaderModule.hpp"
14 : #include "appmodel/NetworkDetectorToDaqConnection.hpp"
15 : #include "appmodel/SocketDataSender.hpp"
16 : #include "appmodel/NWDetDataSender.hpp"
17 : #include "appmodel/SocketReaderConf.hpp"
18 : #include "appmodel/SocketReceiver.hpp"
19 : #include "confmodel/DetectorStream.hpp"
20 : #include "confmodel/GeoId.hpp"
21 : #include "confmodel/QueueWithSourceId.hpp"
22 :
23 : #include "datahandlinglibs/DataHandlingIssues.hpp"
24 :
25 : #include "asiolibs/opmon/SocketReaderModule.pb.h"
26 :
27 : #include <string>
28 : #include <vector>
29 : #include <memory>
30 : #include <utility>
31 :
32 : namespace dunedaq::asiolibs {
33 :
34 0 : SocketReaderModule::SocketReaderModule(const std::string& name)
35 : : DAQModule(name)
36 0 : , m_work_guard(boost::asio::make_work_guard(m_io_context))
37 : {
38 0 : register_command("conf", &SocketReaderModule::do_configure);
39 0 : register_command("start", &SocketReaderModule::do_start);
40 0 : register_command("stop_trigger_sources", &SocketReaderModule::do_stop);
41 0 : }
42 :
/**
 * @brief Split a string on a single delimiter character.
 *
 * Runs of consecutive delimiters (including leading and trailing ones) are
 * skipped, so no empty token is ever produced. Tokens are appended to @p out
 * in order of appearance; existing elements of @p out are left untouched.
 *
 * @param str   Text to split.
 * @param delim Delimiter character.
 * @param out   Vector receiving the tokens.
 */
inline void
tokenize(std::string const& str, const char delim, std::vector<std::string>& out)
{
  std::size_t token_begin = str.find_first_not_of(delim);

  while (token_begin != std::string::npos) {
    const std::size_t token_end = str.find(delim, token_begin);

    if (token_end == std::string::npos) {
      // Last token: runs to the end of the string.
      out.push_back(str.substr(token_begin));
      return;
    }

    out.push_back(str.substr(token_begin, token_end - token_begin));
    token_begin = str.find_first_not_of(delim, token_end);
  }
}
53 :
54 : void
55 0 : SocketReaderModule::init(const std::shared_ptr<appfwk::ConfigurationManager> mcfg)
56 : {
57 0 : m_cfg = mcfg;
58 0 : auto* mdal = m_cfg->get_dal<appmodel::DataReaderModule>(get_name());
59 0 : auto* module_conf = mdal->get_configuration()->cast<appmodel::SocketReaderConf>();
60 :
61 0 : const auto local_ip = module_conf->get_local_ip();
62 :
63 0 : m_socket_type = string_to_socket_type(module_conf->get_socket_type());
64 0 : if (m_socket_type != SocketType::TCP && m_socket_type != SocketType::UDP) {
65 0 : throw std::invalid_argument("Error: Only TCP and UDP are allowed!");
66 : }
67 :
68 0 : std::vector<const appmodel::NetworkDetectorToDaqConnection*> d2d_conns;
69 0 : for (auto* connection : mdal->get_connections()) {
70 :
71 0 : if (connection->is_disabled(*(m_cfg->get_session()))) {
72 0 : continue;
73 : }
74 :
75 0 : auto net_connection = connection->cast<appmodel::NetworkDetectorToDaqConnection>();
76 0 : if (net_connection == nullptr) {
77 0 : throw dunedaq::datahandlinglibs::InitializationError(
78 0 : ERS_HERE,
79 0 : fmt::format("Found connection {} of type {} while expecting type NetworkDetectorToDaqConnection",
80 0 : connection->UID(),
81 0 : connection->class_name()));
82 : }
83 0 : d2d_conns.push_back(net_connection);
84 : }
85 :
86 0 : for (auto* d2d_conn : d2d_conns) {
87 0 : for (auto* sender : d2d_conn->get_net_senders()) {
88 0 : auto* socket_sender = sender->cast<appmodel::SocketDataSender>();
89 :
90 0 : if (!socket_sender) {
91 0 : throw dunedaq::datahandlinglibs::InitializationError(
92 0 : ERS_HERE,
93 0 : fmt::format("Found {} of type {} in connection {} while expecting type SocketDataSender",
94 0 : sender->UID(),
95 0 : sender->class_name(),
96 0 : d2d_conn->UID()));
97 : }
98 :
99 0 : if (socket_sender->is_disabled(*(m_cfg->get_session()))) {
100 0 : continue;
101 : }
102 :
103 0 : if (socket_sender->get_streams().size() > 1) {
104 0 : dunedaq::datahandlinglibs::GenericConfigurationError err(ERS_HERE,
105 0 : "Multiple streams currently are not supported!");
106 0 : ers::fatal(err);
107 0 : throw err;
108 0 : }
109 :
110 0 : for (auto* det_stream : socket_sender->get_streams()) {
111 0 : m_reader_configs.emplace_back(
112 0 : local_ip, socket_sender->get_port(), det_stream->get_source_id(), std::make_shared<SocketStats>());
113 : }
114 : }
115 : }
116 :
117 0 : m_readers.reserve(m_reader_configs.size());
118 0 : if (m_socket_type == SocketType::TCP) {
119 0 : for (std::size_t i = 0; i < m_reader_configs.size(); ++i) {
120 0 : m_readers.emplace_back(TCPReader());
121 : }
122 : } else {
123 0 : for (std::size_t i = 0; i < m_reader_configs.size(); ++i) {
124 0 : m_readers.emplace_back(UDPReader());
125 : }
126 : }
127 :
128 0 : if (mdal->get_outputs().empty()) {
129 0 : auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE,
130 0 : "No outputs defined for socket reader in configuration.");
131 0 : ers::fatal(err);
132 0 : throw err;
133 0 : }
134 :
135 0 : for (auto* con : mdal->get_outputs()) {
136 0 : auto* queue = con->cast<confmodel::QueueWithSourceId>();
137 0 : if (queue == nullptr) {
138 0 : auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE, "Outputs are not of type QueueWithGeoId.");
139 0 : ers::fatal(err);
140 0 : throw err;
141 0 : }
142 :
143 : // Check for CB prefix indicating Callback use
144 0 : const char delim = '_';
145 0 : const std::string target = queue->UID();
146 0 : std::vector<std::string> words;
147 0 : tokenize(target, delim, words);
148 :
149 0 : bool callback_mode = false;
150 0 : if (words.front() == "cb") {
151 : callback_mode = true;
152 : }
153 :
154 0 : auto ptr = m_sources[queue->get_source_id()] = createSourceModel(queue->UID(), callback_mode);
155 0 : register_node(queue->UID(), ptr);
156 0 : }
157 0 : }
158 :
159 : SocketReaderModule::SocketType
160 0 : SocketReaderModule::string_to_socket_type(const std::string& socket_type) const
161 : {
162 0 : if (socket_type == "TCP") {
163 : return SocketReaderModule::SocketType::TCP;
164 0 : } else if (socket_type == "UDP") {
165 0 : return SocketReaderModule::SocketType::UDP;
166 : }
167 : return SocketReaderModule::SocketType::INVALID;
168 : }
169 :
170 : void
171 0 : SocketReaderModule::do_configure(const CommandData_t&)
172 : {
173 0 : for (std::size_t i = 0; i < m_readers.size(); ++i) {
174 0 : const auto reader_config = m_reader_configs[i];
175 0 : std::visit([this, reader_config](auto& reader) { reader.configure(m_io_context, reader_config); }, m_readers[i]);
176 0 : }
177 0 : }
178 :
179 : void
180 0 : SocketReaderModule::do_start(const CommandData_t&)
181 : {
182 : // Setup callbacks on all sourcemodels
183 0 : for (auto& [sourceid, source] : m_sources) {
184 0 : source->acquire_callback();
185 : }
186 :
187 0 : m_io_thread = std::jthread([this] { m_io_context.run(); });
188 :
189 0 : for (auto& reader : m_readers) {
190 0 : boost::asio::co_spawn(m_io_context,
191 0 : std::visit([this](auto& reader) { return reader.start(m_sources); }, reader),
192 : boost::asio::detached);
193 : }
194 0 : }
195 :
196 : void
197 0 : SocketReaderModule::do_stop(const CommandData_t&)
198 : {
199 0 : for (auto& reader : m_readers) {
200 0 : std::visit([](auto& reader) { reader.stop(); }, reader);
201 : }
202 :
203 0 : m_work_guard.reset();
204 0 : }
205 :
206 : void
207 0 : SocketReaderModule::generate_opmon_data()
208 : {
209 0 : for (const auto& reader_config : m_reader_configs) {
210 0 : opmon::SocketReaderStats stats;
211 0 : stats.set_packets_received(reader_config.socket_stats->packets_received.load());
212 0 : stats.set_bytes_received(reader_config.socket_stats->bytes_received.load());
213 0 : publish(std::move(stats), { { "socket-reader", std::to_string(reader_config.local_port) } });
214 0 : }
215 0 : }
216 :
217 : void
218 0 : SocketReaderModule::TCPReader::configure(boost::asio::io_context& io_context, const ReaderConfig& reader_config)
219 : {
220 0 : m_source_id = reader_config.source_id;
221 0 : m_socket_stats = reader_config.socket_stats;
222 :
223 0 : m_socket = std::make_unique<boost::asio::ip::tcp::socket>(io_context);
224 :
225 0 : boost::asio::ip::tcp::acceptor acceptor(
226 : io_context,
227 0 : boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(reader_config.local_ip),
228 0 : reader_config.local_port));
229 :
230 0 : TLOG() << "Waiting for TCP connection at " << reader_config.local_ip << ":" << reader_config.local_port;
231 :
232 0 : acceptor.accept(*m_socket);
233 :
234 0 : TLOG() << "Established TCP connection from " << m_socket->remote_endpoint().address() << ":"
235 0 : << m_socket->remote_endpoint().port();
236 0 : }
237 :
238 : boost::asio::awaitable<void>
239 0 : SocketReaderModule::TCPReader::start(const sid_to_source_map_t& sources)
240 : {
241 : // FIXME (DTE): Just pass the relevant source instead of all sources
242 : const auto src_it = sources.find(m_source_id);
243 : if (src_it == sources.end()) {
244 0 : TLOG() << "Unexpected source ID! (" << m_source_id << ")";
245 : co_return;
246 : }
247 :
248 : const auto buffer_size = src_it->second->get_target_payload_size();
249 : std::vector<char> buffer(buffer_size);
250 :
251 : while (m_socket->is_open()) {
252 : const auto bytes_received =
253 : co_await boost::asio::async_read(*m_socket,
254 : boost::asio::buffer(buffer),
255 : boost::asio::use_awaitable);
256 : ++m_socket_stats->packets_received;
257 : m_socket_stats->bytes_received.fetch_add(bytes_received);
258 : src_it->second->handle_payload(buffer.data(), bytes_received);
259 : }
260 0 : }
261 :
262 : void
263 0 : SocketReaderModule::TCPReader::stop()
264 : {
265 0 : m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both);
266 0 : m_socket->close();
267 0 : TLOG() << "Shutdown TCP connection";
268 0 : }
269 :
270 : void
271 0 : SocketReaderModule::UDPReader::configure(boost::asio::io_context& io_context, const ReaderConfig& reader_config)
272 : {
273 0 : m_source_id = reader_config.source_id;
274 0 : m_socket_stats = reader_config.socket_stats;
275 :
276 0 : const auto receiver_endpoint = boost::asio::ip::udp::endpoint(
277 0 : boost::asio::ip::address::from_string(reader_config.local_ip), reader_config.local_port);
278 0 : m_socket = std::make_unique<boost::asio::ip::udp::socket>(io_context, receiver_endpoint);
279 :
280 0 : TLOG() << "Created UDP socket on " << reader_config.local_ip << ":" << reader_config.local_port;
281 0 : }
282 :
283 : boost::asio::awaitable<void>
284 0 : SocketReaderModule::UDPReader::start(const sid_to_source_map_t& sources)
285 : {
286 : const auto src_it = sources.find(m_source_id);
287 : if (src_it == sources.end()) {
288 0 : TLOG() << "Unexpected source ID! (" << m_source_id << ")";
289 : co_return;
290 : }
291 :
292 : const auto buffer_size = src_it->second->get_target_payload_size();
293 : std::vector<char> buffer(buffer_size);
294 : boost::asio::ip::udp::endpoint sender_endpoint;
295 :
296 : while (m_socket->is_open()) {
297 : std::size_t bytes_received = co_await m_socket->async_receive_from(
298 : boost::asio::buffer(buffer), sender_endpoint, boost::asio::use_awaitable);
299 :
300 : ++m_socket_stats->packets_received;
301 : m_socket_stats->bytes_received.fetch_add(bytes_received);
302 :
303 : if (bytes_received == buffer_size) [[likely]] {
304 : src_it->second->handle_payload(buffer.data(), bytes_received);
305 : } else {
306 0 : TLOG() << "Payload is smaller than " << buffer_size << " (" << bytes_received << ")";
307 : }
308 : }
309 0 : }
310 :
311 : void
312 0 : SocketReaderModule::UDPReader::stop()
313 : {
314 0 : m_socket->close();
315 0 : TLOG() << "Closed UDP socket";
316 0 : }
317 :
318 : } // namespace dunedaq::asiolibs
319 :
// Module definition macro invoked for SocketReaderModule at global scope.
// NOTE(review): presumably this exposes the class to the DAQ plugin loader -
// the macro is defined outside this file, confirm against its declaration.
320 0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::asiolibs::SocketReaderModule)
|