LCOV - code coverage report
Current view: top level - ipm/plugins - ZmqPublisher.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 70.8 % 113 80
Test Date: 2026-02-16 10:18:04 Functions: 92.3 % 13 12

            Line data    Source code
       1              : /**
       2              :  *
       3              :  * @file ZmqPublisher.cpp ZmqPublisher messaging class definitions
       4              :  *
       5              :  * This is part of the DUNE DAQ Application Framework, copyright 2020.
       6              :  * Licensing/copyright details are in the COPYING file that you should have
       7              :  * received with this code.
       8              :  */
       9              : 
      10              : #include "ipm/Sender.hpp"
      11              : #include "ipm/ZmqContext.hpp"
      12              : 
      13              : #include "logging/Logging.hpp"
      14              : #include "utilities/ZmqUri.hpp"
      15              : #include "zmq.hpp"
      16              : 
      17              : #include <string>
      18              : #include <vector>
      19              : 
      20              : namespace dunedaq::ipm {
      21              : class ZmqPublisher : public Sender
      22              : {
      23              : public:
      24           12 :   ZmqPublisher()
      25           12 :     : m_socket(ZmqContext::instance().GetContext(), zmq::socket_type::pub)
      26              :   {
      27           12 :   }
      28              : 
      29           24 :   ~ZmqPublisher()
      30           12 :   {
      31              :     // Probably (cpp)zmq does this in the socket dtor anyway, but I guess it doesn't hurt to be explicit
      32           12 :     if (m_connection_info.connection_string != "" && m_socket_connected) {
      33           10 :       try {
      34           20 :         TLOG_DEBUG(TLVL_ZMQPUBLISHER_DESTRUCTOR) << m_connection_info.connection_name << ": Setting socket HWM to zero";
      35           10 :         m_socket.set(zmq::sockopt::sndhwm, 1);
      36              : 
      37           20 :         TLOG_DEBUG(TLVL_ZMQPUBLISHER_DESTRUCTOR)
      38            0 :           << m_connection_info.connection_name
      39           10 :           << ": Waiting up to 10s for socket to become writable before disconnecting";
      40           10 :         auto start_time = std::chrono::steady_clock::now();
      41           10 :         while (
      42           10 :           std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count() <
      43              :           10000) {
      44           10 :           auto events = m_socket.get(zmq::sockopt::events);
      45           10 :           if ((events & ZMQ_POLLOUT) != 0) {
      46              :             break;
      47              :           }
      48            0 :           usleep(1000);
      49              :         }
      50           20 :         TLOG_DEBUG(TLVL_ZMQPUBLISHER_DESTRUCTOR)
      51           10 :           << m_connection_info.connection_name << ": Disconnecting socket from " << m_connection_info.connection_string;
      52              : 
      53           10 :         m_socket.unbind(m_connection_info.connection_string);
      54           10 :         m_socket_connected = false;
      55            0 :       } catch (zmq::error_t const& err) {
      56            0 :         ers::error(ZmqOperationError(ERS_HERE,
      57            0 :                                      m_connection_info.connection_name,
      58              :                                      "unbind",
      59              :                                      "send",
      60            0 :                                      err.what(),
      61            0 :                                      m_connection_info.connection_string));
      62            0 :       }
      63              :     }
      64           12 :     m_socket.close();
      65           24 :   }
      66              : 
      67           29 :   bool can_send() const noexcept override { return m_socket_connected; }
      68           13 :   std::string connect_for_sends(const ConnectionInfo& connection_info) override
      69              :   {
      70           13 :     m_connection_info = connection_info;
      71           13 :     try {
      72           13 :       m_socket.set(zmq::sockopt::sndtimeo, 0); // Return immediately if we can't send
      73            0 :     } catch (zmq::error_t const& err) {
      74            0 :       throw ZmqOperationError(ERS_HERE,
      75            0 :                               m_connection_info.connection_name,
      76              :                               "set timeout",
      77              :                               "send",
      78            0 :                               err.what(),
      79            0 :                               m_connection_info.connection_string);
      80            0 :     }
      81              : 
      82           13 :     auto hwm = connection_info.capacity;
      83           13 :     if (hwm > 0) {
      84            0 :       try {
      85            0 :         m_socket.set(zmq::sockopt::sndhwm, hwm);
      86            0 :       } catch (zmq::error_t const& err) {
      87            0 :         throw ZmqOperationError(ERS_HERE,
      88            0 :                                 m_connection_info.connection_name,
      89              :                                 "set hwm",
      90              :                                 "send",
      91            0 :                                 err.what(),
      92            0 :                                 m_connection_info.connection_string);
      93            0 :       }
      94              :     }
      95              : 
      96           13 :     std::vector<std::string> resolved;
      97           13 :     try {
      98           13 :       utilities::ZmqUri this_uri(m_connection_info.connection_string);
      99           12 :       resolved = this_uri.get_uri_ip_addresses();
     100           13 :     } catch (utilities::InvalidUri const& err) {
     101            1 :       throw ZmqOperationError(ERS_HERE,
     102            1 :                               m_connection_info.connection_name,
     103              :                               "resolve connection_string",
     104              :                               "send",
     105              :                               "An invalid URI was passed",
     106              :                               m_connection_info.connection_string,
     107            2 :                               err);
     108            1 :     }
     109              : 
     110           12 :     if (resolved.size() == 0) {
     111            1 :       throw ZmqOperationError(ERS_HERE,
     112            1 :                               m_connection_info.connection_name,
     113              :                               "resolve connection_string",
     114              :                               "send",
     115              :                               "Unable to resolve connection_string",
     116            2 :                               m_connection_info.connection_string);
     117              :     }
     118           12 :     for (auto& connection_string : resolved) {
     119           11 :       TLOG_DEBUG(TLVL_CONNECTIONSTRING) << m_connection_info.connection_name << ": Connection String is "
     120           11 :                                         << connection_string;
     121           11 :       try {
     122           11 :         m_socket.bind(connection_string);
     123           10 :         m_connection_info.connection_string = m_socket.get(zmq::sockopt::last_endpoint);
     124           10 :         m_socket_connected = true;
     125           10 :         break;
     126            1 :       } catch (zmq::error_t const& err) {
     127            1 :         ers::error(ZmqOperationError(
     128            2 :           ERS_HERE, m_connection_info.connection_name, "bind", "send", err.what(), connection_string));
     129            1 :       }
     130              :     }
     131           11 :     if (!m_socket_connected) {
     132            1 :       throw ZmqOperationError(ERS_HERE,
     133            1 :                               m_connection_info.connection_name,
     134              :                               "bind",
     135              :                               "send",
     136              :                               "Bind failed for all resolved connection strings",
     137            2 :                               "");
     138              :     }
     139              : 
     140           20 :     return m_connection_info.connection_string;
     141           13 :   }
     142              : 
     143              : protected:
     144           20 :   bool send_(const void* message,
     145              :              int N,
     146              :              const duration_t& timeout,
     147              :              std::string const& topic,
     148              :              bool no_tmoexcept_mode) override
     149              :   {
     150           20 :     TLOG_DEBUG(TLVL_ZMQPUBLISHER_SEND_START)
     151           20 :       << m_connection_info.connection_name << ": Starting send of " << N << " bytes";
     152           20 :     auto start_time = std::chrono::steady_clock::now();
     153           20 :     zmq::send_result_t res{};
     154           20 :     do {
     155              : 
     156           20 :       zmq::message_t topic_msg(topic.c_str(), topic.size());
     157           20 :       try {
     158           20 :         res = m_socket.send(topic_msg, zmq::send_flags::sndmore);
     159            0 :       } catch (zmq::error_t const& err) {
     160            0 :         throw ZmqSendError(ERS_HERE, m_connection_info.connection_name, err.what(), topic.size(), topic);
     161            0 :       }
     162              : 
     163           20 :       if (!res || res != topic.size()) {
     164            0 :         TLOG_DEBUG(TLVL_ZMQPUBLISHER_SEND_ERR) << m_connection_info.connection_name << ": Unable to send message";
     165            0 :         continue;
     166            0 :       }
     167              : 
     168           20 :       zmq::message_t msg(message, N);
     169           20 :       try {
     170           20 :         res = m_socket.send(msg, zmq::send_flags::none);
     171            0 :       } catch (zmq::error_t const& err) {
     172            0 :         throw ZmqSendError(ERS_HERE, m_connection_info.connection_name, err.what(), N, topic);
     173            0 :       }
     174              : 
     175           20 :       if (!res && timeout > duration_t::zero()) {
     176            0 :         usleep(1000);
     177              :       }
     178           60 :     } while (std::chrono::duration_cast<duration_t>(std::chrono::steady_clock::now() - start_time) < timeout && !res);
     179              : 
     180           20 :     if (!res && !no_tmoexcept_mode) {
     181            0 :       throw SendTimeoutExpired(ERS_HERE, m_connection_info.connection_name, timeout.count());
     182              :     }
     183              : 
     184           20 :     TLOG_DEBUG(TLVL_ZMQPUBLISHER_SEND_END)
     185           20 :       << m_connection_info.connection_name << ": Completed send of " << N << " bytes";
     186           20 :     return res && res == N;
     187              :   }
     188              : 
     189              : private:
     190              :   zmq::socket_t m_socket;
     191              :   bool m_socket_connected{ false };
     192              : };
     193              : 
     194              : } // namespace dunedaq::ipm
     195              : 
     196           12 : DEFINE_DUNE_IPM_SENDER(dunedaq::ipm::ZmqPublisher) // NOTE(review): macro is defined outside this file (presumably ipm/Sender.hpp); presumably registers ZmqPublisher as a Sender plugin - confirm
        

Generated by: LCOV version 2.0-1