LCOV - code coverage report
Current view: top level - asiolibs/plugins - SocketWriterModule.hpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 1 0
Test Date: 2026-03-29 15:29:34 Functions: 0.0 % 1 0

            Line data    Source code
       1              : /**
       2              :  * @file SocketWriterModule.hpp Boost.Asio-based socket writer plugin for low-bandwidth devices
       3              :  *
       4              :  * This is part of DUNE DAQ, copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : #ifndef ASIOLIBS_PLUGINS_SOCKETWRITERMODULE_HPP_
       9              : #define ASIOLIBS_PLUGINS_SOCKETWRITERMODULE_HPP_
      10              : 
      11              : #include "GenericReceiverConcept.hpp"
      12              : 
      13              : #include "appfwk/DAQModule.hpp"
      14              : #include "utilities/ReusableThread.hpp"
      15              : 
      16              : #include "appmodel/SocketDataWriterModule.hpp"
      17              : 
      18              : #include <boost/asio.hpp>
      19              : 
      20              : #include <string>
      21              : #include <memory>
      22              : #include <vector>
      23              : 
      24              : namespace dunedaq::asiolibs {
      25              : 
      26              : class ConfigurationManager; // NOTE(review): not referenced in this header (init() and m_cfg use appfwk::ConfigurationManager) - confirm it is needed
      27              : class SocketDataWriterModule; // NOTE(review): not referenced anywhere in this header - confirm it is needed
      28              : 
      29              : class SocketWriterModule : public dunedaq::appfwk::DAQModule
      30              : {
      31              : public:
      32              :   /**
      33              :    * @brief Default raw data receiver timeout, in milliseconds (value: 10)
      34              :    */
      35              :   static constexpr auto raw_receiver_timeout_ms = 10;
      36              : 
      37              :   /**
      38              :    * @brief SocketWriterModule constructor (explicit: no implicit conversion from std::string)
      39              :    * @param name DAQ module instance name
      40              :    */
      41              :   explicit SocketWriterModule(const std::string& name);
      42            0 :   ~SocketWriterModule() = default;
      43              : 
      44              :   SocketWriterModule(const SocketWriterModule&) = delete; ///< SocketWriterModule is not copy-constructible
      45              :   SocketWriterModule& operator=(const SocketWriterModule&) =
      46              :     delete;                                                  ///< SocketWriterModule is not copy-assignable
      47              :   SocketWriterModule(SocketWriterModule&&) = delete; ///< SocketWriterModule is not move-constructible
      48              :   SocketWriterModule& operator=(SocketWriterModule&&) =
      49              :     delete; ///< SocketWriterModule is not move-assignable
      50              : 
      51              :   /**
      52              :    * @brief Handles initialization on boot (overrides the DAQModule base-class hook)
      53              :    * @param mcfg Shared pointer to the DAQ configuration manager
      54              :    */
      55              :   void init(const std::shared_ptr<appfwk::ConfigurationManager> mcfg) override;
      56              : 
      57              : private:
      58              :   enum class SocketType
      59              :   {
      60              :     TCP,
      61              :     UDP,
      62              :     INVALID
      63              :   };
      64              : 
      65              :   struct SocketStats
      66              :   {
      67              :     /**
      68              :      * @brief Total number of received payloads. NOTE(review): <atomic>/<cstdint> are not included directly by this header
      69              :      */
      70              :     std::atomic<uint64_t> sum_payloads{ 0 };
      71              : 
      72              :     /**
      73              :      * @brief Incremental number of received payloads (presumably since the last opmon report - confirm in the .cpp)
      74              :      */
      75              :     std::atomic<uint64_t> num_payloads{ 0 };
      76              : 
      77              :     /**
      78              :      * @brief Total number of received bytes
      79              :      */
      80              :     std::atomic<uint64_t> sum_bytes{ 0 };
      81              : 
      82              :     /**
      83              :      * @brief Number of timeouts on data inputs (a counter, judging by the `_count` suffix - confirm)
      84              :      */
      85              :     std::atomic<uint64_t> rawq_timeout_count{ 0 };
      86              : 
      87              :     /**
      88              :      * @brief Rate of consumed packets (units are not stated in this header)
      89              :      */
      90              :     std::atomic<double> rate_payloads_consumed{ 0 };
      91              : 
      92              :     /**
      93              :      * @brief Counts packets since last opmon data generation
      94              :      */
      95              :     std::atomic<int> stats_packet_count{ 0 };
      96              :   };
      97              : 
      98              :   struct WriterConfig
      99              :   {
     100              :     /**
     101              :      * @brief Destination IP address, stored as a string
     102              :      */
     103              :     std::string remote_ip;
     104              : 
     105              :     /**
     106              :      * @brief Destination port number. NOTE(review): `ushort` is a non-standard typedef (glibc <sys/types.h>); std::uint16_t would be portable
     107              :      */
     108              :     ushort remote_port;
     109              : 
     110              :     /**
     111              :      * @brief Statistics of socket traffic, shared via std::shared_ptr
     112              :      */
     113              :     std::shared_ptr<SocketStats> socket_stats;
     114              :   };
     115              : 
     116              :   class TCPWriter
     117              :   {
     118              :   public:
     119              :     /**
     120              :      * @brief Creates and connects a TCP socket
     121              :      * @param io_context I/O context for socket creation
     122              :      * @param writer_config TCP writer configuration (destination IP/port and shared statistics)
     123              :      * @throws boost::system::system_error on failure
     124              :      */
     125              :     void configure(boost::asio::io_context& io_context, const WriterConfig& writer_config);
     126              : 
     127              :     /**
     128              :      * @brief Asynchronously sends payloads to the socket in a loop (NOTE(review): signature takes a single payload - confirm the looping in the .cpp)
     129              :      * @param payload Payload to send, passed by value
     130              :      * @return Awaitable coroutine (boost::asio::awaitable<void>)
     131              :      */
     132              :     boost::asio::awaitable<void> start(GenericReceiverConcept::TypeErasedPayload payload);
     133              : 
     134              :     /**
     135              :      * @brief Closes the socket
     136              :      */
     137              :     void stop();
     138              : 
     139              :   private:
     140              :     /**
     141              :      * @brief TCP socket, uniquely owned by this writer
     142              :      */
     143              :     std::unique_ptr<boost::asio::ip::tcp::socket> m_socket;
     144              : 
     145              :     /**
     146              :      * @brief Statistics of socket traffic (same pointer type as WriterConfig::socket_stats)
     147              :      */
     148              :     std::shared_ptr<SocketStats> m_socket_stats;
     149              :   };
     150              : 
     151              :   class UDPWriter
     152              :   {
     153              :   public:
     154              :     /**
     155              :      * @brief Creates a UDP socket
     156              :      * @param io_context I/O context for socket creation
     157              :      * @param writer_config UDP writer configuration (destination IP/port and shared statistics)
     158              :      */
     159              :     void configure(boost::asio::io_context& io_context, const WriterConfig& writer_config);
     160              : 
     161              :     /**
     162              :      * @brief Asynchronously sends payloads to the socket in a loop (NOTE(review): signature takes a single payload - confirm the looping in the .cpp)
     163              :      * @param payload Payload to send, passed by value
     164              :      * @return Awaitable coroutine (boost::asio::awaitable<void>)
     165              :      */
     166              :     boost::asio::awaitable<void> start(GenericReceiverConcept::TypeErasedPayload payload);
     167              : 
     168              :     /**
     169              :      * @brief Closes the socket
     170              :      */
     171              :     void stop();
     172              : 
     173              :   private:
     174              :     /**
     175              :      * @brief UDP socket, uniquely owned by this writer
     176              :      */
     177              :     std::unique_ptr<boost::asio::ip::udp::socket> m_socket;
     178              : 
     179              :     /**
     180              :      * @brief Socket writer configuration, held by value (unlike TCPWriter, which keeps only the stats pointer)
     181              :      */
     182              :     WriterConfig m_writer_config;
     183              :   };
     184              : 
     185              :   // Command handlers; the CommandData_t argument is left unnamed in these declarations
     186              :   void do_configure(const CommandData_t&);
     187              :   void do_start(const CommandData_t&);
     188              :   void do_stop(const CommandData_t&);
     189              : 
     190              :   void generate_opmon_data() override;
     191              : 
     192              :   /**
     193              :    * @brief Converts a socket type string to an enum
     194              :    * @param socket_type Socket type as a string (accepted spellings are defined in the .cpp, not visible here)
     195              :    * @return Corresponding SocketType enum
     196              :    */
     197              :   SocketType string_to_socket_type(const std::string& socket_type) const;
     198              : 
     199              :   /**
     200              :    * @brief Raw data consume callback function
     201              :    * @param payload Consumed data, passed by value
     202              :    */
     203              :   void consume_payload(GenericReceiverConcept::TypeErasedPayload payload);
     204              : 
     205              :   /**
     206              :    * @brief I/O context for socket operations
     207              :    */
     208              :   boost::asio::io_context m_io_context;
     209              : 
     210              :   /**
     211              :    * @brief Prevents I/O context from exiting prematurely (declared after m_io_context, so it is constructed after it)
     212              :    */
     213              :   boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_work_guard;
     214              : 
     215              :   /**
     216              :    * @brief Socket writers; each element is a TCPWriter or a UDPWriter. NOTE(review): <variant> is not included directly by this header
     217              :    */
     218              :   std::vector<std::variant<TCPWriter, UDPWriter>> m_writers;
     219              : 
     220              :   /**
     221              :    * @brief Background thread to keep the I/O context running. NOTE(review): <thread> is not included directly by this header
     222              :    */
     223              :   std::jthread m_io_thread;
     224              : 
     225              :   /**
     226              :    * @brief Type of socket; INVALID until assigned
     227              :    */
     228              :   SocketType m_socket_type{ SocketType::INVALID };
     229              : 
     230              :   /**
     231              :    * @brief Socket writer configurations (presumably one per entry of m_writers - confirm in the .cpp)
     232              :    */
     233              :   std::vector<WriterConfig> m_writer_configs;
     234              : 
     235              :   /**
     236              :    * @brief DAQ configuration data (same pointer type as the argument of init())
     237              :    */
     238              :   std::shared_ptr<appfwk::ConfigurationManager> m_cfg;
     239              : 
     240              :   /**
     241              :    * @brief Configuration object for the callbacks. NOTE(review): raw pointer with no initializer here - confirm it is set before use
     242              :    */
     243              :   const appmodel::DataMoveCallbackConf* m_callback_conf;
     244              : 
     245              :   // Consume callback. NOTE(review): <functional> is not included directly by this header
     246              :   /**
     247              :    * @brief Raw data consume callback (same signature as consume_payload())
     248              :    */
     249              :   std::function<void(GenericReceiverConcept::TypeErasedPayload payload)> m_consume_callback;
     250              : 
     251              :   // Run-start reference time (T0). NOTE(review): <chrono> is not included directly by this header
     252              :   /**
     253              :    * @brief Timestamp used to measure time between opmon reports (steady clock)
     254              :    */
     255              :   std::chrono::time_point<std::chrono::steady_clock> m_t0;
     256              : };
     257              : 
     258              : } // namespace dunedaq::asiolibs
     259              : 
     260              : #endif // ASIOLIBS_PLUGINS_SOCKETWRITERMODULE_HPP_
        

Generated by: LCOV version 2.0-1