LCOV - code coverage report
Current view: top level - asiolibs/plugins - SocketWriterModule.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 183 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 33 0

            Line data    Source code
       1              : /**
       2              :  * @file SocketWriterModule.cpp Boost.Asio-based socket writer plugin for low-bandwidth devices
       3              :  *
       4              :  * This is part of the DUNE DAQ, copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
       9              : #include "SocketWriterModule.hpp"
      10              : 
      11              : #include "CreateGenericReceiver.hpp"
      12              : 
      13              : #include "appfwk/ConfigurationManager.hpp"
      14              : #include "appmodel/SocketDataSender.hpp"
      15              : #include "appmodel/NWDetDataSender.hpp"
      16              : #include "appmodel/SocketReceiver.hpp"
      17              : #include "appmodel/SocketWriterConf.hpp"
      18              : #include "confmodel/DetectorStream.hpp"
      19              : #include "appmodel/NetworkDetectorToDaqConnection.hpp"
      20              : #include "confmodel/QueueWithSourceId.hpp"
      21              : 
      22              : #include "datahandlinglibs/DataHandlingIssues.hpp"
      23              : #include "datahandlinglibs/DataMoveCallbackRegistry.hpp"
      24              : 
      25              : #include "asiolibs/opmon/SocketWriterModule.pb.h"
      26              : 
      27              : #include "asiolibs/AsioIssues.hpp"
      28              : 
      29              : #include <string>
      30              : #include <memory>
      31              : #include <vector>
      32              : #include <utility>
      33              : 
      34              : namespace dunedaq::asiolibs {
      35              : 
                        // Constructs the module. Forwards `name` to the DAQModule base, creates a work guard on
                        // m_io_context, and registers the three commands this module handles:
                        //   "conf"                 -> do_configure
                        //   "start"                -> do_start
                        //   "stop_trigger_sources" -> do_stop
      36            0 : SocketWriterModule::SocketWriterModule(const std::string& name)
      37              :   : DAQModule(name)
      38            0 :   , m_work_guard(boost::asio::make_work_guard(m_io_context))
      39              : {
      40            0 :   register_command("conf", &SocketWriterModule::do_configure);
      41            0 :   register_command("start", &SocketWriterModule::do_start);
      42            0 :   register_command("stop_trigger_sources", &SocketWriterModule::do_stop);
      43            0 : }
      44              : 
                        // Reads the module inputs from the configuration object `mdal` and sets up the raw data receiver.
                        //
                        // For every input:
                        //   - its UID is stored as the receiver connection name;
                        //   - the UID is split on '_' and, if the first word is "cb", the module switches to callback mode
                        //     (m_callback_mode is only ever set to true here, never back to false);
                        //   - when not in callback mode, the receive timeout is taken from the input; a value of 0 only
                        //     triggers a warning and leaves the current timeout unchanged;
                        //   - the input must be a confmodel::QueueWithSourceId, from whose UID the generic receiver is created.
                        //
                        // With several inputs, each iteration overwrites the connection name and receiver set by the
                        // previous one (see the FIXME below).
                        //
                        // Throws datahandlinglibs::InitializationError (after reporting it with ers::fatal) when no inputs
                        // are defined or when an input is not a QueueWithSourceId.
      45              : void 
      46            0 : SocketWriterModule::get_dal_inputs(const dunedaq::appmodel::SocketDataWriterModule* mdal)
      47              : {
      48            0 :   if (mdal->get_inputs().empty()) {
      49            0 :     auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE,
      50            0 :                                                               "No inputs defined for socket writer in configuration.");
      51            0 :     ers::fatal(err);
      52            0 :     throw err;
      53            0 :   }
      54              : 
      55            0 :   for (auto* input : mdal->get_inputs()) {
      56            0 :     m_raw_data_receiver_connection_name = input->UID();
      57              :     // Parse for prefix
      58            0 :     std::string conn_name = input->UID(); 
      59            0 :     const char delim = '_';
      60            0 :     std::vector<std::string> words;
      61            0 :     std::size_t start;
      62            0 :     std::size_t end = 0;
      63            0 :     while ((start = conn_name.find_first_not_of(delim, end)) != std::string::npos) {
      64            0 :       end = conn_name.find(delim, start);
      65            0 :       words.push_back(conn_name.substr(start, end - start));
      66              :     }
                            // An empty UID, or one made only of '_' characters, produces no words. Calling front() on an
                            // empty vector is undefined behaviour, so fall back to an empty prefix (i.e. not "cb").
                            const std::string front_word = words.empty() ? std::string() : words.front();
      67              : 
      68            0 :     TLOG_DEBUG() << "Initialize connection based on uid: " 
      69            0 :                  << m_raw_data_receiver_connection_name << " front word: " << front_word;
      70              : 
      71            0 :     std::string cb_prefix("cb");
      72            0 :     if (front_word == cb_prefix) {
      73            0 :       m_callback_mode = true;
      74              :     }
      75              : 
      76            0 :     if (!m_callback_mode) {
      77            0 :       const auto recv_timeout_ms = input->get_recv_timeout_ms();
      78            0 :       if (recv_timeout_ms == 0) {
                                // Keep the current timeout and report it in the warning.
      79            0 :         ers::warning(InvalidRawReceiverTimeout(ERS_HERE, m_raw_receiver_timeout_ms.count()));
      80              :       } else {
      81            0 :         m_raw_receiver_timeout_ms = std::chrono::milliseconds(recv_timeout_ms);
      82              :       }
      83              :     }
      84              : 
      85            0 :     auto* queue = input->cast<confmodel::QueueWithSourceId>();
      86            0 :     if (queue == nullptr) {
                              // The message names the type that is actually checked by the cast above.
      87            0 :       auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE, "Inputs are not of type QueueWithSourceId.");
      88            0 :       ers::fatal(err);
      89            0 :       throw err;
      90            0 :     }  
      91              : 
      92            0 :     m_raw_data_receiver = createGenericReceiver(queue->UID(), m_raw_data_receiver_connection_name); // FIXME (DTE): Overwriting doesn't make sense      
      93            0 :   }    
      94            0 : }
      95              : 
                        // Initialises the module from the configuration manager `mcfg`.
                        //
                        // Steps, in order:
                        //   1. Looks up this module's SocketDataWriterModule object (by get_name()) and its SocketWriterConf.
                        //   2. Reads the remote IP and the socket type; throws std::invalid_argument unless the type is
                        //      TCP or UDP.
                        //   3. For every connection and network sender that is not disabled in the session, records one
                        //      writer configuration (remote IP, the sender's port, a fresh SocketStats).
                        //   4. Creates one TCPWriter or UDPWriter per writer configuration.
                        //   5. Calls get_dal_inputs() to set up the input side.
                        //
                        // Throws datahandlinglibs::GenericConfigurationError (after ers::fatal) if a sender has more than
                        // one stream, and datahandlinglibs::InitializationError if a sender is not a SocketDataSender.
      96              : void
      97            0 : SocketWriterModule::init(const std::shared_ptr<appfwk::ConfigurationManager> mcfg)
      98              : {
      99            0 :   m_cfg = mcfg;
     100            0 :   auto* mdal = m_cfg->get_dal<appmodel::SocketDataWriterModule>(get_name());
     101            0 :   auto* module_conf = mdal->get_configuration()->cast<appmodel::SocketWriterConf>();
     102              : 
     103            0 :   const auto remote_ip = module_conf->get_remote_ip();
     104              : 
     105            0 :   m_socket_type = string_to_socket_type(module_conf->get_socket_type());
     106            0 :   if (m_socket_type != SocketType::TCP && m_socket_type != SocketType::UDP) {
     107            0 :     throw std::invalid_argument("Error: Only TCP and UDP are allowed!");
     108              :   }
     109              : 
     110            0 :   for (auto* d2d_conn : mdal->get_connections()) {
     111            0 :     if (d2d_conn->is_disabled(*(m_cfg->get_session()))) {
     112            0 :       continue;
     113              :     }
     114              : 
     115            0 :     for (auto* nw_sender : d2d_conn->get_net_senders()) {
     116              :       
     117            0 :       if (nw_sender->is_disabled(*(m_cfg->get_session()))) {
     118            0 :         continue;
     119              :       }
     120              : 
     121            0 :       if (nw_sender->get_streams().size() > 1) {
     122            0 :         dunedaq::datahandlinglibs::GenericConfigurationError err(ERS_HERE,
     123            0 :                                                                  "Multiple streams currently are not supported!");
     124            0 :         ers::fatal(err);
     125            0 :         throw err;
     126            0 :       }
     127            0 :       const auto* socket_sender = nw_sender->cast<appmodel::SocketDataSender>();
     128            0 :       if (socket_sender == nullptr) {
                                // Placeholders are: object UID, its class name, the owning connection. The expected type
                                // named in the message is the one the cast above checks for.
     129            0 :         throw dunedaq::datahandlinglibs::InitializationError(
     130            0 :           ERS_HERE,
     131            0 :           fmt::format("Found {} of type {} in connection {} while expecting type SocketDataSender",
     132            0 :                       nw_sender->UID(),
     133            0 :                       nw_sender->class_name(),
     134            0 :                       d2d_conn->UID()));
     135              :       }
     136            0 :       m_writer_configs.emplace_back(remote_ip, socket_sender->get_port(), std::make_shared<SocketStats>());
     137              :     }
     138              :   }
     139              : 
                          // One writer per configuration; m_writers[i] is later configured with m_writer_configs[i].
     140            0 :   m_writers.reserve(m_writer_configs.size());
     141            0 :   if (m_socket_type == SocketType::TCP) {
     142            0 :     for (std::size_t i = 0; i < m_writer_configs.size(); ++i) {
     143            0 :       m_writers.emplace_back(TCPWriter());
     144              :     }
     145              :   } else {
     146            0 :     for (std::size_t i = 0; i < m_writer_configs.size(); ++i) {
     147            0 :       m_writers.emplace_back(UDPWriter());
     148              :     }
     149              :   }
     150              : 
     151            0 :   get_dal_inputs(mdal);
     152              : 
     153              :   // Raw input connection sensibility check
                          // Only logged: initialisation continues even when the receiver is unset.
     154            0 :   if (!m_callback_mode && m_raw_data_receiver == nullptr) {
     155            0 :     TLOG() << "Non callback mode, and receiver is unset!";
     156              :     //ers::error(ConfigurationError(ERS_HERE, m_sourceid, "Non callback mode, and receiver is unset!"));
     157              :   }  
     158            0 : }
     159              : 
                        // Translates the configured socket type string into a SocketType.
                        // Returns TCP for exactly "TCP", UDP for exactly "UDP" (the comparison is case-sensitive), and
                        // INVALID for any other string.
     160              : SocketWriterModule::SocketType
     161            0 : SocketWriterModule::string_to_socket_type(const std::string& socket_type) const
     162              : {
     163            0 :   if (socket_type == "TCP") {
     164              :     return SocketWriterModule::SocketType::TCP;
     165            0 :   } else if (socket_type == "UDP") {
     166            0 :     return SocketWriterModule::SocketType::UDP;
     167              :   }
                          // Unrecognised value; init() throws std::invalid_argument for anything other than TCP or UDP.
     168              :   return SocketWriterModule::SocketType::INVALID;
     169              : }
     170              : 
                        // Work function handed to m_consumer_thread by do_start() when the module is not in callback mode.
                        // While m_run_marker is set, it tries to receive one payload from m_raw_data_receiver, waiting up
                        // to m_raw_receiver_timeout_ms. A received payload is passed to consume_payload(); when nothing
                        // was received, rawq_timeout_count is incremented for every writer configuration.
                        // NOTE(review): m_raw_data_receiver is dereferenced without a null check, while init() only logs
                        // when it is unset in non-callback mode — confirm it can never be null here.
     171              : void 
     172            0 : SocketWriterModule::run_consume()
     173              : {
     174            0 :   TLOG() << "Consumer thread started..."; // TODO (DTE): Make debug logs
     175              : 
     176            0 :   while (m_run_marker.load()) {
     177              :     // Try to acquire data
     178              : 
     179            0 :     if (auto opt_payload = m_raw_data_receiver->try_receive(m_raw_receiver_timeout_ms)) {
     180            0 :       consume_payload(std::move(*opt_payload));
     181              :     } else {
                              // Nothing arrived within the timeout: count it against every writer.
     182            0 :       for (const auto& writer_config : m_writer_configs) {
     183            0 :         ++writer_config.socket_stats->rawq_timeout_count;
     184              :       }
     185            0 :     }
     186              :   }
     187              : 
     188            0 :   TLOG() << "Consumer thread joins... ";
     189            0 : }
     190              : 
                        // Sends `payload` through every writer. For each writer, the coroutine returned by the writer's
                        // start() is spawned on m_io_context with boost::asio::detached as the completion token.
                        // Each writer gets its own copy of `payload`: the lambda captures it by value once per loop
                        // iteration. This function is also what do_configure() registers as the callback in callback mode.
     191              : void
     192            0 : SocketWriterModule::consume_payload(GenericReceiverConcept::TypeErasedPayload payload)
     193              : {
     194            0 :   for (auto& writer : m_writers) {
     195            0 :     std::visit([this, payload](auto& w) mutable { // mutable: lets the by-value copy of payload be moved into start()
     196            0 :       boost::asio::co_spawn(m_io_context, w.start(std::move(payload)), boost::asio::detached);
     197            0 :     }, writer);
     198              :   }
     199            0 : }
     200              : 
                        // Handler for the "conf" command (registered in the constructor); the command data is unused.
                        // In callback mode it binds consume_payload() to this object and registers it with the
                        // DataMoveCallbackRegistry under m_raw_data_receiver_connection_name.
                        // In both modes it then configures every writer with the writer configuration at the same index.
     201              : void
     202            0 : SocketWriterModule::do_configure(const CommandData_t&)
     203              : {
     204              :   // Register callbacks if operating in that mode.
     205            0 :   if (m_callback_mode) {
     206              :     // Configure and register consume callback
     207            0 :     m_consume_callback = std::bind(&SocketWriterModule::consume_payload, this, std::placeholders::_1);
     208              :  
     209              :     // Register callback
     210            0 :     auto dmcbr = datahandlinglibs::DataMoveCallbackRegistry::get();
     211            0 :     dmcbr->register_callback<GenericReceiverConcept::TypeErasedPayload>(m_raw_data_receiver_connection_name, m_consume_callback);
     212            0 :   }
     213              : 
                          // m_writers and m_writer_configs are index-aligned; init() creates one writer per configuration.
     214            0 :   for (std::size_t i = 0; i < m_writers.size(); ++i) {
     215            0 :     const auto& writer_config = m_writer_configs[i];
     216            0 :     std::visit([this, &writer_config](auto& writer) { writer.configure(m_io_context, writer_config); }, m_writers[i]);
     217              :   }
     218            0 : }
     219              : 
                        // Handler for the "start" command (registered in the constructor); the command data is unused.
                        // Zeroes all counters in every writer's SocketStats, records the current time in m_t0 (the
                        // reference point generate_opmon_data() uses for rates), starts the consumer thread when not in
                        // callback mode, and starts the thread that runs m_io_context.
                        // NOTE(review): do_stop() resets m_work_guard and nothing visible here re-creates it or restarts
                        // m_io_context — confirm that a second start after a stop behaves as intended.
     220              : void
     221            0 : SocketWriterModule::do_start(const CommandData_t&)
     222              : {
     223            0 :   for (const auto& writer_config : m_writer_configs) {
     224              :     // Reset opmon variables
     225            0 :     writer_config.socket_stats->sum_payloads = 0;
     226            0 :     writer_config.socket_stats->num_payloads = 0;
     227            0 :     writer_config.socket_stats->sum_bytes = 0;
     228            0 :     writer_config.socket_stats->rawq_timeout_count = 0;
     229            0 :     writer_config.socket_stats->stats_packet_count = 0;
     230              :   }
     231              : 
     232            0 :   m_t0 = std::chrono::high_resolution_clock::now();
     233              : 
                          // In callback mode payloads arrive through the registered callback, so no consumer thread is needed.
     234            0 :   if (!m_callback_mode) {
     235            0 :     m_run_marker.store(true);
     236            0 :     m_consumer_thread.set_work(&SocketWriterModule::run_consume, this);
     237              :   }
     238              : 
                          // Runs the coroutines that consume_payload() spawns on m_io_context.
     239            0 :   m_io_thread = std::jthread([this] { m_io_context.run(); });
     240            0 : }
     241              : 
                        // Handler for the "stop_trigger_sources" command (registered in the constructor); the command
                        // data is unused.
                        // When not in callback mode it clears m_run_marker and then polls every 10 ms until
                        // m_consumer_thread.get_readiness() returns true. It then calls stop() on every writer and
                        // resets the work guard that the constructor placed on m_io_context.
     242              : void
     243            0 : SocketWriterModule::do_stop(const CommandData_t&)
     244              : {
     245            0 :   if (!m_callback_mode) {
     246            0 :     m_run_marker.store(false);
                            // run_consume() leaves its loop once it sees the cleared marker; wait for that here.
     247            0 :     while (!m_consumer_thread.get_readiness()) {
     248            0 :       std::this_thread::sleep_for(std::chrono::milliseconds(10));
     249              :     }
     250              :   }
     251              : 
     252            0 :   for (auto& writer : m_writers) {
     253            0 :     std::visit([](auto& writer) { writer.stop(); }, writer);
     254              :   }
     255              : 
                          // Presumably lets m_io_context.run() in m_io_thread return once no work remains — TODO confirm.
     256            0 :   m_work_guard.reset();
     257            0 : }
     258              : 
                        // Publishes one opmon::SocketWriterStats per writer configuration, tagged with the element
                        // { "socket-writer", <remote port> }.
                        //
                        // sum_payloads and sum_bytes are read without being cleared; num_payloads, rawq_timeout_count and
                        // stats_packet_count are read and reset to zero in one step with exchange(0).
                        // The reported rate is (packets since the previous call) / (seconds since the previous call) / 1000.
     259              : void
     260            0 : SocketWriterModule::generate_opmon_data()
     261              : {
                          // Measure the reporting interval once for all writers. Updating m_t0 inside the loop would leave
                          // every writer after the first with a near-zero interval and therefore a meaningless rate.
                          const auto now = std::chrono::high_resolution_clock::now();
                          const double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
                          m_t0 = now;

     262            0 :   for (const auto& writer_config : m_writer_configs) {
     263            0 :     opmon::SocketWriterStats stats;
     264            0 :     stats.set_sum_payloads(writer_config.socket_stats->sum_payloads.load());
     265            0 :     stats.set_num_payloads(writer_config.socket_stats->num_payloads.exchange(0));
     266            0 :     stats.set_sum_bytes(writer_config.socket_stats->sum_bytes.load());
     267            0 :     stats.set_num_data_input_timeouts(writer_config.socket_stats->rawq_timeout_count.exchange(0));
     268              : 
     270            0 :     int new_packets = writer_config.socket_stats->stats_packet_count.exchange(0);
                            // Report 0 instead of dividing by zero when no measurable time has elapsed.
     274            0 :     stats.set_rate_payloads_consumed(seconds > 0. ? new_packets / seconds / 1000. : 0.);
     275              : 
     276            0 :     publish(std::move(stats), { { "socket-writer", std::to_string(writer_config.remote_port) } });
     277            0 :   }
     278            0 : }
     279              : 
                        // Connects this writer to writer_config.remote_ip : writer_config.remote_port over TCP and keeps
                        // the shared SocketStats from the configuration for later counter updates.
                        //
                        // The connection attempt is synchronous and is retried once per second, without limit, until it
                        // succeeds; each failure is logged. A new socket object is created for every attempt.
                        //
                        // Throws boost::system::system_error if writer_config.remote_ip is not a valid IP address string.
     280              : void
     281            0 : SocketWriterModule::TCPWriter::configure(boost::asio::io_context& io_context, const WriterConfig& writer_config)
     282              : {
     283            0 :   m_socket_stats = writer_config.socket_stats;
     284              : 
                          // Parse the address once, outside the retry loop. A malformed address makes from_string() throw
                          // system_error; inside the loop that exception would be caught and retried forever.
                          const boost::asio::ip::tcp::endpoint remote_endpoint(
                            boost::asio::ip::address::from_string(writer_config.remote_ip), writer_config.remote_port);

     285            0 :   while (true) {
     286            0 :     try {
     287            0 :       m_socket = std::make_unique<boost::asio::ip::tcp::socket>(io_context);
     288              : 
     289            0 :       m_socket->connect(remote_endpoint);
     291            0 :       break;
     292            0 :     } catch (const boost::system::system_error& e) {
     293            0 :       TLOG() << "Connection failed: " << e.what() << ". Retrying in 1 second...";
     294            0 :       std::this_thread::sleep_for(std::chrono::seconds(1));
     295            0 :     }
     296              :   }
     297              : 
     298            0 :   TLOG() << "Established TCP connection to " << writer_config.remote_ip << ":" << writer_config.remote_port;
     299            0 : }
     300              : 
                        // Coroutine that writes payload.size bytes starting at payload.data to the TCP socket with
                        // boost::asio::async_write. After the write completes it increments num_payloads, sum_payloads
                        // and stats_packet_count and adds the number of bytes written to sum_bytes.
                        // NOTE(review): the buffer only refers to payload.data; presumably the memory it points to must
                        // stay valid until the write completes — confirm the ownership rules of TypeErasedPayload.
     301              : boost::asio::awaitable<void>
     302            0 : SocketWriterModule::TCPWriter::start(GenericReceiverConcept::TypeErasedPayload payload) // TODO (DTE): Rename
     303              : {
     304              :   const auto bytes_sent =
     305              :     co_await boost::asio::async_write(*m_socket, boost::asio::buffer(payload.data, payload.size), boost::asio::use_awaitable);
     306              :   ++m_socket_stats->num_payloads;
     307              :   ++m_socket_stats->sum_payloads;
     308              :   m_socket_stats->sum_bytes.fetch_add(bytes_sent);
     309              :   ++m_socket_stats->stats_packet_count;
     310            0 : }
     311              : 
                        // Shuts down both directions of the TCP connection and closes the socket.
                        //
                        // Teardown is best effort: shutdown and close errors (for example when the peer has already
                        // disconnected) are logged rather than thrown, so that do_stop() can go on to stop the remaining
                        // writers and release the work guard. Does nothing if the writer was never configured.
     312              : void
     313            0 : SocketWriterModule::TCPWriter::stop()
     314              : {
                          // configure() is what creates the socket; there is nothing to close before that.
                          if (!m_socket) {
                            return;
                          }

                          boost::system::error_code ec;
     315            0 :   m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
                          if (ec) {
                            TLOG() << "TCP socket shutdown reported: " << ec.message();
                          }
     316            0 :   m_socket->close(ec);
                          if (ec) {
                            TLOG() << "TCP socket close reported: " << ec.message();
                          }
     317            0 :   TLOG() << "Shutdown TCP connection";
     318            0 : }
     319              : 
                        // Stores a copy of `writer_config` (remote address, port and stats are read from it by start())
                        // and opens a UDP socket on `io_context` bound to an IPv4 endpoint with port 0. The local address
                        // and port that were actually assigned are logged.
     320              : void
     321            0 : SocketWriterModule::UDPWriter::configure(boost::asio::io_context& io_context, const WriterConfig& writer_config)
     322              : {
     323            0 :   m_writer_config = writer_config;
     324              : 
     325              :   // Let the OS pick an available local IP and port for sending packets
     326            0 :   const boost::asio::ip::udp::endpoint sender_endpoint(boost::asio::ip::udp::v4(), 0);
     327              : 
     328            0 :   m_socket = std::make_unique<boost::asio::ip::udp::socket>(io_context, sender_endpoint);
     329              : 
     330            0 :   TLOG() << "Created UDP socket on " << m_socket->local_endpoint().address() << ":"
     331            0 :          << m_socket->local_endpoint().port();
     332            0 : }
     333              : 
                        // Coroutine that sends payload.size bytes starting at payload.data as one async_send_to call to
                        // the remote IP and port stored by configure(). After the send completes it increments
                        // num_payloads, sum_payloads and stats_packet_count and adds the number of bytes sent to sum_bytes.
                        // The remote endpoint is rebuilt from the configured IP string on every call.
                        // NOTE(review): the buffer only refers to payload.data; presumably the memory it points to must
                        // stay valid until the send completes — confirm the ownership rules of TypeErasedPayload.
     334              : boost::asio::awaitable<void>
     335            0 : SocketWriterModule::UDPWriter::start(GenericReceiverConcept::TypeErasedPayload payload)
     336              : {
     337              :   boost::asio::ip::udp::endpoint receiver_endpoint(boost::asio::ip::address::from_string(m_writer_config.remote_ip),
     338              :                                                    m_writer_config.remote_port);
     339              :   const auto bytes_sent = co_await m_socket->async_send_to(
     340              :       boost::asio::buffer(payload.data, payload.size), receiver_endpoint, boost::asio::use_awaitable);
     341              :   ++m_writer_config.socket_stats->num_payloads;
     342              :   ++m_writer_config.socket_stats->sum_payloads;
     343              :   m_writer_config.socket_stats->sum_bytes.fetch_add(bytes_sent);
     344              :   ++m_writer_config.socket_stats->stats_packet_count;  
     345            0 : }
     346              : 
                        // Closes the UDP socket created by configure() and logs that it was closed.
                        // NOTE(review): m_socket is dereferenced without a null check — confirm stop() is never called
                        // before configure().
     347              : void
     348            0 : SocketWriterModule::UDPWriter::stop()
     349              : {
     350            0 :   m_socket->close();
     351            0 :   TLOG() << "Closed UDP socket";
     352            0 : }
     353              : 
     354              : } // namespace dunedaq::asiolibs
     355              : 
                        // Module registration macro; it is defined outside this file. Presumably it exposes
                        // SocketWriterModule to the DAQ framework's plugin loader — TODO confirm.
     356            0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::asiolibs::SocketWriterModule)
        

Generated by: LCOV version 2.0-1