Line data Source code
1 : /**
2 : * @file SocketReaderModule.hpp Boost.Asio-based socket reader plugin for low-bandwidth devices
3 : *
4 : * This is part of the DUNE DAQ , copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 : #ifndef ASIOLIBS_PLUGINS_SOCKETREADERMODULE_HPP_
9 : #define ASIOLIBS_PLUGINS_SOCKETREADERMODULE_HPP_
10 :
11 : #include "appfwk/DAQModule.hpp"
12 :
13 : #include <boost/array.hpp>
14 : #include <boost/asio.hpp>
15 :
16 : #include <map>
17 : #include <memory>
18 : #include <string>
19 : #include <vector>
20 :
21 : namespace dunedaq::asiolibs {
22 :
23 : class SourceConcept;
24 :
25 : using sid_to_source_map_t = std::map<int, std::shared_ptr<SourceConcept>>;
26 :
27 : class SocketReaderModule : public dunedaq::appfwk::DAQModule
28 : {
29 : public:
30 : /**
31 : * @brief SocketReaderModule constructor
32 : * @param name DAQ module instance name
33 : */
34 : explicit SocketReaderModule(const std::string& name);
35 0 : ~SocketReaderModule() = default;
36 :
37 : SocketReaderModule(const SocketReaderModule&) = delete; ///< SocketReaderModule is not copy-constructible
38 : SocketReaderModule& operator=(const SocketReaderModule&) = delete; ///< SocketReaderModule is not copy-assignable
39 : SocketReaderModule(SocketReaderModule&&) = delete; ///< SocketReaderModule is not move-constructible
40 : SocketReaderModule& operator=(SocketReaderModule&&) = delete; ///< SocketReaderModule is not move-assignable
41 :
42 : /**
43 : * @brief Handles initialization on boot
44 : * @param mcfg DAQ configuration data
45 : */
46 : void init(const std::shared_ptr<appfwk::ConfigurationManager> mcfg) override;
47 :
48 : private:
49 : enum class SocketType
50 : {
51 : TCP,
52 : UDP,
53 : INVALID
54 : };
55 :
56 : struct SocketStats
57 : {
58 : /**
59 : * @brief Received packets
60 : */
61 : std::atomic<uint64_t> packets_received{ 0 };
62 :
63 : /**
64 : * @brief Received bytes
65 : */
66 : std::atomic<uint64_t> bytes_received{ 0 };
67 : };
68 :
69 : struct ReaderConfig
70 : {
71 : /**
72 : * @brief Source IP address
73 : */
74 : std::string local_ip;
75 :
76 : /**
77 : * @brief Source port number
78 : */
79 : ushort local_port;
80 :
81 : /**
82 : * @brief Detector stream source ID
83 : */
84 : uint source_id;
85 :
86 : /**
87 : * @brief Statistics of socket traffic
88 : */
89 : std::shared_ptr<SocketStats> socket_stats;
90 : };
91 :
92 : class TCPReader
93 : {
94 : public:
95 : /**
96 : * @brief Asynchronously creates and connects a TCP socket
97 : * @param io_context I/O context for socket creation
98 : * @param reader_config TCP reader configuration
99 : * @throws boost::system::system_error on failure
100 : */
101 : void configure(boost::asio::io_context& io_context, const ReaderConfig& reader_config);
102 :
103 : /**
104 : * @brief Asynchronously receives payloads from the socket in a loop
105 : * @param sources Data sources
106 : * @return Coroutine handle
107 : */
108 : boost::asio::awaitable<void> start(const sid_to_source_map_t& sources);
109 :
110 : /**
111 : * @brief Closes the socket
112 : */
113 : void stop();
114 :
115 : private:
116 : /**
117 : * @brief TCP socket
118 : */
119 : std::unique_ptr<boost::asio::ip::tcp::socket> m_socket;
120 :
121 : /**
122 : * @brief Detector stream source ID
123 : */
124 : uint m_source_id;
125 :
126 : /**
127 : * @brief Statistics of socket traffic
128 : */
129 : std::shared_ptr<SocketStats> m_socket_stats;
130 : };
131 :
132 : class UDPReader
133 : {
134 : public:
135 : /**
136 : * @brief Creates a UDP socket
137 : * @param io_context I/O context for socket creation
138 : * @param reader_config UDP reader configuration
139 : */
140 : void configure(boost::asio::io_context& io_context, const ReaderConfig& reader_config);
141 :
142 : /**
143 : * @brief Asynchronously receives payloads from the socket in a loop
144 : * @param sources Data sources
145 : * @return Coroutine handle
146 : */
147 : boost::asio::awaitable<void> start(const sid_to_source_map_t& sources);
148 :
149 : /**
150 : * @brief Closes the socket
151 : */
152 : void stop();
153 :
154 : private:
155 : /**
156 : * @brief UDP socket
157 : */
158 : std::unique_ptr<boost::asio::ip::udp::socket> m_socket;
159 :
160 : /**
161 : * @brief Detector stream source ID
162 : */
163 : uint m_source_id;
164 :
165 : /**
166 : * @brief Statistics of socket traffic
167 : */
168 : std::shared_ptr<SocketStats> m_socket_stats;
169 : };
170 :
171 : // Commands
172 : void do_configure(const CommandData_t&);
173 : void do_start(const CommandData_t&);
174 : void do_stop(const CommandData_t&);
175 :
176 : void generate_opmon_data() override;
177 :
178 : /**
179 : * @brief Converts a socket type string to an enum
180 : * @param socket_type Socket type as a string
181 : * @return Corresponding SocketType enum
182 : */
183 : SocketType string_to_socket_type(const std::string& socket_type) const;
184 :
185 : /**
186 : * @brief I/O context for socket operations
187 : */
188 : boost::asio::io_context m_io_context;
189 :
190 : /**
191 : * @brief Prevents I/O context from exiting prematurely
192 : */
193 : boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_work_guard;
194 :
195 : /**
196 : * @brief Socket readers
197 : */
198 : std::vector<std::variant<TCPReader, UDPReader>> m_readers;
199 :
200 : /**
201 : * @brief Background thread to keep the I/O context running
202 : */
203 : std::jthread m_io_thread;
204 :
205 : /**
206 : * @brief Type of socket
207 : */
208 : SocketType m_socket_type{ SocketType::INVALID };
209 :
210 : /**
211 : * @brief Socket reader configurations
212 : */
213 : std::vector<ReaderConfig> m_reader_configs;
214 :
215 : // Internals
216 : /**
217 : * @brief DAQ configuration data
218 : */
219 : std::shared_ptr<appfwk::ConfigurationManager> m_cfg;
220 :
221 : // Sinks (SourceConcepts)
222 : /**
223 : * @brief Data sources
224 : */
225 : sid_to_source_map_t m_sources;
226 : };
227 :
228 : } // namespace dunedaq::asiolibs
229 :
230 : #endif // ASIOLIBS_PLUGINS_SOCKETREADERMODULE_HPP_
|