LCOV - code coverage report
Current view: top level - ipm/plugins - ZmqSender.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 69.9 % 103 72
Test Date: 2026-02-16 10:18:04 Functions: 100.0 % 13 13

            Line data    Source code
       1              : /**
       2              :  *
       3              :  * @file ZmqSender.cpp ZmqSender messaging class definitions
       4              :  *
       5              :  * This is part of the DUNE DAQ Application Framework, copyright 2020.
       6              :  * Licensing/copyright details are in the COPYING file that you should have
       7              :  * received with this code.
       8              :  */
       9              : 
#include "ipm/Sender.hpp"
#include "ipm/ZmqContext.hpp"

#include "logging/Logging.hpp"
#include "zmq.hpp"

#include <chrono>
#include <cstddef>
#include <string>
#include <thread>
#include <vector>
      18              : 
      19              : namespace dunedaq::ipm {
      20              : class ZmqSender : public Sender
      21              : {
      22              : public:
      23           26 :   ZmqSender()
      24           26 :     : m_socket(ZmqContext::instance().GetContext(), zmq::socket_type::push)
      25              :   {
      26           26 :   }
      27              : 
      28           52 :   ~ZmqSender()
      29           26 :   {
      30              :     // Probably (cpp)zmq does this in the socket dtor anyway, but I guess it doesn't hurt to be explicit
      31           26 :     if (m_connection_info.connection_string != "" && m_socket_connected) {
      32           24 :       try {
      33           48 :         TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) << m_connection_info.connection_name << ": Setting socket HWM to zero";
      34           24 :         m_socket.set(zmq::sockopt::sndhwm, 1);
      35              : 
      36           48 :         TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) << m_connection_info.connection_name
      37           24 :           << ": Waiting up to 10s for socket to become writable before disconnecting";
      38           24 :         auto start_time = std::chrono::steady_clock::now();
      39           24 :         while (
      40       103803 :           std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count() <
      41              :           10000) {
      42       103792 :           auto events = m_socket.get(zmq::sockopt::events);
      43       103792 :           if ((events & ZMQ_POLLOUT) != 0) {
      44              :             break;
      45              :           }
      46       103779 :           usleep(1000);
      47              :         }
      48           48 :         TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR)
      49           24 :           << m_connection_info.connection_name << ": Disconnecting socket from " << m_connection_info.connection_string;
      50              : 
      51              : 
      52           24 :         m_socket.disconnect(m_connection_info.connection_string);
      53           24 :         m_socket_connected = false;
      54            0 :       } catch (zmq::error_t const& err) {
      55            0 :         ers::error(ZmqOperationError(ERS_HERE,
      56            0 :                                      m_connection_info.connection_name,
      57              :                                      "disconnect",
      58              :                                      "send",
      59            0 :                                      err.what(),
      60            0 :                                      m_connection_info.connection_string));
      61            0 :       }
      62              :     }
      63           26 :     m_socket.close();
      64           52 :   }
      65              : 
      66        20037 :   bool can_send() const noexcept override { return m_socket_connected; }
      67           27 :   std::string connect_for_sends(const ConnectionInfo& connection_info) override
      68              :   {
      69           27 :     m_connection_info = connection_info;
      70           27 :     try {
      71           27 :       m_socket.set(zmq::sockopt::sndtimeo, 0); // Return immediately if we can't send
      72            0 :     } catch (zmq::error_t const& err) {
      73            0 :       throw ZmqOperationError(ERS_HERE,
      74            0 :                               m_connection_info.connection_name,
      75              :                               "set timeout",
      76              :                               "send",
      77            0 :                               err.what(),
      78            0 :                               m_connection_info.connection_string);
      79            0 :     }
      80              : 
      81           27 :     auto hwm = connection_info.capacity;
      82           27 :     if (hwm > 0) {
      83            0 :       try {
      84            0 :         m_socket.set(zmq::sockopt::sndhwm, hwm);
      85            0 :       } catch (zmq::error_t const& err) {
      86            0 :         throw ZmqOperationError(ERS_HERE,
      87            0 :                                 m_connection_info.connection_name,
      88              :                                 "set hwm",
      89              :                                 "send",
      90            0 :                                 err.what(),
      91            0 :                                 m_connection_info.connection_string);
      92            0 :       }
      93              :     }
      94              : 
      95           27 :     auto connection_string = m_connection_info.connection_string;
      96              : 
      97           27 :     TLOG_DEBUG(TLVL_CONNECTIONSTRING) << "Connection String is " << connection_string;
      98           27 :     try {
      99           27 :       m_socket.set(zmq::sockopt::immediate, 1); // Don't queue messages to incomplete connections
     100            0 :     } catch (zmq::error_t const& err) {
     101            0 :       throw ZmqOperationError(
     102            0 :         ERS_HERE, m_connection_info.connection_name, "set immediate mode", "send", err.what(), connection_string);
     103            0 :     }
     104              : 
     105           27 :     try {
     106           27 :       m_socket.connect(connection_string);
     107           24 :       m_connection_info.connection_string = m_socket.get(zmq::sockopt::last_endpoint);
     108           24 :       m_socket_connected = true;
     109            3 :     } catch (zmq::error_t const& err) {
     110            3 :       ers::error(ZmqOperationError(
     111            6 :         ERS_HERE, m_connection_info.connection_name, "connect", "send", err.what(), connection_string));
     112            3 :     }
     113              : 
     114           27 :     if (!m_socket_connected) {
     115            3 :       throw ZmqOperationError(ERS_HERE,
     116            3 :                               m_connection_info.connection_name,
     117              :                               "connect",
     118              :                               "send",
     119              :                               "Operation failed for all resolved connection strings",
     120            6 :                               "");
     121              :     }
     122           48 :     return m_connection_info.connection_string;
     123           27 :   }
     124              : 
     125              : protected:
     126        20028 :   bool send_(const void* message,
     127              :              int N,
     128              :              const duration_t& timeout,
     129              :              std::string const& topic,
     130              :              bool no_tmoexcept_mode) override
     131              :   {
     132        20028 :     TLOG_DEBUG(TLVL_ZMQSENDER_SEND_START)
     133        20028 :       << m_connection_info.connection_name << ": Starting send of " << N << " bytes";
     134        20028 :     auto start_time = std::chrono::steady_clock::now();
     135        20028 :     zmq::send_result_t res{};
     136      7605704 :     do {
     137              : 
     138      7605704 :       zmq::message_t topic_msg(topic.c_str(), topic.size());
     139      7605704 :       try {
     140      7605704 :         res = m_socket.send(topic_msg, zmq::send_flags::sndmore);
     141            0 :       } catch (zmq::error_t const& err) {
     142            0 :         throw ZmqSendError(ERS_HERE, m_connection_info.connection_name, err.what(), topic.size(), topic);
     143            0 :       }
     144              : 
     145      7605704 :       if (!res || res != topic.size()) {
     146      7585677 :         TLOG_DEBUG(TLVL_ZMQSENDER_SEND_ERR) << m_connection_info.connection_name << ": Unable to send message";
     147      7585677 :         continue;
     148      7585677 :       }
     149              : 
     150        20027 :       zmq::message_t msg(message, N);
     151        20027 :       try {
     152        20027 :         res = m_socket.send(msg, zmq::send_flags::none);
     153            0 :       } catch (zmq::error_t const& err) {
     154            0 :         throw ZmqSendError(ERS_HERE, m_connection_info.connection_name, err.what(), N, topic);
     155            0 :       }
     156              : 
     157        20027 :       if (!res && timeout > duration_t::zero()) {
     158            0 :         usleep(1000);
     159              :       }
     160     15231436 :     } while (std::chrono::duration_cast<duration_t>(std::chrono::steady_clock::now() - start_time) < timeout && !res);
     161              : 
     162        20028 :     if (!res && !no_tmoexcept_mode) {
     163            1 :       throw SendTimeoutExpired(ERS_HERE, m_connection_info.connection_name, timeout.count());
     164              :     }
     165              : 
     166        20027 :     TLOG_DEBUG(TLVL_ZMQSENDER_SEND_END) << m_connection_info.connection_name << ": Completed send of " << N
     167        20027 :                                         << " bytes";
     168        20027 :     return res && res == N;
     169              :   }
     170              : 
     171              : private:
     172              :   zmq::socket_t m_socket;
     173              :   bool m_socket_connected{ false };
     174              : };
     175              : 
     176              : } // namespace dunedaq::ipm
     177              : 
     178           26 : DEFINE_DUNE_IPM_SENDER(dunedaq::ipm::ZmqSender)
        

Generated by: LCOV version 2.0-1