LCOV - code coverage report
Current view: top level - ipm/plugins - ZmqSender.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 72.8 % 92 67
Test Date: 2025-12-21 13:07:08 Functions: 100.0 % 13 13

            Line data    Source code
       1              : /**
       2              :  *
       3              :  * @file ZmqSender.cpp ZmqSender messaging class definitions
       4              :  *
       5              :  * This is part of the DUNE DAQ Application Framework, copyright 2020.
       6              :  * Licensing/copyright details are in the COPYING file that you should have
       7              :  * received with this code.
       8              :  */
       9              : 
      10              : #include "ipm/Sender.hpp"
      11              : #include "ipm/ZmqContext.hpp"
      12              : 
      13              : #include "logging/Logging.hpp"
      14              : #include "zmq.hpp"
      15              : 
      16              : #include <string>
      17              : #include <vector>
      18              : 
      19              : namespace dunedaq::ipm {
      20              : class ZmqSender : public Sender
      21              : {
      22              : public:
      23           26 :   ZmqSender()
      24           26 :     : m_socket(ZmqContext::instance().GetContext(), zmq::socket_type::push)
      25              :   {
      26           26 :   }
      27              : 
      28           52 :   ~ZmqSender()
      29           26 :   {
      30              :     // Probably (cpp)zmq does this in the socket dtor anyway, but I guess it doesn't hurt to be explicit
      31           26 :     if (m_connection_string != "" && m_socket_connected) {
      32           24 :       try {
      33           48 :         TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) << "Setting socket HWM to zero";
      34           24 :         m_socket.set(zmq::sockopt::sndhwm, 1);
      35              : 
      36           48 :         TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR)
      37           24 :           << "Waiting up to 10s for socket to become writable before disconnecting";
      38           24 :         auto start_time = std::chrono::steady_clock::now();
      39           24 :         while (
      40       104001 :           std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count() <
      41              :           10000) {
      42       103990 :           auto events = m_socket.get(zmq::sockopt::events);
      43       103990 :           if ((events & ZMQ_POLLOUT) != 0) {
      44              :             break;
      45              :           }
      46       103977 :           usleep(1000);
      47              :         }
      48           48 :         TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) << "Disconnecting socket from " << m_connection_string;
      49              : 
      50              : 
      51           24 :         m_socket.disconnect(m_connection_string);
      52           24 :         m_socket_connected = false;
      53            0 :       } catch (zmq::error_t const& err) {
      54            0 :         ers::error(ZmqOperationError(ERS_HERE, "disconnect", "send", err.what(), m_connection_string));
      55            0 :       }
      56              :     }
      57           26 :     m_socket.close();
      58           52 :   }
      59              : 
      60        20037 :   bool can_send() const noexcept override { return m_socket_connected; }
      61           27 :   std::string connect_for_sends(const nlohmann::json& connection_info) override
      62              :   {
      63           27 :     try {
      64           27 :       m_socket.set(zmq::sockopt::sndtimeo, 0); // Return immediately if we can't send
      65            0 :     } catch (zmq::error_t const& err) {
      66            0 :       throw ZmqOperationError(ERS_HERE,
      67              :                               "set timeout",
      68              :                               "send",
      69            0 :                               err.what(),
      70            0 :                               connection_info.value<std::string>("connection_string", "inproc://default"));
      71            0 :     }
      72              : 
      73           27 :     auto hwm = connection_info.value<int>("capacity", 0);
      74           27 :     if (hwm > 0) {
      75            0 :       try {
      76            0 :         m_socket.set(zmq::sockopt::sndhwm, hwm);
      77            0 :       } catch (zmq::error_t const& err) {
      78            0 :         throw ZmqOperationError(ERS_HERE,
      79              :                                 "set hwm",
      80              :                                 "send",
      81            0 :                                 err.what(),
      82            0 :                                 connection_info.value<std::string>("connection_string", "inproc://default"));
      83            0 :       }
      84              :     }
      85              : 
      86           27 :     auto connection_string = connection_info.value<std::string>("connection_string", "inproc://default");
      87              : 
      88           27 :     TLOG_DEBUG(TLVL_CONNECTIONSTRING) << "Connection String is " << connection_string;
      89           27 :     try {
      90           27 :       m_socket.set(zmq::sockopt::immediate, 1); // Don't queue messages to incomplete connections
      91            0 :     } catch (zmq::error_t const& err) {
      92            0 :       throw ZmqOperationError(ERS_HERE, "set immediate mode", "send", err.what(), connection_string);
      93            0 :     }
      94              : 
      95           27 :     try {
      96           27 :       m_socket.connect(connection_string);
      97           24 :       m_connection_string = m_socket.get(zmq::sockopt::last_endpoint);
      98           24 :       m_socket_connected = true;
      99            3 :     } catch (zmq::error_t const& err) {
     100            3 :       ers::error(ZmqOperationError(ERS_HERE, "connect", "send", err.what(), connection_string));
     101            3 :     }
     102              : 
     103           27 :     if (!m_socket_connected) {
     104            3 :       throw ZmqOperationError(ERS_HERE, "connect", "send", "Operation failed for all resolved connection strings", "");
     105              :     }
     106           48 :     return m_connection_string;
     107           27 :   }
     108              : 
     109              : protected:
     110        20028 :   bool send_(const void* message,
     111              :              int N,
     112              :              const duration_t& timeout,
     113              :              std::string const& topic,
     114              :              bool no_tmoexcept_mode) override
     115              :   {
     116        20028 :     TLOG_DEBUG(TLVL_ZMQSENDER_SEND_START)
     117        20028 :       << "Endpoint " << m_connection_string << ": Starting send of " << N << " bytes";
     118        20028 :     auto start_time = std::chrono::steady_clock::now();
     119        20028 :     zmq::send_result_t res{};
     120      7409368 :     do {
     121              : 
     122      7409368 :       zmq::message_t topic_msg(topic.c_str(), topic.size());
     123      7409368 :       try {
     124      7409368 :         res = m_socket.send(topic_msg, zmq::send_flags::sndmore);
     125            0 :       } catch (zmq::error_t const& err) {
     126            0 :         throw ZmqSendError(ERS_HERE, err.what(), topic.size(), topic);
     127            0 :       }
     128              : 
     129      7409368 :       if (!res || res != topic.size()) {
     130      7389341 :         TLOG_DEBUG(TLVL_ZMQSENDER_SEND_ERR) << "Endpoint " << m_connection_string << ": Unable to send message";
     131      7389341 :         continue;
     132      7389341 :       }
     133              : 
     134        20027 :       zmq::message_t msg(message, N);
     135        20027 :       try {
     136        20027 :         res = m_socket.send(msg, zmq::send_flags::none);
     137            0 :       } catch (zmq::error_t const& err) {
     138            0 :         throw ZmqSendError(ERS_HERE, err.what(), N, topic);
     139            0 :       }
     140              : 
     141        20027 :       if (!res && timeout > duration_t::zero()) {
     142            0 :         usleep(1000);
     143              :       }
     144     14838764 :     } while (std::chrono::duration_cast<duration_t>(std::chrono::steady_clock::now() - start_time) < timeout && !res);
     145              : 
     146        20028 :     if (!res && !no_tmoexcept_mode) {
     147            1 :       throw SendTimeoutExpired(ERS_HERE, timeout.count());
     148              :     }
     149              : 
     150        20027 :     TLOG_DEBUG(TLVL_ZMQSENDER_SEND_END) << "Endpoint " << m_connection_string << ": Completed send of " << N
     151        20027 :                                         << " bytes";
     152        20027 :     return res && res == N;
     153              :   }
     154              : 
     155              : private:
     156              :   zmq::socket_t m_socket;
     157              :   std::string m_connection_string;
     158              :   bool m_socket_connected{ false };
     159              : };
     160              : 
     161              : } // namespace dunedaq::ipm
     162              : 
     163           26 : DEFINE_DUNE_IPM_SENDER(dunedaq::ipm::ZmqSender)
        

Generated by: LCOV version 2.0-1