Line data Source code
1 : /**
2 : * @file SocketWriterModule.hpp Boost.Asio-based socket writer plugin for low-bandwidth devices
3 : *
4 : * This is part of the DUNE DAQ , copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 : #ifndef ASIOLIBS_PLUGINS_SOCKETWRITERMODULE_HPP_
9 : #define ASIOLIBS_PLUGINS_SOCKETWRITERMODULE_HPP_
10 :
11 : #include "GenericReceiverConcept.hpp"
12 :
13 : #include "appfwk/DAQModule.hpp"
14 : #include "utilities/ReusableThread.hpp"
15 :
16 : #include "appmodel/SocketDataWriterModule.hpp"
17 :
18 : #include <boost/asio.hpp>
19 :
20 : #include <string>
21 : #include <memory>
22 : #include <vector>
23 :
24 : namespace dunedaq::asiolibs {
25 :
26 : class ConfigurationManager;
27 : class SocketDataWriterModule;
28 :
29 : class SocketWriterModule : public dunedaq::appfwk::DAQModule
30 : {
31 : public:
32 : /**
33 : * @brief Default raw data receiver timeout in ms
34 : */
35 : static constexpr auto raw_receiver_timeout_ms = 10;
36 :
37 : /**
38 : * @brief SocketWriterModule constructor
39 : * @param name DAQ module instance name
40 : */
41 : explicit SocketWriterModule(const std::string& name);
42 0 : ~SocketWriterModule() = default;
43 :
44 : SocketWriterModule(const SocketWriterModule&) = delete; ///< SocketWriterModule is not copy-constructible
45 : SocketWriterModule& operator=(const SocketWriterModule&) =
46 : delete; ///< SocketWriterModule is not copy-assignable
47 : SocketWriterModule(SocketWriterModule&&) = delete; ///< SocketWriterModule is not move-constructible
48 : SocketWriterModule& operator=(SocketWriterModule&&) =
49 : delete; ///< SocketWriterModule is not move-assignable
50 :
51 : /**
52 : * @brief Handles initialization on boot
53 : * @param mcfg DAQ configuration data
54 : */
55 : void init(const std::shared_ptr<appfwk::ConfigurationManager> mcfg) override;
56 :
57 : private:
58 : enum class SocketType
59 : {
60 : TCP,
61 : UDP,
62 : INVALID
63 : };
64 :
65 : struct SocketStats
66 : {
67 : /**
68 : * @brief Total number of received payloads
69 : */
70 : std::atomic<uint64_t> sum_payloads{ 0 };
71 :
72 : /**
73 : * @brief Incremental number of received payloads
74 : */
75 : std::atomic<uint64_t> num_payloads{ 0 };
76 :
77 : /**
78 : * @brief Total number of received bytes
79 : */
80 : std::atomic<uint64_t> sum_bytes{ 0 };
81 :
82 : /**
83 : * @brief Timeout on data inputs
84 : */
85 : std::atomic<uint64_t> rawq_timeout_count{ 0 };
86 :
87 : /**
88 : * @brief Rate of consumed packets
89 : */
90 : std::atomic<double> rate_payloads_consumed{ 0 };
91 :
92 : /**
93 : * @brief Counts packets since last opmon data generation
94 : */
95 : std::atomic<int> stats_packet_count{ 0 };
96 : };
97 :
98 : struct WriterConfig
99 : {
100 : /**
101 : * @brief Destination IP address
102 : */
103 : std::string remote_ip;
104 :
105 : /**
106 : * @brief Destination port number
107 : */
108 : ushort remote_port;
109 :
110 : /**
111 : * @brief Statistics of socket traffic
112 : */
113 : std::shared_ptr<SocketStats> socket_stats;
114 : };
115 :
116 : class TCPWriter
117 : {
118 : public:
119 : /**
120 : * @brief Creates and connects a TCP socket
121 : * @param io_context I/O context for socket creation
122 : * @param writer_config TCP writer configuration
123 : * @throws boost::system::system_error on failure
124 : */
125 : void configure(boost::asio::io_context& io_context, const WriterConfig& writer_config);
126 :
127 : /**
128 : * @brief Asynchronously sends payloads to the socket in a loop
129 : * @param payload Payload to send
130 : * @return Coroutine handle
131 : */
132 : boost::asio::awaitable<void> start(GenericReceiverConcept::TypeErasedPayload payload);
133 :
134 : /**
135 : * @brief Closes the socket
136 : */
137 : void stop();
138 :
139 : private:
140 : /**
141 : * @brief TCP socket
142 : */
143 : std::unique_ptr<boost::asio::ip::tcp::socket> m_socket;
144 :
145 : /**
146 : * @brief Statistics of socket traffic
147 : */
148 : std::shared_ptr<SocketStats> m_socket_stats;
149 : };
150 :
151 : class UDPWriter
152 : {
153 : public:
154 : /**
155 : * @brief Creates a UDP socket
156 : * @param io_context I/O context for socket creation
157 : * @param writer_config UDP writer configuration
158 : */
159 : void configure(boost::asio::io_context& io_context, const WriterConfig& writer_config);
160 :
161 : /**
162 : * @brief Asynchronously sends payloads to the socket in a loop
163 : * @param payload Payload to send
164 : * @return Coroutine handle
165 : */
166 : boost::asio::awaitable<void> start(GenericReceiverConcept::TypeErasedPayload payload);
167 :
168 : /**
169 : * @brief Closes the socket
170 : */
171 : void stop();
172 :
173 : private:
174 : /**
175 : * @brief UDP socket
176 : */
177 : std::unique_ptr<boost::asio::ip::udp::socket> m_socket;
178 :
179 : /**
180 : * @brief Socket writer configuration
181 : */
182 : WriterConfig m_writer_config;
183 : };
184 :
185 : // Commands
186 : void do_configure(const CommandData_t&);
187 : void do_start(const CommandData_t&);
188 : void do_stop(const CommandData_t&);
189 :
190 : void generate_opmon_data() override;
191 :
192 : /**
193 : * @brief Converts a socket type string to an enum
194 : * @param socket_type Socket type as a string
195 : * @return Corresponding SocketType enum
196 : */
197 : SocketType string_to_socket_type(const std::string& socket_type) const;
198 :
199 : /**
200 : * @brief Gets dal inputs
201 : * @param mdal SocketDataWriterModule dal
202 : */
203 : void get_dal_inputs(const dunedaq::appmodel::SocketDataWriterModule* mdal);
204 :
205 : /**
206 : * @brief Raw data consume thread function
207 : */
208 : void run_consume();
209 :
210 : /**
211 : * @brief Raw data consume callback function
212 : * @param payload Consumed data
213 : */
214 : void consume_payload(GenericReceiverConcept::TypeErasedPayload payload);
215 :
216 : /**
217 : * @brief I/O context for socket operations
218 : */
219 : boost::asio::io_context m_io_context;
220 :
221 : /**
222 : * @brief Prevents I/O context from exiting prematurely
223 : */
224 : boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_work_guard;
225 :
226 : /**
227 : * @brief Socket writers
228 : */
229 : std::vector<std::variant<TCPWriter, UDPWriter>> m_writers;
230 :
231 : /**
232 : * @brief Background thread to keep the I/O context running
233 : */
234 : std::jthread m_io_thread;
235 :
236 : /**
237 : * @brief Type of socket
238 : */
239 : SocketType m_socket_type{ SocketType::INVALID };
240 :
241 : /**
242 : * @brief Socket writer configurations
243 : */
244 : std::vector<WriterConfig> m_writer_configs;
245 :
246 : /**
247 : * @brief DAQ configuration data
248 : */
249 : std::shared_ptr<appfwk::ConfigurationManager> m_cfg;
250 :
251 : /**
252 : * @brief Whether callback mode is configured
253 : */
254 : bool m_callback_mode{ false };
255 :
256 : // RAW RECEIVER
257 : /**
258 : * @brief Generic raw data receiver
259 : */
260 : std::shared_ptr<GenericReceiverConcept> m_raw_data_receiver;
261 :
262 : /**
263 : * @brief Raw data receiver timeout
264 : */
265 : std::chrono::milliseconds m_raw_receiver_timeout_ms{ raw_receiver_timeout_ms };
266 :
267 : /**
268 : * @brief Raw data receiver UID
269 : */
270 : std::string m_raw_data_receiver_connection_name;
271 :
272 : // CONSUMER
273 : /**
274 : * @brief Raw data consume thread
275 : */
276 : utilities::ReusableThread m_consumer_thread;
277 :
278 : /**
279 : * @brief Whether consumer thread should continue
280 : */
281 : std::atomic<bool> m_run_marker { false };
282 :
283 : // Consume callback
284 : /**
285 : * @brief Raw data consume callback
286 : */
287 : std::function<void(GenericReceiverConcept::TypeErasedPayload payload)> m_consume_callback;
288 :
289 : // RUN START T0
290 : /**
291 : * @brief Timestamp used to measure time between opmon reports
292 : */
293 : std::chrono::time_point<std::chrono::high_resolution_clock> m_t0;
294 : };
295 :
296 : } // namespace dunedaq::asiolibs
297 :
298 : #endif // ASIOLIBS_PLUGINS_SOCKETWRITERMODULE_HPP_
|