LCOV - code coverage report
Current view: top level - ipm/plugins - ZmqSubscriber.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 81.9 % 105 86
Test Date: 2025-12-21 13:07:08 Functions: 100.0 % 16 16

            Line data    Source code
       1              : 
       2              : /**
       3              :  *
       4              :  * @file ZmqSubscriber.cpp ZmqSubscriber messaging class definitions
       5              :  *
       6              :  * This is part of the DUNE DAQ Application Framework, copyright 2020.
       7              :  * Licensing/copyright details are in the COPYING file that you should have
       8              :  * received with this code.
       9              :  */
      10              : 
      11              : #include "CallbackAdapter.hpp"
      12              : #include "ipm/Subscriber.hpp"
      13              : #include "ipm/ZmqContext.hpp"
      14              : 
      15              : #include "logging/Logging.hpp"
      16              : 
      17              : #include <set>
      18              : #include <string>
      19              : #include <vector>
      20              : 
      21              : namespace dunedaq::ipm {
      22              : 
      23              : class ZmqSubscriber : public Subscriber
      24              : {
      25              : public:
      26           13 :   ZmqSubscriber()
      27           13 :     : m_socket(ZmqContext::instance().GetContext(), zmq::socket_type::sub)
      28              :   {
      29           13 :   }
      30              : 
      31           26 :   ~ZmqSubscriber()
      32           13 :   {
      33           13 :     unregister_callback();
      34              :     // Probably (cpp)zmq does this in the socket dtor anyway, but I guess it doesn't hurt to be explicit
      35           13 :     if (!m_connection_strings.empty() && m_socket_connected) {
      36           10 :       m_socket_connected = false;
      37           25 :       for (auto& conn_string : m_connection_strings) {
      38           15 :         try {
      39           15 :           m_socket.disconnect(conn_string);
      40            0 :         } catch (zmq::error_t const& err) {
      41            0 :           ers::error(ZmqOperationError(ERS_HERE, "disconnect", "receive", err.what(), conn_string));
      42            0 :         }
      43              :       }
      44              :     }
      45           13 :     m_socket.close();
      46           26 :   }
      47              : 
      48           13 :   std::string connect_for_receives(const nlohmann::json& connection_info) override
      49              :   {
      50           13 :     std::set<std::string> new_connection_strings;
      51           13 :     if (connection_info.contains("connection_string")) {
      52            5 :       if (m_connection_strings.count(connection_info.value<std::string>("connection_string", "")) == 0)
      53            5 :         new_connection_strings.insert(connection_info.value<std::string>("connection_string", ""));
      54              :     }
      55              : 
      56           26 :     for (auto& conn_string : connection_info.value<std::vector<std::string>>("connection_strings", {})) {
      57           13 :       if (m_connection_strings.count(conn_string) == 0)
      58           13 :         new_connection_strings.insert(conn_string);
      59           13 :     }
      60              : 
      61           13 :     if (m_connection_strings.size() == 0 && new_connection_strings.size() == 0) {
      62            0 :       throw ZmqOperationError(ERS_HERE, "resolve connections", "receive", "No valid connection strings passed", "");
      63              :     }
      64              : 
      65           13 :     if (!m_socket_connected) {
      66           11 :       try {
      67           11 :         m_socket.set(zmq::sockopt::rcvtimeo, 0); // Return immediately if we can't receive
      68            0 :       } catch (zmq::error_t const& err) {
      69            0 :         throw ZmqOperationError(ERS_HERE, "set timeout", "receive", err.what(), *m_connection_strings.begin());
      70            0 :       }
      71              :     }
      72           31 :     for (auto& conn_string : new_connection_strings) {
      73           18 :       try {
      74           18 :         TLOG_DEBUG(TLVL_CONNECTIONSTRING) << "Connection String is " << conn_string;
      75           18 :         m_socket.connect(conn_string);
      76           15 :         m_connection_strings.insert(conn_string);
      77            3 :       } catch (zmq::error_t const& err) {
      78            3 :         ers::error(ZmqOperationError(ERS_HERE, "connect", "receive", err.what(), conn_string));
      79            3 :       }
      80              :     }
      81           13 :     m_socket_connected = true;
      82           13 :     m_callback_adapter.set_receiver(this);
      83              : 
      84           13 :     if (m_connection_strings.size() > 0) {
      85           10 :       return *m_connection_strings.begin();
      86              :     }
      87            3 :     return {};
      88           13 :   }
      89              : 
      90           62 :   bool can_receive() const noexcept override { return m_socket_connected; }
      91              : 
      92           12 :   void subscribe(std::string const& topic) override
      93              :   {
      94           12 :     try {
      95           12 :       m_socket.set(zmq::sockopt::subscribe, topic);
      96            0 :     } catch (zmq::error_t const& err) {
      97            0 :       throw ZmqSubscribeError(ERS_HERE, err.what(), topic);
      98            0 :     }
      99           12 :   }
     100            2 :   void unsubscribe(std::string const& topic) override
     101              :   {
     102            2 :     try {
     103            2 :       m_socket.set(zmq::sockopt::unsubscribe, topic);
     104            0 :     } catch (zmq::error_t const& err) {
     105            0 :       throw ZmqUnsubscribeError(ERS_HERE, err.what(), topic);
     106            0 :     }
     107            2 :   }
     108              : 
     109            1 :   void register_callback(std::function<void(Response&)> callback) override
     110              :   {
     111            1 :     m_callback_adapter.set_callback(callback);
     112            1 :   }
     113           14 :   void unregister_callback() override { m_callback_adapter.clear_callback(); }
     114              : 
     115              : protected:
     116           53 :   Receiver::Response receive_(const duration_t& timeout, bool no_tmoexcept_mode) override
     117              :   {
     118           53 :     Receiver::Response output;
     119           53 :     zmq::message_t hdr;
     120           53 :     zmq::message_t msg;
     121           53 :     zmq::recv_result_t res{};
     122              : 
     123           53 :     auto start_time = std::chrono::steady_clock::now();
     124         2164 :     do {
     125              : 
     126         2164 :       try {
     127         2164 :         TLOG_DEBUG(TLVL_ZMQSUBSCRIBER_RECV_HDR) << "Subscriber: Going to receive header";
     128         2164 :         res = m_socket.recv(hdr);
     129         2164 :         TLOG_DEBUG(TLVL_ZMQSUBSCRIBER_RECV_HDR_2)
     130         2164 :           << "Subscriber: Recv res=" << res.value_or(0) << " for header (hdr.size() == " << hdr.size() << ")";
     131            0 :       } catch (zmq::error_t const& err) {
     132            0 :         throw ZmqReceiveError(ERS_HERE, err.what(), "header");
     133            0 :       }
     134         2164 :       if (res || hdr.more()) {
     135           16 :         TLOG_DEBUG(TLVL_ZMQSUBSCRIBER_RECV_DATA) << "Subscriber: Going to receive data";
     136           16 :         output.metadata.resize(hdr.size());
     137           16 :         memcpy(&output.metadata[0], hdr.data(), hdr.size());
     138              : 
     139              :         // ZMQ guarantees that the entire message has arrived
     140              : 
     141           16 :         try {
     142           16 :           res = m_socket.recv(msg);
     143            0 :         } catch (zmq::error_t const& err) {
     144            0 :           throw ZmqReceiveError(ERS_HERE, err.what(), "data");
     145            0 :         }
     146           16 :         TLOG_DEBUG(TLVL_ZMQSUBSCRIBER_RECV_DATA_2)
     147           16 :           << "Subscriber: Recv res=" << res.value_or(0) << " for data (msg.size() == " << msg.size() << ")";
     148           16 :         output.data.resize(msg.size());
     149           16 :         memcpy(&output.data[0], msg.data(), msg.size());
     150         2148 :       } else if (timeout > duration_t::zero()) {
     151         2127 :         usleep(1000);
     152              :       }
     153         2164 :     } while (std::chrono::duration_cast<duration_t>(std::chrono::steady_clock::now() - start_time) < timeout &&
     154         2179 :              res.value_or(0) == 0);
     155              : 
     156           53 :     if (res.value_or(0) == 0 && !no_tmoexcept_mode) {
     157           37 :       throw ReceiveTimeoutExpired(ERS_HERE, timeout.count());
     158              :     }
     159              : 
     160           16 :     TLOG_DEBUG(TLVL_ZMQSUBSCRIBER_RECV_END) << "Subscriber: Returning output with metadata size "
     161           16 :                                             << output.metadata.size() << " and data size " << output.data.size();
     162           16 :     return output;
     163          127 :   }
     164              : 
     165              : private:
     166              :   zmq::socket_t m_socket;
     167              :   std::set<std::string> m_connection_strings{};
     168              :   bool m_socket_connected{ false };
     169              :   CallbackAdapter m_callback_adapter;
     170              : };
     171              : } // namespace dunedaq::ipm
     172              : 
     173           13 : DEFINE_DUNE_IPM_RECEIVER(dunedaq::ipm::ZmqSubscriber)
        

Generated by: LCOV version 2.0-1