LCOV - code coverage report
Current view: top level - asiolibs/plugins - SocketWriterModule.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 128 0
Test Date: 2026-03-29 15:29:34 Functions: 0.0 % 28 0

            Line data    Source code
       1              : /**
       2              :  * @file SocketWriterModule.cpp Boost.Asio-based socket writer plugin for low-bandwidth devices
       3              :  *
       4              :  * This is part of the DUNE DAQ , copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
       9              : #include "SocketWriterModule.hpp"
      10              : 
      11              : #include "CreateGenericReceiver.hpp"
      12              : 
      13              : #include "appfwk/ConfigurationManager.hpp"
      14              : #include "appmodel/SocketDataSender.hpp"
      15              : #include "appmodel/NWDetDataSender.hpp"
      16              : #include "appmodel/SocketReceiver.hpp"
      17              : #include "appmodel/SocketWriterConf.hpp"
      18              : #include "confmodel/DetectorStream.hpp"
      19              : #include "appmodel/NetworkDetectorToDaqConnection.hpp"
      20              : #include "confmodel/QueueWithSourceId.hpp"
      21              : 
      22              : #include "datahandlinglibs/DataHandlingIssues.hpp"
      23              : #include "datahandlinglibs/DataMoveCallbackRegistry.hpp"
      24              : 
      25              : #include "asiolibs/opmon/SocketWriterModule.pb.h"
      26              : 
      27              : #include "asiolibs/AsioIssues.hpp"
      28              : 
      29              : #include <string>
      30              : #include <memory>
      31              : #include <vector>
      32              : #include <utility>
      33              : 
      34              : namespace dunedaq::asiolibs {
      35              : 
      36            0 : SocketWriterModule::SocketWriterModule(const std::string& name)
      37              :   : DAQModule(name)
      38            0 :   , m_work_guard(boost::asio::make_work_guard(m_io_context))
      39              : {
      40            0 :   register_command("conf", &SocketWriterModule::do_configure);
      41            0 :   register_command("start", &SocketWriterModule::do_start);
      42            0 :   register_command("stop_trigger_sources", &SocketWriterModule::do_stop);
      43            0 : }
      44              : 
      45              : void
      46            0 : SocketWriterModule::init(const std::shared_ptr<appfwk::ConfigurationManager> mcfg)
      47              : {
      48            0 :   m_cfg = mcfg;
      49            0 :   auto* mdal = m_cfg->get_dal<appmodel::SocketDataWriterModule>(get_name());
      50            0 :   auto* module_conf = mdal->get_configuration()->cast<appmodel::SocketWriterConf>();
      51              : 
      52            0 :   m_callback_conf = mdal->get_raw_data_callback();
      53            0 :   const auto remote_ip = module_conf->get_remote_ip();
      54              : 
      55            0 :   m_socket_type = string_to_socket_type(module_conf->get_socket_type());
      56            0 :   if (m_socket_type != SocketType::TCP && m_socket_type != SocketType::UDP) {
      57            0 :     throw std::invalid_argument("Error: Only TCP and UDP are allowed!");
      58              :   }
      59              : 
      60            0 :   for (auto* d2d_conn : mdal->get_connections()) {
      61            0 :     if (d2d_conn->is_disabled(*(m_cfg->get_session()))) {
      62            0 :       continue;
      63              :     }
      64              : 
      65            0 :     for (auto* nw_sender : d2d_conn->get_net_senders()) {
      66              : 
      67            0 :       if (nw_sender->is_disabled(*(m_cfg->get_session()))) {
      68            0 :         continue;
      69              :       }
      70              : 
      71            0 :       if (nw_sender->get_streams().size() > 1) {
      72            0 :         dunedaq::datahandlinglibs::GenericConfigurationError err(ERS_HERE,
      73            0 :                                                                  "Multiple streams currently are not supported!");
      74            0 :         ers::fatal(err);
      75            0 :         throw err;
      76            0 :       }
      77            0 :       const auto* socket_sender = nw_sender->cast<appmodel::SocketDataSender>();
      78            0 :       if (socket_sender == nullptr) {
      79            0 :         throw dunedaq::datahandlinglibs::InitializationError(
      80            0 :           ERS_HERE,
      81            0 :           fmt::format("Found {} of type {} in connection {} while expecting type SocketDetDataSender",
      82            0 :                       nw_sender->class_name(),
      83            0 :                       nw_sender->UID(),
      84            0 :                       d2d_conn->UID()));
      85              :       }
      86            0 :       m_writer_configs.emplace_back(remote_ip, socket_sender->get_port(), std::make_shared<SocketStats>());
      87              :     }
      88              :   }
      89              : 
      90            0 :   m_writers.reserve(m_writer_configs.size());
      91            0 :   if (m_socket_type == SocketType::TCP) {
      92            0 :     for (std::size_t i = 0; i < m_writer_configs.size(); ++i) {
      93            0 :       m_writers.emplace_back(TCPWriter());
      94              :     }
      95              :   } else {
      96            0 :     for (std::size_t i = 0; i < m_writer_configs.size(); ++i) {
      97            0 :       m_writers.emplace_back(UDPWriter());
      98              :     }
      99              :   }
     100              : 
     101              :   // Raw input connection sensibility check
     102            0 :   if (m_callback_conf == nullptr) {
     103            0 :     TLOG() << "No callback configuration given!";
     104              :     //ers::error(ConfigurationError(ERS_HERE, m_sourceid, No callback configuration given!"));
     105              :   }
     106            0 : }
     107              : 
     108              : SocketWriterModule::SocketType
     109            0 : SocketWriterModule::string_to_socket_type(const std::string& socket_type) const
     110              : {
     111            0 :   if (socket_type == "TCP") {
     112              :     return SocketWriterModule::SocketType::TCP;
     113            0 :   } else if (socket_type == "UDP") {
     114            0 :     return SocketWriterModule::SocketType::UDP;
     115              :   }
     116              :   return SocketWriterModule::SocketType::INVALID;
     117              : }
     118              : 
     119              : void
     120            0 : SocketWriterModule::consume_payload(GenericReceiverConcept::TypeErasedPayload payload)
     121              : {
     122            0 :   for (auto& writer : m_writers) {
     123            0 :     std::visit([this, payload](auto& w) mutable { // lets payload to be moved
     124            0 :       boost::asio::co_spawn(m_io_context, w.start(std::move(payload)), boost::asio::detached);
     125            0 :     }, writer);
     126              :   }
     127            0 : }
     128              : 
     129              : void
     130            0 : SocketWriterModule::do_configure(const CommandData_t&)
     131              : {
     132              :     // Configure and register consume callback
     133            0 :     m_consume_callback = std::bind(&SocketWriterModule::consume_payload, this, std::placeholders::_1);
     134              : 
     135              :     // Register callback
     136            0 :     auto dmcbr = datahandlinglibs::DataMoveCallbackRegistry::get();
     137            0 :     dmcbr->register_callback<GenericReceiverConcept::TypeErasedPayload>(m_callback_conf, m_consume_callback);
     138              : 
     139            0 :   for (std::size_t i = 0; i < m_writers.size(); ++i) {
     140            0 :     const auto& writer_config = m_writer_configs[i];
     141            0 :     std::visit([this, &writer_config](auto& writer) { writer.configure(m_io_context, writer_config); }, m_writers[i]);
     142              :   }
     143            0 : }
     144              : 
     145              : void
     146            0 : SocketWriterModule::do_start(const CommandData_t&)
     147              : {
     148            0 :   for (const auto& writer_config : m_writer_configs) {
     149              :     // Reset opmon variables
     150            0 :     writer_config.socket_stats->sum_payloads = 0;
     151            0 :     writer_config.socket_stats->num_payloads = 0;
     152            0 :     writer_config.socket_stats->sum_bytes = 0;
     153            0 :     writer_config.socket_stats->rawq_timeout_count = 0;
     154            0 :     writer_config.socket_stats->stats_packet_count = 0;
     155              :   }
     156              : 
     157            0 :   m_t0 = std::chrono::steady_clock::now();
     158              : 
     159            0 :   m_io_thread = std::jthread([this] { m_io_context.run(); });
     160            0 : }
     161              : 
     162              : void
     163            0 : SocketWriterModule::do_stop(const CommandData_t&)
     164              : {
     165            0 :   for (auto& writer : m_writers) {
     166            0 :     std::visit([](auto& writer) { writer.stop(); }, writer);
     167              :   }
     168              : 
     169            0 :   m_work_guard.reset();
     170            0 : }
     171              : 
     172              : void
     173            0 : SocketWriterModule::generate_opmon_data()
     174              : {
     175            0 :   for (const auto& writer_config : m_writer_configs) {
     176            0 :     opmon::SocketWriterStats stats;
     177            0 :     stats.set_sum_payloads(writer_config.socket_stats->sum_payloads.load());
     178            0 :     stats.set_num_payloads(writer_config.socket_stats->num_payloads.exchange(0));
     179            0 :     stats.set_sum_bytes(writer_config.socket_stats->sum_bytes.load());
     180            0 :     stats.set_num_data_input_timeouts(writer_config.socket_stats->rawq_timeout_count.exchange(0));
     181              : 
     182            0 :     auto now = std::chrono::steady_clock::now();
     183            0 :     int new_packets = writer_config.socket_stats->stats_packet_count.exchange(0);
     184            0 :     double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
     185            0 :     m_t0 = now;
     186              : 
     187            0 :     stats.set_rate_payloads_consumed(new_packets / seconds / 1000.);
     188              : 
     189            0 :     publish(std::move(stats), { { "socket-writer", std::to_string(writer_config.remote_port) } });
     190            0 :   }
     191            0 : }
     192              : 
     193              : void
     194            0 : SocketWriterModule::TCPWriter::configure(boost::asio::io_context& io_context, const WriterConfig& writer_config)
     195              : {
     196            0 :   m_socket_stats = writer_config.socket_stats;
     197              : 
     198            0 :   while (true) {
     199            0 :     try {
     200            0 :       m_socket = std::make_unique<boost::asio::ip::tcp::socket>(io_context);
     201              : 
     202            0 :       m_socket->connect(boost::asio::ip::tcp::endpoint(
     203            0 :         boost::asio::ip::address::from_string(writer_config.remote_ip), writer_config.remote_port));
     204            0 :       break;
     205            0 :     } catch (const boost::system::system_error& e) {
     206            0 :       TLOG() << "Connection failed: " << e.what() << ". Retrying in 1 second...";
     207            0 :       std::this_thread::sleep_for(std::chrono::seconds(1));
     208            0 :     }
     209              :   }
     210              : 
     211            0 :   TLOG() << "Established TCP connection to " << writer_config.remote_ip << ":" << writer_config.remote_port;
     212            0 : }
     213              : 
     214              : boost::asio::awaitable<void>
     215            0 : SocketWriterModule::TCPWriter::start(GenericReceiverConcept::TypeErasedPayload payload) // TODO (DTE): Rename
     216              : {
     217              :   const auto bytes_sent =
     218              :     co_await boost::asio::async_write(*m_socket, boost::asio::buffer(payload.data, payload.size), boost::asio::use_awaitable);
     219              :   ++m_socket_stats->num_payloads;
     220              :   ++m_socket_stats->sum_payloads;
     221              :   m_socket_stats->sum_bytes.fetch_add(bytes_sent);
     222              :   ++m_socket_stats->stats_packet_count;
     223            0 : }
     224              : 
     225              : void
     226            0 : SocketWriterModule::TCPWriter::stop()
     227              : {
     228            0 :   m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both);
     229            0 :   m_socket->close();
     230            0 :   TLOG() << "Shutdown TCP connection";
     231            0 : }
     232              : 
     233              : void
     234            0 : SocketWriterModule::UDPWriter::configure(boost::asio::io_context& io_context, const WriterConfig& writer_config)
     235              : {
     236            0 :   m_writer_config = writer_config;
     237              : 
     238              :   // Let the OS pick an available local IP and port for sending packets
     239            0 :   const boost::asio::ip::udp::endpoint sender_endpoint(boost::asio::ip::udp::v4(), 0);
     240              : 
     241            0 :   m_socket = std::make_unique<boost::asio::ip::udp::socket>(io_context, sender_endpoint);
     242              : 
     243            0 :   TLOG() << "Created UDP socket on " << m_socket->local_endpoint().address() << ":"
     244            0 :          << m_socket->local_endpoint().port();
     245            0 : }
     246              : 
     247              : boost::asio::awaitable<void>
     248            0 : SocketWriterModule::UDPWriter::start(GenericReceiverConcept::TypeErasedPayload payload)
     249              : {
     250              :   boost::asio::ip::udp::endpoint receiver_endpoint(boost::asio::ip::address::from_string(m_writer_config.remote_ip),
     251              :                                                    m_writer_config.remote_port);
     252              :   const auto bytes_sent = co_await m_socket->async_send_to(
     253              :       boost::asio::buffer(payload.data, payload.size), receiver_endpoint, boost::asio::use_awaitable);
     254              :   ++m_writer_config.socket_stats->num_payloads;
     255              :   ++m_writer_config.socket_stats->sum_payloads;
     256              :   m_writer_config.socket_stats->sum_bytes.fetch_add(bytes_sent);
     257              :   ++m_writer_config.socket_stats->stats_packet_count;
     258            0 : }
     259              : 
     260              : void
     261            0 : SocketWriterModule::UDPWriter::stop()
     262              : {
     263            0 :   m_socket->close();
     264            0 :   TLOG() << "Closed UDP socket";
     265            0 : }
     266              : 
     267              : } // namespace dunedaq::asiolibs
     268              : 
     269            0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::asiolibs::SocketWriterModule)
        

Generated by: LCOV version 2.0-1