LCOV - code coverage report
Current view: top level - ipm/plugins - ZmqPublisher.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 72.4 % 98 71
Test Date: 2025-12-21 13:07:08 Functions: 92.3 % 13 12

            Line data    Source code
       1              : /**
       2              :  *
       3              :  * @file ZmqPublisher.cpp ZmqPublisher messaging class definitions
       4              :  *
       5              :  * This is part of the DUNE DAQ Application Framework, copyright 2020.
       6              :  * Licensing/copyright details are in the COPYING file that you should have
       7              :  * received with this code.
       8              :  */
       9              : 
      10              : #include "ipm/Sender.hpp"
      11              : #include "ipm/ZmqContext.hpp"
      12              : 
      13              : #include "logging/Logging.hpp"
      14              : #include "utilities/ZmqUri.hpp"
      15              : #include "zmq.hpp"
      16              : 
      17              : #include <string>
      18              : #include <vector>
      19              : 
      20              : namespace dunedaq::ipm {
      21              : class ZmqPublisher : public Sender
      22              : {
      23              : public:
      24           12 :   ZmqPublisher()
      25           12 :     : m_socket(ZmqContext::instance().GetContext(), zmq::socket_type::pub)
      26              :   {
      27           12 :   }
      28              : 
      29           24 :   ~ZmqPublisher()
      30           12 :   {
      31              :     // Probably (cpp)zmq does this in the socket dtor anyway, but I guess it doesn't hurt to be explicit
      32           12 :     if (m_connection_string != "" && m_socket_connected) {
      33           10 :       try {
      34           20 :         TLOG_DEBUG(TLVL_ZMQPUBLISHER_DESTRUCTOR) << "Setting socket HWM to zero";
      35           10 :         m_socket.set(zmq::sockopt::sndhwm, 1);
      36              : 
      37           20 :         TLOG_DEBUG(TLVL_ZMQPUBLISHER_DESTRUCTOR) << "Waiting up to 10s for socket to become writable before disconnecting";
      38           10 :         auto start_time = std::chrono::steady_clock::now();
      39           20 :         while (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count() < 10000) {
      40           10 :           auto events = m_socket.get(zmq::sockopt::events);
      41           10 :           if ((events & ZMQ_POLLOUT) != 0) {
      42              :             break;
      43              :           }
      44            0 :           usleep(1000);
      45              :         }
      46           20 :         TLOG_DEBUG(TLVL_ZMQPUBLISHER_DESTRUCTOR) << "Disconnecting socket from " << m_connection_string;
      47              : 
      48           10 :         m_socket.unbind(m_connection_string);
      49           10 :         m_socket_connected = false;
      50            0 :       } catch (zmq::error_t const& err) {
      51            0 :         ers::error(ZmqOperationError(ERS_HERE, "unbind", "send", err.what(), m_connection_string));
      52            0 :       }
      53              :     }
      54           12 :     m_socket.close();
      55           24 :   }
      56              : 
      57           29 :   bool can_send() const noexcept override { return m_socket_connected; }
      58           13 :   std::string connect_for_sends(const nlohmann::json& connection_info) override
      59              :   {
      60           13 :     try {
      61           13 :       m_socket.set(zmq::sockopt::sndtimeo, 0); // Return immediately if we can't send
      62            0 :     } catch (zmq::error_t const& err) {
      63            0 :       throw ZmqOperationError(ERS_HERE,
      64              :                               "set timeout",
      65              :                               "send",
      66            0 :                               err.what(),
      67            0 :                               connection_info.value<std::string>("connection_string", "inproc://default"));
      68            0 :     }
      69              : 
      70           13 :     auto hwm = connection_info.value<int>("capacity", 0);
      71           13 :     if (hwm > 0) {
      72            0 :       try {
      73            0 :         m_socket.set(zmq::sockopt::sndhwm, hwm);
      74            0 :       } catch (zmq::error_t const& err) {
      75            0 :         throw ZmqOperationError(ERS_HERE,
      76              :                                 "set hwm",
      77              :                                 "send",
      78            0 :                                 err.what(),
      79            0 :                                 connection_info.value<std::string>("connection_string", "inproc://default"));
      80            0 :       }
      81              :     }
      82              : 
      83           13 :     std::vector<std::string> resolved;
      84           13 :     try {
      85           14 :       utilities::ZmqUri this_uri(connection_info.value<std::string>("connection_string", "inproc://default"));
      86           12 :       resolved = this_uri.get_uri_ip_addresses();
      87           13 :     } catch (utilities::InvalidUri const& err) {
      88            1 :       throw ZmqOperationError(ERS_HERE,
      89              :                               "resolve connection_string",
      90              :                               "send",
      91              :                               "An invalid URI was passed",
      92            2 :                               connection_info.value<std::string>("connection_string", "inproc://default"),
      93            4 :                               err);
      94            1 :     }
      95              : 
      96           12 :     if (resolved.size() == 0) {
      97            1 :       throw ZmqOperationError(ERS_HERE,
      98              :                               "resolve connection_string",
      99              :                               "send",
     100              :                               "Unable to resolve connection_string",
     101            2 :                               connection_info.value<std::string>("connection_string", "inproc://default"));
     102              :     }
     103           12 :     for (auto& connection_string : resolved) {
     104           11 :       TLOG_DEBUG(TLVL_CONNECTIONSTRING) << "Connection String is " << connection_string;
     105           11 :       try {
     106           11 :         m_socket.bind(connection_string);
     107           10 :         m_connection_string = m_socket.get(zmq::sockopt::last_endpoint);
     108           10 :         m_socket_connected = true;
     109           10 :         break;
     110            1 :       } catch (zmq::error_t const& err) {
     111            1 :         ers::error(ZmqOperationError(ERS_HERE, "bind", "send", err.what(), connection_string));
     112            1 :       }
     113              :     }
     114           11 :     if (!m_socket_connected) {
     115            1 :       throw ZmqOperationError(ERS_HERE, "bind", "send", "Bind failed for all resolved connection strings", "");
     116              :     }
     117              : 
     118           20 :     return m_connection_string;
     119           13 :   }
     120              : 
     121              : protected:
     122           20 :   bool send_(const void* message,
     123              :              int N,
     124              :              const duration_t& timeout,
     125              :              std::string const& topic,
     126              :              bool no_tmoexcept_mode) override
     127              :   {
     128           20 :     TLOG_DEBUG(TLVL_ZMQPUBLISHER_SEND_START)
     129           20 :       << "Endpoint " << m_connection_string << ": Starting send of " << N << " bytes";
     130           20 :     auto start_time = std::chrono::steady_clock::now();
     131           20 :     zmq::send_result_t res{};
     132           20 :     do {
     133              : 
     134           20 :       zmq::message_t topic_msg(topic.c_str(), topic.size());
     135           20 :       try {
     136           20 :         res = m_socket.send(topic_msg, zmq::send_flags::sndmore);
     137            0 :       } catch (zmq::error_t const& err) {
     138            0 :         throw ZmqSendError(ERS_HERE, err.what(), topic.size(), topic);
     139            0 :       }
     140              : 
     141           20 :       if (!res || res != topic.size()) {
     142            0 :         TLOG_DEBUG(TLVL_ZMQPUBLISHER_SEND_ERR) << "Endpoint " << m_connection_string << ": Unable to send message";
     143            0 :         continue;
     144            0 :       }
     145              : 
     146           20 :       zmq::message_t msg(message, N);
     147           20 :       try {
     148           20 :         res = m_socket.send(msg, zmq::send_flags::none);
     149            0 :       } catch (zmq::error_t const& err) {
     150            0 :         throw ZmqSendError(ERS_HERE, err.what(), N, topic);
     151            0 :       }
     152              : 
     153           20 :       if (!res && timeout > duration_t::zero()) {
     154            0 :         usleep(1000);
     155              :       }
     156           60 :     } while (std::chrono::duration_cast<duration_t>(std::chrono::steady_clock::now() - start_time) < timeout && !res);
     157              : 
     158           20 :     if (!res && !no_tmoexcept_mode) {
     159            0 :       throw SendTimeoutExpired(ERS_HERE, timeout.count());
     160              :     }
     161              : 
     162           20 :     TLOG_DEBUG(TLVL_ZMQPUBLISHER_SEND_END)
     163           20 :       << "Endpoint " << m_connection_string << ": Completed send of " << N << " bytes";
     164           20 :     return res && res == N;
     165              :   }
     166              : 
     167              : private:
     168              :   zmq::socket_t m_socket;
     169              :   std::string m_connection_string;
     170              :   bool m_socket_connected{ false };
     171              : };
     172              : 
     173              : } // namespace dunedaq::ipm
     174              : 
     175           12 : DEFINE_DUNE_IPM_SENDER(dunedaq::ipm::ZmqPublisher)
        

Generated by: LCOV version 2.0-1