LCOV - code coverage report
Current view: top level - asiolibs/plugins - SocketWriterModule.hpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 1 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 1 0

            Line data    Source code
       1              : /**
       2              :  * @file SocketWriterModule.hpp Boost.Asio-based socket writer plugin for low-bandwidth devices
       3              :  *
       4              :  * This is part of the DUNE DAQ , copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : #ifndef ASIOLIBS_PLUGINS_SOCKETWRITERMODULE_HPP_
       9              : #define ASIOLIBS_PLUGINS_SOCKETWRITERMODULE_HPP_
      10              : 
      11              : #include "GenericReceiverConcept.hpp"
      12              : 
      13              : #include "appfwk/DAQModule.hpp"
      14              : #include "utilities/ReusableThread.hpp"
      15              : 
      16              : #include "appmodel/SocketDataWriterModule.hpp"
      17              : 
      18              : #include <boost/asio.hpp>
      19              : 
      20              : #include <string>
      21              : #include <memory>
      22              : #include <vector>
      23              : 
      24              : namespace dunedaq::asiolibs {
      25              : 
      26              : class ConfigurationManager;
      27              : class SocketDataWriterModule;
      28              : 
      29              : class SocketWriterModule : public dunedaq::appfwk::DAQModule
      30              : {
      31              : public:
      32              :   /**
      33              :    * @brief Default raw data receiver timeout in ms
      34              :    */
      35              :   static constexpr auto raw_receiver_timeout_ms = 10;
      36              : 
      37              :   /**
      38              :    * @brief SocketWriterModule constructor
      39              :    * @param name DAQ module instance name
      40              :    */
      41              :   explicit SocketWriterModule(const std::string& name);
      42            0 :   ~SocketWriterModule() = default;
      43              : 
      44              :   SocketWriterModule(const SocketWriterModule&) = delete; ///< SocketWriterModule is not copy-constructible
      45              :   SocketWriterModule& operator=(const SocketWriterModule&) =
      46              :     delete;                                                  ///< SocketWriterModule is not copy-assignable
      47              :   SocketWriterModule(SocketWriterModule&&) = delete; ///< SocketWriterModule is not move-constructible
      48              :   SocketWriterModule& operator=(SocketWriterModule&&) =
      49              :     delete; ///< SocketWriterModule is not move-assignable
      50              : 
      51              :   /**
      52              :    * @brief Handles initialization on boot
      53              :    * @param mcfg DAQ configuration data
      54              :    */
      55              :   void init(const std::shared_ptr<appfwk::ConfigurationManager> mcfg) override;
      56              : 
      57              : private:
      58              :   enum class SocketType
      59              :   {
      60              :     TCP,
      61              :     UDP,
      62              :     INVALID
      63              :   };
      64              : 
      65              :   struct SocketStats
      66              :   {
      67              :     /**
      68              :      * @brief Total number of received payloads
      69              :      */
      70              :     std::atomic<uint64_t> sum_payloads{ 0 };
      71              : 
      72              :     /**
      73              :      * @brief Incremental number of received payloads 
      74              :      */
      75              :     std::atomic<uint64_t> num_payloads{ 0 };   
      76              : 
      77              :     /**
      78              :      * @brief Total number of received bytes 
      79              :      */
      80              :     std::atomic<uint64_t> sum_bytes{ 0 };
      81              : 
      82              :     /**
      83              :      * @brief Timeout on data inputs 
      84              :      */
      85              :     std::atomic<uint64_t> rawq_timeout_count{ 0 };    
      86              : 
      87              :     /**
      88              :      * @brief Rate of consumed packets 
      89              :      */
      90              :     std::atomic<double> rate_payloads_consumed{ 0 };
      91              :     
      92              :     /**
      93              :      * @brief Counts packets since last opmon data generation
      94              :      */
      95              :     std::atomic<int> stats_packet_count{ 0 };    
      96              :   };
      97              : 
      98              :   struct WriterConfig
      99              :   {
     100              :     /**
     101              :      * @brief Destination IP address
     102              :      */
     103              :     std::string remote_ip;
     104              : 
     105              :     /**
     106              :      * @brief Destination port number
     107              :      */
     108              :     ushort remote_port;
     109              : 
     110              :     /**
     111              :      * @brief Statistics of socket traffic
     112              :      */
     113              :     std::shared_ptr<SocketStats> socket_stats;
     114              :   };
     115              : 
     116              :   class TCPWriter
     117              :   {
     118              :   public:
     119              :     /**
     120              :      * @brief Creates and connects a TCP socket
     121              :      * @param io_context I/O context for socket creation
     122              :      * @param writer_config TCP writer configuration
     123              :      * @throws boost::system::system_error on failure
     124              :      */
     125              :     void configure(boost::asio::io_context& io_context, const WriterConfig& writer_config);
     126              : 
     127              :     /**
     128              :      * @brief Asynchronously sends payloads to the socket in a loop
     129              :      * @param payload Payload to send
     130              :      * @return Coroutine handle
     131              :      */
     132              :     boost::asio::awaitable<void> start(GenericReceiverConcept::TypeErasedPayload payload);
     133              : 
     134              :     /**
     135              :      * @brief Closes the socket
     136              :      */
     137              :     void stop();
     138              : 
     139              :   private:
     140              :     /**
     141              :      * @brief TCP socket
     142              :      */
     143              :     std::unique_ptr<boost::asio::ip::tcp::socket> m_socket;
     144              : 
     145              :     /**
     146              :      * @brief Statistics of socket traffic
     147              :      */
     148              :     std::shared_ptr<SocketStats> m_socket_stats;
     149              :   };
     150              : 
     151              :   class UDPWriter
     152              :   {
     153              :   public:
     154              :     /**
     155              :      * @brief Creates a UDP socket
     156              :      * @param io_context I/O context for socket creation
     157              :      * @param writer_config UDP writer configuration
     158              :      */
     159              :     void configure(boost::asio::io_context& io_context, const WriterConfig& writer_config);
     160              : 
     161              :     /**
     162              :      * @brief Asynchronously sends payloads to the socket in a loop
     163              :      * @param payload Payload to send
     164              :      * @return Coroutine handle
     165              :      */
     166              :     boost::asio::awaitable<void> start(GenericReceiverConcept::TypeErasedPayload payload);
     167              : 
     168              :     /**
     169              :      * @brief Closes the socket
     170              :      */
     171              :     void stop();
     172              : 
     173              :   private:
     174              :     /**
     175              :      * @brief UDP socket
     176              :      */
     177              :     std::unique_ptr<boost::asio::ip::udp::socket> m_socket;
     178              : 
     179              :     /**
     180              :      * @brief Socket writer configuration
     181              :      */
     182              :     WriterConfig m_writer_config;
     183              :   };
     184              : 
     185              :   // Commands
     186              :   void do_configure(const CommandData_t&);
     187              :   void do_start(const CommandData_t&);
     188              :   void do_stop(const CommandData_t&);
     189              : 
     190              :   void generate_opmon_data() override;
     191              : 
     192              :   /**
     193              :    * @brief Converts a socket type string to an enum
     194              :    * @param socket_type Socket type as a string
     195              :    * @return Corresponding SocketType enum
     196              :    */
     197              :   SocketType string_to_socket_type(const std::string& socket_type) const;
     198              : 
     199              :   /**
     200              :    * @brief Gets dal inputs
     201              :    * @param mdal SocketDataWriterModule dal
     202              :    */   
     203              :   void get_dal_inputs(const dunedaq::appmodel::SocketDataWriterModule* mdal);
     204              : 
     205              :   /**
     206              :    * @brief Raw data consume thread function
     207              :    */   
     208              :   void run_consume();
     209              : 
     210              :   /**
     211              :    * @brief Raw data consume callback function
     212              :    * @param payload Consumed data
     213              :    */  
     214              :   void consume_payload(GenericReceiverConcept::TypeErasedPayload payload);  
     215              : 
     216              :   /**
     217              :    * @brief I/O context for socket operations
     218              :    */
     219              :   boost::asio::io_context m_io_context;
     220              : 
     221              :   /**
     222              :    * @brief Prevents I/O context from exiting prematurely
     223              :    */
     224              :   boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_work_guard;
     225              : 
     226              :   /**
     227              :    * @brief Socket writers
     228              :    */
     229              :   std::vector<std::variant<TCPWriter, UDPWriter>> m_writers;
     230              : 
     231              :   /**
     232              :    * @brief Background thread to keep the I/O context running
     233              :    */
     234              :   std::jthread m_io_thread;
     235              : 
     236              :   /**
     237              :    * @brief Type of socket
     238              :    */
     239              :   SocketType m_socket_type{ SocketType::INVALID };
     240              : 
     241              :   /**
     242              :    * @brief Socket writer configurations
     243              :    */
     244              :   std::vector<WriterConfig> m_writer_configs;
     245              : 
     246              :   /**
     247              :    * @brief DAQ configuration data
     248              :    */
     249              :   std::shared_ptr<appfwk::ConfigurationManager> m_cfg;
     250              : 
     251              :   /**
     252              :    * @brief Whether callback mode is configured
     253              :    */  
     254              :   bool m_callback_mode{ false };
     255              : 
     256              :   // RAW RECEIVER
     257              :   /**
     258              :    * @brief Generic raw data receiver
     259              :    */  
     260              :   std::shared_ptr<GenericReceiverConcept> m_raw_data_receiver;
     261              : 
     262              :   /**
     263              :    * @brief Raw data receiver timeout
     264              :    */  
     265              :   std::chrono::milliseconds m_raw_receiver_timeout_ms{ raw_receiver_timeout_ms };
     266              : 
     267              :   /**
     268              :    * @brief Raw data receiver UID
     269              :    */  
     270              :   std::string m_raw_data_receiver_connection_name;  
     271              : 
     272              :   // CONSUMER
     273              :   /**
     274              :    * @brief Raw data consume thread
     275              :    */     
     276              :   utilities::ReusableThread m_consumer_thread;
     277              : 
     278              :   /**
     279              :    * @brief Whether consumer thread should continue
     280              :    */    
     281              :   std::atomic<bool> m_run_marker { false };
     282              : 
     283              :   // Consume callback
     284              :   /**
     285              :    * @brief Raw data consume callback
     286              :    */    
     287              :   std::function<void(GenericReceiverConcept::TypeErasedPayload payload)> m_consume_callback;  
     288              : 
     289              :   // RUN START T0
     290              :   /**
     291              :    * @brief Timestamp used to measure time between opmon reports
     292              :    */   
     293              :   std::chrono::time_point<std::chrono::high_resolution_clock> m_t0;  
     294              : };
     295              : 
     296              : } // namespace dunedaq::asiolibs
     297              : 
     298              : #endif // ASIOLIBS_PLUGINS_SOCKETWRITERMODULE_HPP_
        

Generated by: LCOV version 2.0-1