LCOV - code coverage report
Current view: top level - asiolibs/plugins - SocketReaderModule.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 127 0
Test Date: 2026-03-29 15:29:34 Functions: 0.0 % 29 0

            Line data    Source code
       1              : /**
       2              :  * @file SocketReaderModule.cpp Boost.Asio-based socket reader plugin for low-bandwidth devices
       3              :  *
       4              :  * This is part of the DUNE DAQ software, copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
       9              : #include "SocketReaderModule.hpp"
      10              : 
      11              : #include "CreateSource.hpp"
      12              : 
      13              : #include "appmodel/DataReaderModule.hpp"
      14              : #include "appmodel/NetworkDetectorToDaqConnection.hpp"
      15              : #include "appmodel/SocketDataSender.hpp"
      16              : #include "appmodel/NWDetDataSender.hpp"
      17              : #include "appmodel/SocketReaderConf.hpp"
      18              : #include "appmodel/SocketReceiver.hpp"
      19              : #include "confmodel/DetectorStream.hpp"
      20              : #include "confmodel/GeoId.hpp"
      21              : #include "confmodel/QueueWithSourceId.hpp"
      22              : 
      23              : #include "datahandlinglibs/DataHandlingIssues.hpp"
      24              : 
      25              : #include "asiolibs/opmon/SocketReaderModule.pb.h"
      26              : 
      27              : #include <string>
      28              : #include <vector>
      29              : #include <memory>
      30              : #include <utility>
      31              : 
      32              : namespace dunedaq::asiolibs {
      33              : 
      34            0 : SocketReaderModule::SocketReaderModule(const std::string& name)
      35              :   : DAQModule(name)
      36            0 :   , m_work_guard(boost::asio::make_work_guard(m_io_context))
      37              : {
      38            0 :   register_command("conf", &SocketReaderModule::do_configure);
      39            0 :   register_command("start", &SocketReaderModule::do_start);
      40            0 :   register_command("stop_trigger_sources", &SocketReaderModule::do_stop);
      41            0 : }
      42              : 
      43              : inline void
      44              : tokenize(std::string const& str, const char delim, std::vector<std::string>& out)
      45              : {
      46              :   std::size_t start;
      47              :   std::size_t end = 0;
      48              :   while ((start = str.find_first_not_of(delim, end)) != std::string::npos) {
      49              :     end = str.find(delim, start);
      50              :     out.push_back(str.substr(start, end - start));
      51              :   }
      52              : }
      53              : 
      54              : void
      55            0 : SocketReaderModule::init(const std::shared_ptr<appfwk::ConfigurationManager> mcfg)
      56              : {
      57            0 :   m_cfg = mcfg;
      58            0 :   auto* mdal = m_cfg->get_dal<appmodel::DataReaderModule>(get_name());
      59            0 :   auto* module_conf = mdal->get_configuration()->cast<appmodel::SocketReaderConf>();
      60              : 
      61            0 :   const auto local_ip = module_conf->get_local_ip();
      62              : 
      63            0 :   m_socket_type = string_to_socket_type(module_conf->get_socket_type());
      64            0 :   if (m_socket_type != SocketType::TCP && m_socket_type != SocketType::UDP) {
      65            0 :     throw std::invalid_argument("Error: Only TCP and UDP are allowed!");
      66              :   }
      67              : 
      68            0 :   std::vector<const appmodel::NetworkDetectorToDaqConnection*> d2d_conns;
      69            0 :   for (auto* connection : mdal->get_connections()) {
      70              : 
      71            0 :     if (connection->is_disabled(*(m_cfg->get_session()))) {
      72            0 :       continue;
      73              :     }
      74              : 
      75            0 :     auto net_connection = connection->cast<appmodel::NetworkDetectorToDaqConnection>();
      76            0 :     if (net_connection == nullptr) {
      77            0 :         throw dunedaq::datahandlinglibs::InitializationError(
      78            0 :           ERS_HERE,
      79            0 :           fmt::format("Found connection {} of type {} while expecting type NetworkDetectorToDaqConnection",
      80            0 :                       connection->UID(),
      81            0 :                       connection->class_name()));
      82              :     }
      83            0 :     d2d_conns.push_back(net_connection);
      84              :   }
      85              : 
      86            0 :   for (auto* d2d_conn : d2d_conns) {
      87            0 :     for (auto* sender : d2d_conn->get_net_senders()) {
      88            0 :       auto* socket_sender = sender->cast<appmodel::SocketDataSender>();
      89              : 
      90            0 :       if (!socket_sender) {
      91            0 :         throw dunedaq::datahandlinglibs::InitializationError(
      92            0 :           ERS_HERE,
      93            0 :           fmt::format("Found {} of type {} in connection {} while expecting type SocketDataSender",
      94            0 :                       sender->UID(),
      95            0 :                       sender->class_name(),
      96            0 :                       d2d_conn->UID()));
      97              :       }
      98              : 
      99            0 :       if (socket_sender->is_disabled(*(m_cfg->get_session()))) {
     100            0 :         continue;
     101              :       }
     102              : 
     103            0 :       if (socket_sender->get_streams().size() > 1) {
     104            0 :         dunedaq::datahandlinglibs::GenericConfigurationError err(ERS_HERE,
     105            0 :                                                                  "Multiple streams currently are not supported!");
     106            0 :         ers::fatal(err);
     107            0 :         throw err;
     108            0 :       }
     109              : 
     110            0 :       for (auto* det_stream : socket_sender->get_streams()) {
     111            0 :         m_reader_configs.emplace_back(
     112            0 :           local_ip, socket_sender->get_port(), det_stream->get_source_id(), std::make_shared<SocketStats>());
     113              :       }
     114              :     }
     115              :   }
     116              : 
     117            0 :   m_readers.reserve(m_reader_configs.size());
     118            0 :   if (m_socket_type == SocketType::TCP) {
     119            0 :     for (std::size_t i = 0; i < m_reader_configs.size(); ++i) {
     120            0 :       m_readers.emplace_back(TCPReader());
     121              :     }
     122              :   } else {
     123            0 :     for (std::size_t i = 0; i < m_reader_configs.size(); ++i) {
     124            0 :       m_readers.emplace_back(UDPReader());
     125              :     }
     126              :   }
     127              : 
     128            0 :   auto callback_confs = mdal->get_raw_data_callbacks();
     129            0 :   for (auto* callback_conf : callback_confs) {
     130              : 
     131            0 :     auto ptr = m_sources[callback_conf->get_source_id()] = createSourceModel(callback_conf);
     132            0 :     register_node(callback_conf->UID(), ptr);
     133            0 :   }  
     134            0 : }
     135              : 
     136              : SocketReaderModule::SocketType
     137            0 : SocketReaderModule::string_to_socket_type(const std::string& socket_type) const
     138              : {
     139            0 :   if (socket_type == "TCP") {
     140              :     return SocketReaderModule::SocketType::TCP;
     141            0 :   } else if (socket_type == "UDP") {
     142            0 :     return SocketReaderModule::SocketType::UDP;
     143              :   }
     144              :   return SocketReaderModule::SocketType::INVALID;
     145              : }
     146              : 
     147              : void
     148            0 : SocketReaderModule::do_configure(const CommandData_t&)
     149              : {
     150            0 :   for (std::size_t i = 0; i < m_readers.size(); ++i) {
     151            0 :     const auto reader_config = m_reader_configs[i];
     152            0 :     std::visit([this, reader_config](auto& reader) { reader.configure(m_io_context, reader_config); }, m_readers[i]);
     153            0 :   }
     154            0 : }
     155              : 
     156              : void
     157            0 : SocketReaderModule::do_start(const CommandData_t&)
     158              : {
     159              :   // Setup callbacks on all sourcemodels
     160            0 :   for (auto& [sourceid, source] : m_sources) {
     161            0 :     source->acquire_callback();
     162              :   }
     163              : 
     164            0 :   m_io_thread = std::jthread([this] { m_io_context.run(); });
     165              : 
     166            0 :   for (auto& reader : m_readers) {
     167            0 :     boost::asio::co_spawn(m_io_context,
     168            0 :                           std::visit([this](auto& reader) { return reader.start(m_sources); }, reader),
     169              :                           boost::asio::detached);
     170              :   }
     171            0 : }
     172              : 
     173              : void
     174            0 : SocketReaderModule::do_stop(const CommandData_t&)
     175              : {
     176            0 :   for (auto& reader : m_readers) {
     177            0 :     std::visit([](auto& reader) { reader.stop(); }, reader);
     178              :   }
     179              : 
     180            0 :   m_work_guard.reset();
     181            0 : }
     182              : 
     183              : void
     184            0 : SocketReaderModule::generate_opmon_data()
     185              : {
     186            0 :   for (const auto& reader_config : m_reader_configs) {
     187            0 :     opmon::SocketReaderStats stats;
     188            0 :     stats.set_packets_received(reader_config.socket_stats->packets_received.load());
     189            0 :     stats.set_bytes_received(reader_config.socket_stats->bytes_received.load());
     190            0 :     publish(std::move(stats), { { "socket-reader", std::to_string(reader_config.local_port) } });
     191            0 :   }
     192            0 : }
     193              : 
     194              : void
     195            0 : SocketReaderModule::TCPReader::configure(boost::asio::io_context& io_context, const ReaderConfig& reader_config)
     196              : {
     197            0 :   m_source_id = reader_config.source_id;
     198            0 :   m_socket_stats = reader_config.socket_stats;
     199              : 
     200            0 :   m_socket = std::make_unique<boost::asio::ip::tcp::socket>(io_context);
     201              : 
     202            0 :   boost::asio::ip::tcp::acceptor acceptor(
     203              :     io_context,
     204            0 :     boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(reader_config.local_ip),
     205            0 :                                    reader_config.local_port));
     206              : 
     207            0 :   TLOG() << "Waiting for TCP connection at " << reader_config.local_ip << ":" << reader_config.local_port;
     208              : 
     209            0 :   acceptor.accept(*m_socket);
     210              : 
     211            0 :   TLOG() << "Established TCP connection from " << m_socket->remote_endpoint().address() << ":"
     212            0 :          << m_socket->remote_endpoint().port();
     213            0 : }
     214              : 
     215              : boost::asio::awaitable<void>
     216            0 : SocketReaderModule::TCPReader::start(const sid_to_source_map_t& sources)
     217              : {
     218              :   // FIXME (DTE): Just pass the relevant source instead of all sources
     219              :   const auto src_it = sources.find(m_source_id);
     220              :   if (src_it == sources.end()) {
     221            0 :     TLOG() << "Unexpected source ID! (" << m_source_id << ")";
     222              :     co_return;
     223              :   }
     224              : 
     225              :   const auto buffer_size = src_it->second->get_target_payload_size();
     226              :   std::vector<char> buffer(buffer_size);
     227              : 
     228              :   while (m_socket->is_open()) {
     229              :     const auto bytes_received =
     230              :       co_await boost::asio::async_read(*m_socket,
     231              :                                        boost::asio::buffer(buffer),
     232              :                                        boost::asio::use_awaitable);
     233              :     ++m_socket_stats->packets_received;
     234              :     m_socket_stats->bytes_received.fetch_add(bytes_received);
     235              :     src_it->second->handle_payload(buffer.data(), bytes_received);
     236              :   }
     237            0 : }
     238              : 
     239              : void
     240            0 : SocketReaderModule::TCPReader::stop()
     241              : {
     242            0 :   m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both);
     243            0 :   m_socket->close();
     244            0 :   TLOG() << "Shutdown TCP connection";
     245            0 : }
     246              : 
     247              : void
     248            0 : SocketReaderModule::UDPReader::configure(boost::asio::io_context& io_context, const ReaderConfig& reader_config)
     249              : {
     250            0 :   m_source_id = reader_config.source_id;
     251            0 :   m_socket_stats = reader_config.socket_stats;
     252              : 
     253            0 :   const auto receiver_endpoint = boost::asio::ip::udp::endpoint(
     254            0 :     boost::asio::ip::address::from_string(reader_config.local_ip), reader_config.local_port);
     255            0 :   m_socket = std::make_unique<boost::asio::ip::udp::socket>(io_context, receiver_endpoint);
     256              : 
     257            0 :   TLOG() << "Created UDP socket on " << reader_config.local_ip << ":" << reader_config.local_port;
     258            0 : }
     259              : 
     260              : boost::asio::awaitable<void>
     261            0 : SocketReaderModule::UDPReader::start(const sid_to_source_map_t& sources)
     262              : {
     263              :   const auto src_it = sources.find(m_source_id);
     264              :   if (src_it == sources.end()) {
     265            0 :     TLOG() << "Unexpected source ID! (" << m_source_id << ")";
     266              :     co_return;
     267              :   }
     268              : 
     269              :   const auto buffer_size = src_it->second->get_target_payload_size();
     270              :   std::vector<char> buffer(buffer_size);
     271              :   boost::asio::ip::udp::endpoint sender_endpoint;
     272              : 
     273              :   while (m_socket->is_open()) {
     274              :     std::size_t bytes_received = co_await m_socket->async_receive_from(
     275              :       boost::asio::buffer(buffer), sender_endpoint, boost::asio::use_awaitable);
     276              : 
     277              :     ++m_socket_stats->packets_received;
     278              :     m_socket_stats->bytes_received.fetch_add(bytes_received);
     279              : 
     280              :     if (bytes_received == buffer_size) [[likely]] {
     281              :       src_it->second->handle_payload(buffer.data(), bytes_received);
     282              :     } else {
     283            0 :       TLOG() << "Payload is smaller than " << buffer_size << " (" << bytes_received << ")";
     284              :     }
     285              :   }
     286            0 : }
     287              : 
     288              : void
     289            0 : SocketReaderModule::UDPReader::stop()
     290              : {
     291            0 :   m_socket->close();
     292            0 :   TLOG() << "Closed UDP socket";
     293            0 : }
     294              : 
     295              : } // namespace dunedaq::asiolibs
     296              : 
     297            0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::asiolibs::SocketReaderModule)
        

Generated by: LCOV version 2.0-1