Line data Source code
1 : /**
2 : * @file SocketWriterModule.hpp Boost.Asio-based socket writer plugin for low-bandwidth devices
3 : *
4 : * This is part of the DUNE DAQ , copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 : #ifndef ASIOLIBS_PLUGINS_SOCKETWRITERMODULE_HPP_
9 : #define ASIOLIBS_PLUGINS_SOCKETWRITERMODULE_HPP_
10 :
11 : #include "GenericReceiverConcept.hpp"
12 :
13 : #include "appfwk/DAQModule.hpp"
14 : #include "utilities/ReusableThread.hpp"
15 :
16 : #include "appmodel/SocketDataWriterModule.hpp"
17 :
18 : #include <boost/asio.hpp>
19 :
20 : #include <string>
21 : #include <memory>
22 : #include <vector>
23 :
24 : namespace dunedaq::asiolibs {
25 :
26 : class ConfigurationManager;
27 : class SocketDataWriterModule;
28 :
29 : class SocketWriterModule : public dunedaq::appfwk::DAQModule
30 : {
31 : public:
32 : /**
33 : * @brief Default raw data receiver timeout in ms
34 : */
35 : static constexpr auto raw_receiver_timeout_ms = 10;
36 :
37 : /**
38 : * @brief SocketWriterModule constructor
39 : * @param name DAQ module instance name
40 : */
41 : explicit SocketWriterModule(const std::string& name);
42 0 : ~SocketWriterModule() = default;
43 :
44 : SocketWriterModule(const SocketWriterModule&) = delete; ///< SocketWriterModule is not copy-constructible
45 : SocketWriterModule& operator=(const SocketWriterModule&) =
46 : delete; ///< SocketWriterModule is not copy-assignable
47 : SocketWriterModule(SocketWriterModule&&) = delete; ///< SocketWriterModule is not move-constructible
48 : SocketWriterModule& operator=(SocketWriterModule&&) =
49 : delete; ///< SocketWriterModule is not move-assignable
50 :
51 : /**
52 : * @brief Handles initialization on boot
53 : * @param mcfg DAQ configuration data
54 : */
55 : void init(const std::shared_ptr<appfwk::ConfigurationManager> mcfg) override;
56 :
57 : private:
58 : enum class SocketType
59 : {
60 : TCP,
61 : UDP,
62 : INVALID
63 : };
64 :
65 : struct SocketStats
66 : {
67 : /**
68 : * @brief Total number of received payloads
69 : */
70 : std::atomic<uint64_t> sum_payloads{ 0 };
71 :
72 : /**
73 : * @brief Incremental number of received payloads
74 : */
75 : std::atomic<uint64_t> num_payloads{ 0 };
76 :
77 : /**
78 : * @brief Total number of received bytes
79 : */
80 : std::atomic<uint64_t> sum_bytes{ 0 };
81 :
82 : /**
83 : * @brief Timeout on data inputs
84 : */
85 : std::atomic<uint64_t> rawq_timeout_count{ 0 };
86 :
87 : /**
88 : * @brief Rate of consumed packets
89 : */
90 : std::atomic<double> rate_payloads_consumed{ 0 };
91 :
92 : /**
93 : * @brief Counts packets since last opmon data generation
94 : */
95 : std::atomic<int> stats_packet_count{ 0 };
96 : };
97 :
98 : struct WriterConfig
99 : {
100 : /**
101 : * @brief Destination IP address
102 : */
103 : std::string remote_ip;
104 :
105 : /**
106 : * @brief Destination port number
107 : */
108 : ushort remote_port;
109 :
110 : /**
111 : * @brief Statistics of socket traffic
112 : */
113 : std::shared_ptr<SocketStats> socket_stats;
114 : };
115 :
116 : class TCPWriter
117 : {
118 : public:
119 : /**
120 : * @brief Creates and connects a TCP socket
121 : * @param io_context I/O context for socket creation
122 : * @param writer_config TCP writer configuration
123 : * @throws boost::system::system_error on failure
124 : */
125 : void configure(boost::asio::io_context& io_context, const WriterConfig& writer_config);
126 :
127 : /**
128 : * @brief Asynchronously sends payloads to the socket in a loop
129 : * @param payload Payload to send
130 : * @return Coroutine handle
131 : */
132 : boost::asio::awaitable<void> start(GenericReceiverConcept::TypeErasedPayload payload);
133 :
134 : /**
135 : * @brief Closes the socket
136 : */
137 : void stop();
138 :
139 : private:
140 : /**
141 : * @brief TCP socket
142 : */
143 : std::unique_ptr<boost::asio::ip::tcp::socket> m_socket;
144 :
145 : /**
146 : * @brief Statistics of socket traffic
147 : */
148 : std::shared_ptr<SocketStats> m_socket_stats;
149 : };
150 :
151 : class UDPWriter
152 : {
153 : public:
154 : /**
155 : * @brief Creates a UDP socket
156 : * @param io_context I/O context for socket creation
157 : * @param writer_config UDP writer configuration
158 : */
159 : void configure(boost::asio::io_context& io_context, const WriterConfig& writer_config);
160 :
161 : /**
162 : * @brief Asynchronously sends payloads to the socket in a loop
163 : * @param payload Payload to send
164 : * @return Coroutine handle
165 : */
166 : boost::asio::awaitable<void> start(GenericReceiverConcept::TypeErasedPayload payload);
167 :
168 : /**
169 : * @brief Closes the socket
170 : */
171 : void stop();
172 :
173 : private:
174 : /**
175 : * @brief UDP socket
176 : */
177 : std::unique_ptr<boost::asio::ip::udp::socket> m_socket;
178 :
179 : /**
180 : * @brief Socket writer configuration
181 : */
182 : WriterConfig m_writer_config;
183 : };
184 :
185 : // Commands
186 : void do_configure(const CommandData_t&);
187 : void do_start(const CommandData_t&);
188 : void do_stop(const CommandData_t&);
189 :
190 : void generate_opmon_data() override;
191 :
192 : /**
193 : * @brief Converts a socket type string to an enum
194 : * @param socket_type Socket type as a string
195 : * @return Corresponding SocketType enum
196 : */
197 : SocketType string_to_socket_type(const std::string& socket_type) const;
198 :
199 : /**
200 : * @brief Raw data consume callback function
201 : * @param payload Consumed data
202 : */
203 : void consume_payload(GenericReceiverConcept::TypeErasedPayload payload);
204 :
205 : /**
206 : * @brief I/O context for socket operations
207 : */
208 : boost::asio::io_context m_io_context;
209 :
210 : /**
211 : * @brief Prevents I/O context from exiting prematurely
212 : */
213 : boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_work_guard;
214 :
215 : /**
216 : * @brief Socket writers
217 : */
218 : std::vector<std::variant<TCPWriter, UDPWriter>> m_writers;
219 :
220 : /**
221 : * @brief Background thread to keep the I/O context running
222 : */
223 : std::jthread m_io_thread;
224 :
225 : /**
226 : * @brief Type of socket
227 : */
228 : SocketType m_socket_type{ SocketType::INVALID };
229 :
230 : /**
231 : * @brief Socket writer configurations
232 : */
233 : std::vector<WriterConfig> m_writer_configs;
234 :
235 : /**
236 : * @brief DAQ configuration data
237 : */
238 : std::shared_ptr<appfwk::ConfigurationManager> m_cfg;
239 :
240 : /**
241 : * @brief Configuration object for the callbacks
242 : */
243 : const appmodel::DataMoveCallbackConf* m_callback_conf;
244 :
245 : // Consume callback
246 : /**
247 : * @brief Raw data consume callback
248 : */
249 : std::function<void(GenericReceiverConcept::TypeErasedPayload payload)> m_consume_callback;
250 :
251 : // RUN START T0
252 : /**
253 : * @brief Timestamp used to measure time between opmon reports
254 : */
255 : std::chrono::time_point<std::chrono::steady_clock> m_t0;
256 : };
257 :
258 : } // namespace dunedaq::asiolibs
259 :
260 : #endif // ASIOLIBS_PLUGINS_SOCKETWRITERMODULE_HPP_
|