LCOV - code coverage report
Current view: top level - asiolibs/plugins - SocketReaderModule.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 151 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 30 0

            Line data    Source code
       1              : /**
       2              :  * @file SocketReaderModule.cpp Boost.Asio-based socket reader plugin for low-bandwidth devices
       3              :  *
       4              :  * This is part of the DUNE DAQ , copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
       9              : #include "SocketReaderModule.hpp"
      10              : 
      11              : #include "CreateSource.hpp"
      12              : 
      13              : #include "appmodel/DataReaderModule.hpp"
      14              : #include "appmodel/NetworkDetectorToDaqConnection.hpp"
      15              : #include "appmodel/SocketDataSender.hpp"
      16              : #include "appmodel/NWDetDataSender.hpp"
      17              : #include "appmodel/SocketReaderConf.hpp"
      18              : #include "appmodel/SocketReceiver.hpp"
      19              : #include "confmodel/DetectorStream.hpp"
      20              : #include "confmodel/GeoId.hpp"
      21              : #include "confmodel/QueueWithSourceId.hpp"
      22              : 
      23              : #include "datahandlinglibs/DataHandlingIssues.hpp"
      24              : 
      25              : #include "asiolibs/opmon/SocketReaderModule.pb.h"
      26              : 
      27              : #include <string>
      28              : #include <vector>
      29              : #include <memory>
      30              : #include <utility>
      31              : 
      32              : namespace dunedaq::asiolibs {
      33              : 
      34            0 : SocketReaderModule::SocketReaderModule(const std::string& name)
      35              :   : DAQModule(name)
      36            0 :   , m_work_guard(boost::asio::make_work_guard(m_io_context))
      37              : {
      38            0 :   register_command("conf", &SocketReaderModule::do_configure);
      39            0 :   register_command("start", &SocketReaderModule::do_start);
      40            0 :   register_command("stop_trigger_sources", &SocketReaderModule::do_stop);
      41            0 : }
      42              : 
      43              : inline void
      44            0 : tokenize(std::string const& str, const char delim, std::vector<std::string>& out)
      45              : {
      46            0 :   std::size_t start;
      47            0 :   std::size_t end = 0;
      48            0 :   while ((start = str.find_first_not_of(delim, end)) != std::string::npos) {
      49            0 :     end = str.find(delim, start);
      50            0 :     out.push_back(str.substr(start, end - start));
      51              :   }
      52            0 : }
      53              : 
      54              : void
      55            0 : SocketReaderModule::init(const std::shared_ptr<appfwk::ConfigurationManager> mcfg)
      56              : {
      57            0 :   m_cfg = mcfg;
      58            0 :   auto* mdal = m_cfg->get_dal<appmodel::DataReaderModule>(get_name());
      59            0 :   auto* module_conf = mdal->get_configuration()->cast<appmodel::SocketReaderConf>();
      60              : 
      61            0 :   const auto local_ip = module_conf->get_local_ip();
      62              : 
      63            0 :   m_socket_type = string_to_socket_type(module_conf->get_socket_type());
      64            0 :   if (m_socket_type != SocketType::TCP && m_socket_type != SocketType::UDP) {
      65            0 :     throw std::invalid_argument("Error: Only TCP and UDP are allowed!");
      66              :   }
      67              : 
      68            0 :   std::vector<const appmodel::NetworkDetectorToDaqConnection*> d2d_conns;
      69            0 :   for (auto* connection : mdal->get_connections()) {
      70              : 
      71            0 :     if (connection->is_disabled(*(m_cfg->get_session()))) {
      72            0 :       continue;
      73              :     }
      74              : 
      75            0 :     auto net_connection = connection->cast<appmodel::NetworkDetectorToDaqConnection>();
      76            0 :     if (net_connection == nullptr) {
      77            0 :         throw dunedaq::datahandlinglibs::InitializationError(
      78            0 :           ERS_HERE,
      79            0 :           fmt::format("Found connection {} of type {} while expecting type NetworkDetectorToDaqConnection",
      80            0 :                       connection->UID(),
      81            0 :                       connection->class_name()));
      82              :     }
      83            0 :     d2d_conns.push_back(net_connection);
      84              :   }
      85              : 
      86            0 :   for (auto* d2d_conn : d2d_conns) {
      87            0 :     for (auto* sender : d2d_conn->get_net_senders()) {
      88            0 :       auto* socket_sender = sender->cast<appmodel::SocketDataSender>();
      89              : 
      90            0 :       if (!socket_sender) {
      91            0 :         throw dunedaq::datahandlinglibs::InitializationError(
      92            0 :           ERS_HERE,
      93            0 :           fmt::format("Found {} of type {} in connection {} while expecting type SocketDataSender",
      94            0 :                       sender->UID(),
      95            0 :                       sender->class_name(),
      96            0 :                       d2d_conn->UID()));
      97              :       }
      98              : 
      99            0 :       if (socket_sender->is_disabled(*(m_cfg->get_session()))) {
     100            0 :         continue;
     101              :       }
     102              : 
     103            0 :       if (socket_sender->get_streams().size() > 1) {
     104            0 :         dunedaq::datahandlinglibs::GenericConfigurationError err(ERS_HERE,
     105            0 :                                                                  "Multiple streams currently are not supported!");
     106            0 :         ers::fatal(err);
     107            0 :         throw err;
     108            0 :       }
     109              : 
     110            0 :       for (auto* det_stream : socket_sender->get_streams()) {
     111            0 :         m_reader_configs.emplace_back(
     112            0 :           local_ip, socket_sender->get_port(), det_stream->get_source_id(), std::make_shared<SocketStats>());
     113              :       }
     114              :     }
     115              :   }
     116              : 
     117            0 :   m_readers.reserve(m_reader_configs.size());
     118            0 :   if (m_socket_type == SocketType::TCP) {
     119            0 :     for (std::size_t i = 0; i < m_reader_configs.size(); ++i) {
     120            0 :       m_readers.emplace_back(TCPReader());
     121              :     }
     122              :   } else {
     123            0 :     for (std::size_t i = 0; i < m_reader_configs.size(); ++i) {
     124            0 :       m_readers.emplace_back(UDPReader());
     125              :     }
     126              :   }
     127              : 
     128            0 :   if (mdal->get_outputs().empty()) {
     129            0 :     auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE,
     130            0 :                                                               "No outputs defined for socket reader in configuration.");
     131            0 :     ers::fatal(err);
     132            0 :     throw err;
     133            0 :   }
     134              : 
     135            0 :   for (auto* con : mdal->get_outputs()) {
     136            0 :     auto* queue = con->cast<confmodel::QueueWithSourceId>();
     137            0 :     if (queue == nullptr) {
     138            0 :       auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE, "Outputs are not of type QueueWithGeoId.");
     139            0 :       ers::fatal(err);
     140            0 :       throw err;
     141            0 :     }
     142              : 
     143              :     // Check for CB prefix indicating Callback use
     144            0 :     const char delim = '_';
     145            0 :     const std::string target = queue->UID();
     146            0 :     std::vector<std::string> words;
     147            0 :     tokenize(target, delim, words);
     148              : 
     149            0 :     bool callback_mode = false;
     150            0 :     if (words.front() == "cb") {
     151              :       callback_mode = true;
     152              :     }
     153              : 
     154            0 :     auto ptr = m_sources[queue->get_source_id()] = createSourceModel(queue->UID(), callback_mode);
     155            0 :     register_node(queue->UID(), ptr);
     156            0 :   }  
     157            0 : }
     158              : 
     159              : SocketReaderModule::SocketType
     160            0 : SocketReaderModule::string_to_socket_type(const std::string& socket_type) const
     161              : {
     162            0 :   if (socket_type == "TCP") {
     163              :     return SocketReaderModule::SocketType::TCP;
     164            0 :   } else if (socket_type == "UDP") {
     165            0 :     return SocketReaderModule::SocketType::UDP;
     166              :   }
     167              :   return SocketReaderModule::SocketType::INVALID;
     168              : }
     169              : 
     170              : void
     171            0 : SocketReaderModule::do_configure(const CommandData_t&)
     172              : {
     173            0 :   for (std::size_t i = 0; i < m_readers.size(); ++i) {
     174            0 :     const auto reader_config = m_reader_configs[i];
     175            0 :     std::visit([this, reader_config](auto& reader) { reader.configure(m_io_context, reader_config); }, m_readers[i]);
     176            0 :   }
     177            0 : }
     178              : 
     179              : void
     180            0 : SocketReaderModule::do_start(const CommandData_t&)
     181              : {
     182              :   // Setup callbacks on all sourcemodels
     183            0 :   for (auto& [sourceid, source] : m_sources) {
     184            0 :     source->acquire_callback();
     185              :   }
     186              : 
     187            0 :   m_io_thread = std::jthread([this] { m_io_context.run(); });
     188              : 
     189            0 :   for (auto& reader : m_readers) {
     190            0 :     boost::asio::co_spawn(m_io_context,
     191            0 :                           std::visit([this](auto& reader) { return reader.start(m_sources); }, reader),
     192              :                           boost::asio::detached);
     193              :   }
     194            0 : }
     195              : 
     196              : void
     197            0 : SocketReaderModule::do_stop(const CommandData_t&)
     198              : {
     199            0 :   for (auto& reader : m_readers) {
     200            0 :     std::visit([](auto& reader) { reader.stop(); }, reader);
     201              :   }
     202              : 
     203            0 :   m_work_guard.reset();
     204            0 : }
     205              : 
     206              : void
     207            0 : SocketReaderModule::generate_opmon_data()
     208              : {
     209            0 :   for (const auto& reader_config : m_reader_configs) {
     210            0 :     opmon::SocketReaderStats stats;
     211            0 :     stats.set_packets_received(reader_config.socket_stats->packets_received.load());
     212            0 :     stats.set_bytes_received(reader_config.socket_stats->bytes_received.load());
     213            0 :     publish(std::move(stats), { { "socket-reader", std::to_string(reader_config.local_port) } });
     214            0 :   }
     215            0 : }
     216              : 
     217              : void
     218            0 : SocketReaderModule::TCPReader::configure(boost::asio::io_context& io_context, const ReaderConfig& reader_config)
     219              : {
     220            0 :   m_source_id = reader_config.source_id;
     221            0 :   m_socket_stats = reader_config.socket_stats;
     222              : 
     223            0 :   m_socket = std::make_unique<boost::asio::ip::tcp::socket>(io_context);
     224              : 
     225            0 :   boost::asio::ip::tcp::acceptor acceptor(
     226              :     io_context,
     227            0 :     boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(reader_config.local_ip),
     228            0 :                                    reader_config.local_port));
     229              : 
     230            0 :   TLOG() << "Waiting for TCP connection at " << reader_config.local_ip << ":" << reader_config.local_port;
     231              : 
     232            0 :   acceptor.accept(*m_socket);
     233              : 
     234            0 :   TLOG() << "Established TCP connection from " << m_socket->remote_endpoint().address() << ":"
     235            0 :          << m_socket->remote_endpoint().port();
     236            0 : }
     237              : 
     238              : boost::asio::awaitable<void>
     239            0 : SocketReaderModule::TCPReader::start(const sid_to_source_map_t& sources)
     240              : {
     241              :   // FIXME (DTE): Just pass the relevant source instead of all sources
     242              :   const auto src_it = sources.find(m_source_id);
     243              :   if (src_it == sources.end()) {
     244            0 :     TLOG() << "Unexpected source ID! (" << m_source_id << ")";
     245              :     co_return;
     246              :   }
     247              : 
     248              :   const auto buffer_size = src_it->second->get_target_payload_size();
     249              :   std::vector<char> buffer(buffer_size);
     250              : 
     251              :   while (m_socket->is_open()) {
     252              :     const auto bytes_received =
     253              :       co_await boost::asio::async_read(*m_socket,
     254              :                                        boost::asio::buffer(buffer),
     255              :                                        boost::asio::use_awaitable);
     256              :     ++m_socket_stats->packets_received;
     257              :     m_socket_stats->bytes_received.fetch_add(bytes_received);
     258              :     src_it->second->handle_payload(buffer.data(), bytes_received);
     259              :   }
     260            0 : }
     261              : 
     262              : void
     263            0 : SocketReaderModule::TCPReader::stop()
     264              : {
     265            0 :   m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both);
     266            0 :   m_socket->close();
     267            0 :   TLOG() << "Shutdown TCP connection";
     268            0 : }
     269              : 
     270              : void
     271            0 : SocketReaderModule::UDPReader::configure(boost::asio::io_context& io_context, const ReaderConfig& reader_config)
     272              : {
     273            0 :   m_source_id = reader_config.source_id;
     274            0 :   m_socket_stats = reader_config.socket_stats;
     275              : 
     276            0 :   const auto receiver_endpoint = boost::asio::ip::udp::endpoint(
     277            0 :     boost::asio::ip::address::from_string(reader_config.local_ip), reader_config.local_port);
     278            0 :   m_socket = std::make_unique<boost::asio::ip::udp::socket>(io_context, receiver_endpoint);
     279              : 
     280            0 :   TLOG() << "Created UDP socket on " << reader_config.local_ip << ":" << reader_config.local_port;
     281            0 : }
     282              : 
     283              : boost::asio::awaitable<void>
     284            0 : SocketReaderModule::UDPReader::start(const sid_to_source_map_t& sources)
     285              : {
     286              :   const auto src_it = sources.find(m_source_id);
     287              :   if (src_it == sources.end()) {
     288            0 :     TLOG() << "Unexpected source ID! (" << m_source_id << ")";
     289              :     co_return;
     290              :   }
     291              : 
     292              :   const auto buffer_size = src_it->second->get_target_payload_size();
     293              :   std::vector<char> buffer(buffer_size);
     294              :   boost::asio::ip::udp::endpoint sender_endpoint;
     295              : 
     296              :   while (m_socket->is_open()) {
     297              :     std::size_t bytes_received = co_await m_socket->async_receive_from(
     298              :       boost::asio::buffer(buffer), sender_endpoint, boost::asio::use_awaitable);
     299              : 
     300              :     ++m_socket_stats->packets_received;
     301              :     m_socket_stats->bytes_received.fetch_add(bytes_received);
     302              : 
     303              :     if (bytes_received == buffer_size) [[likely]] {
     304              :       src_it->second->handle_payload(buffer.data(), bytes_received);
     305              :     } else {
     306            0 :       TLOG() << "Payload is smaller than " << buffer_size << " (" << bytes_received << ")";
     307              :     }
     308              :   }
     309            0 : }
     310              : 
     311              : void
     312            0 : SocketReaderModule::UDPReader::stop()
     313              : {
     314            0 :   m_socket->close();
     315            0 :   TLOG() << "Closed UDP socket";
     316            0 : }
     317              : 
     318              : } // namespace dunedaq::asiolibs
     319              : 
     320            0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::asiolibs::SocketReaderModule)
        

Generated by: LCOV version 2.0-1