LCOV - code coverage report
Current view: top level - ipm/plugins - ZmqSubscriber.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 73.4 % 124 91
Test Date: 2026-02-16 10:18:04 Functions: 100.0 % 17 17

            Line data    Source code
       1              : 
       2              : /**
       3              :  *
       4              :  * @file ZmqSubscriber.cpp ZmqSubscriber messaging class definitions
       5              :  *
       6              :  * This is part of the DUNE DAQ Application Framework, copyright 2020.
       7              :  * Licensing/copyright details are in the COPYING file that you should have
       8              :  * received with this code.
       9              :  */
      10              : 
      11              : #include "CallbackAdapter.hpp"
      12              : #include "ipm/Subscriber.hpp"
      13              : #include "ipm/ZmqContext.hpp"
      14              : 
      15              : #include "logging/Logging.hpp"
      16              : 
      17              : #include <set>
      18              : #include <string>
      19              : #include <vector>
      20              : 
      21              : namespace dunedaq::ipm {
      22              : 
      23              : class ZmqSubscriber : public Subscriber
      24              : {
      25              : public:
      26           13 :   ZmqSubscriber()
      27           13 :     : m_socket(ZmqContext::instance().GetContext(), zmq::socket_type::sub)
      28              :   {
      29           13 :   }
      30              : 
      31           26 :   ~ZmqSubscriber()
      32           13 :   {
      33           13 :     unregister_callback();
      34              :     // Probably (cpp)zmq does this in the socket dtor anyway, but I guess it doesn't hurt to be explicit
      35           13 :     if (!m_connection_strings.empty() && m_socket_connected) {
      36           10 :       m_socket_connected = false;
      37           25 :       for (auto& conn_string : m_connection_strings) {
      38           15 :         try {
      39           15 :           m_socket.disconnect(conn_string);
      40            0 :         } catch (zmq::error_t const& err) {
      41            0 :           ers::error(ZmqOperationError(
      42            0 :             ERS_HERE, m_connection_info.connection_name, "disconnect", "receive", err.what(), conn_string));
      43            0 :         }
      44              :       }
      45              :     }
      46           13 :     m_socket.close();
      47           26 :   }
      48              : 
      49           13 :   std::string connect_for_receives(const ConnectionInfo& connection_info) override
      50              :   {
      51           13 :     m_connection_info = connection_info;
      52           13 :     std::set<std::string> new_connection_strings;
      53           13 :     if (connection_info.connection_string != "") {
      54            5 :       new_connection_strings.insert(connection_info.connection_string);
      55              :     }
      56              : 
      57           26 :     for (auto& conn_string : connection_info.connection_strings) {
      58           13 :       if (m_connection_strings.count(conn_string) == 0)
      59           13 :         new_connection_strings.insert(conn_string);
      60              :     }
      61              : 
      62           13 :     if (m_connection_strings.size() == 0 && new_connection_strings.size() == 0) {
      63            0 :       throw ZmqOperationError(ERS_HERE,
      64            0 :                               m_connection_info.connection_name,
      65              :                               "resolve connections",
      66              :                               "receive",
      67              :                               "No valid connection strings passed",
      68            0 :                               "");
      69              :     }
      70              : 
      71           13 :     if (!m_socket_connected) {
      72           11 :       try {
      73           11 :         m_socket.set(zmq::sockopt::rcvtimeo, 0); // Return immediately if we can't receive
      74            0 :       } catch (zmq::error_t const& err) {
      75            0 :         throw ZmqOperationError(ERS_HERE,
      76            0 :                                 m_connection_info.connection_name,
      77              :                                 "set timeout",
      78              :                                 "receive",
      79            0 :                                 err.what(),
      80            0 :                                 *m_connection_strings.begin());
      81            0 :       }
      82              :     }
      83           31 :     for (auto& conn_string : new_connection_strings) {
      84           18 :       try {
      85           18 :         TLOG_DEBUG(TLVL_CONNECTIONSTRING)
      86           18 :           << m_connection_info.connection_name << ": Connection String is " << conn_string;
      87           18 :         m_socket.connect(conn_string);
      88           15 :         m_connection_strings.insert(conn_string);
      89            3 :       } catch (zmq::error_t const& err) {
      90            3 :         ers::error(ZmqOperationError(
      91            6 :           ERS_HERE, m_connection_info.connection_name, "connect", "receive", err.what(), conn_string));
      92            3 :       }
      93              :     }
      94           13 :     m_socket_connected = true;
      95           13 :     m_callback_adapter.set_receiver(this);
      96              : 
      97           13 :     if (m_connection_strings.size() > 0) {
      98           10 :       return *m_connection_strings.begin();
      99              :     }
     100            3 :     return {};
     101           13 :   }
     102              : 
     103           63 :   bool can_receive() const noexcept override { return m_socket_connected; }
     104              : 
     105           12 :   void subscribe(std::string const& topic) override
     106              :   {
     107           12 :     try {
     108           12 :       m_socket.set(zmq::sockopt::subscribe, topic);
     109            0 :     } catch (zmq::error_t const& err) {
     110            0 :       throw ZmqSubscribeError(ERS_HERE, m_connection_info.connection_name, err.what(), topic);
     111            0 :     }
     112           12 :   }
     113            2 :   void unsubscribe(std::string const& topic) override
     114              :   {
     115            2 :     try {
     116            2 :       m_socket.set(zmq::sockopt::unsubscribe, topic);
     117            0 :     } catch (zmq::error_t const& err) {
     118            0 :       throw ZmqUnsubscribeError(ERS_HERE, m_connection_info.connection_name, err.what(), topic);
     119            0 :     }
     120            2 :   }
     121              : 
     122            6 :   bool data_pending() override
     123              :   {
     124            6 :     try {
     125            6 :       auto events = m_socket.get(zmq::sockopt::events);
     126            6 :       return (events & ZMQ_POLLIN) != 0;
     127            0 :     } catch (zmq::error_t const& err) {
     128            0 :       ers::error(
     129            0 :         ZmqOperationError(ERS_HERE, m_connection_info.connection_name, "get events sockopt", "data_pending", err.what(), *m_connection_strings.begin()));
     130            0 :     }
     131            0 :     return false;
     132              :   }
     133              : 
     134            1 :   void register_callback(std::function<void(Response&)> callback) override
     135              :   {
     136            1 :     m_callback_adapter.set_callback(callback);
     137            1 :   }
     138           14 :   void unregister_callback() override { m_callback_adapter.clear_callback(); }
     139              : 
     140              : protected:
     141           54 :   Receiver::Response receive_(const duration_t& timeout, bool no_tmoexcept_mode) override
     142              :   {
     143           54 :     Receiver::Response output;
     144           54 :     zmq::message_t hdr;
     145           54 :     zmq::message_t msg;
     146           54 :     zmq::recv_result_t res{};
     147              : 
     148           54 :     auto start_time = std::chrono::steady_clock::now();
     149         2159 :     do {
     150              : 
     151         2159 :       try {
     152         2159 :         TLOG_DEBUG(TLVL_ZMQSUBSCRIBER_RECV_HDR) << m_connection_info.connection_name << ": Going to receive header";
     153         2159 :         res = m_socket.recv(hdr);
     154         2159 :         TLOG_DEBUG(TLVL_ZMQSUBSCRIBER_RECV_HDR_2)
     155            0 :           << m_connection_info.connection_name << ": Recv res=" << res.value_or(0)
     156         2159 :           << " for header (hdr.size() == " << hdr.size() << ")";
     157            0 :       } catch (zmq::error_t const& err) {
     158            0 :         throw ZmqReceiveError(ERS_HERE, m_connection_info.connection_name, err.what(), "header");
     159            0 :       }
     160         2159 :       if (res || hdr.more()) {
     161           16 :         TLOG_DEBUG(TLVL_ZMQSUBSCRIBER_RECV_DATA) << m_connection_info.connection_name << ": Going to receive data";
     162           16 :         output.metadata.resize(hdr.size());
     163           16 :         memcpy(&output.metadata[0], hdr.data(), hdr.size());
     164              : 
     165              :         // ZMQ guarantees that the entire message has arrived
     166              : 
     167           16 :         try {
     168           16 :           res = m_socket.recv(msg);
     169            0 :         } catch (zmq::error_t const& err) {
     170            0 :           throw ZmqReceiveError(ERS_HERE, m_connection_info.connection_name, err.what(), "data");
     171            0 :         }
     172           16 :         TLOG_DEBUG(TLVL_ZMQSUBSCRIBER_RECV_DATA_2)
     173            0 :           << m_connection_info.connection_name << ": Recv res=" << res.value_or(0)
     174           16 :           << " for data (msg.size() == " << msg.size() << ")";
     175           16 :         output.data.resize(msg.size());
     176           16 :         memcpy(&output.data[0], msg.data(), msg.size());
     177         2143 :       } else if (timeout > duration_t::zero()) {
     178         2121 :         usleep(1000);
     179              :       }
     180         2159 :     } while (std::chrono::duration_cast<duration_t>(std::chrono::steady_clock::now() - start_time) < timeout &&
     181         2174 :              res.value_or(0) == 0);
     182              : 
     183           54 :     if (res.value_or(0) == 0 && !no_tmoexcept_mode) {
     184           38 :       throw ReceiveTimeoutExpired(ERS_HERE, m_connection_info.connection_name, timeout.count());
     185              :     }
     186              : 
     187           16 :     TLOG_DEBUG(TLVL_ZMQSUBSCRIBER_RECV_END)
     188            0 :       << m_connection_info.connection_name << ": Returning output with metadata size " << output.metadata.size()
     189           16 :       << " and data size " << output.data.size();
     190           16 :     return output;
     191          130 :   }
     192              : 
     193              : private:
     194              :   zmq::socket_t m_socket;
     195              :   std::set<std::string> m_connection_strings{};
     196              :   bool m_socket_connected{ false };
     197              :   CallbackAdapter m_callback_adapter;
     198              : };
     199              : } // namespace dunedaq::ipm
     200              : 
     201           13 : DEFINE_DUNE_IPM_RECEIVER(dunedaq::ipm::ZmqSubscriber)
        

Generated by: LCOV version 2.0-1