LCOV - code coverage report
Current view: top level - ipm/plugins - ZmqReceiver.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 78.8 % 104 82
Test Date: 2025-12-21 13:07:08 Functions: 100.0 % 14 14

            Line data    Source code
       1              : /**
       2              :  *
       3              :  * @file ZmqReceiver.cpp ZmqReceiver messaging class definitions
       4              :  *
       5              :  * This is part of the DUNE DAQ Application Framework, copyright 2020.
       6              :  * Licensing/copyright details are in the COPYING file that you should have
       7              :  * received with this code.
       8              :  */
       9              : 
      10              : #include "CallbackAdapter.hpp"
      11              : #include "ipm/Receiver.hpp"
      12              : #include "ipm/ZmqContext.hpp"
      13              : 
      14              : #include "logging/Logging.hpp"
      15              : #include "utilities/ZmqUri.hpp"
      16              : 
      17              : #include <string>
      18              : #include <vector>
      19              : 
      20              : namespace dunedaq::ipm {
      21              : 
      22              : class ZmqReceiver : public Receiver
      23              : {
      24              : public:
      25           26 :   ZmqReceiver()
      26           26 :     : m_socket(ZmqContext::instance().GetContext(), zmq::socket_type::pull)
      27              :   {
      28           26 :   }
      29              : 
      30           52 :   ~ZmqReceiver()
      31           26 :   {
      32           26 :     unregister_callback();
      33              :     // Probably (cpp)zmq does this in the socket dtor anyway, but I guess it doesn't hurt to be explicit
      34           26 :     if (m_connection_string != "" && m_socket_connected) {
      35           23 :       try {
      36           23 :         m_socket.unbind(m_connection_string);
      37           23 :         m_socket_connected = false;
      38            0 :       } catch (zmq::error_t const& err) {
      39            0 :         ers::error(ZmqOperationError(ERS_HERE, "unbind", "receive", err.what(), m_connection_string));
      40            0 :       }
      41              :     }
      42           26 :     m_socket.close();
      43           52 :   }
      44              : 
      45           26 :   std::string connect_for_receives(const nlohmann::json& connection_info) override
      46              :   {
      47           26 :     try {
      48           26 :       m_socket.set(zmq::sockopt::rcvtimeo, 0); // Return immediately if we can't receive
      49            0 :     } catch (zmq::error_t const& err) {
      50            0 :       throw ZmqOperationError(ERS_HERE,
      51              :                               "set timeout",
      52              :                               "receive",
      53            0 :                               err.what(),
      54            0 :                               connection_info.value<std::string>("connection_string", "inproc://default"));
      55            0 :     }
      56              : 
      57           26 :     try {
      58           26 :       m_socket.set(zmq::sockopt::linger, 0); // Close connection immediately when close is called
      59            0 :     } catch (zmq::error_t const& err) {
      60            0 :       throw ZmqOperationError(ERS_HERE,
      61              :                               "set linger",
      62              :                               "receive",
      63            0 :                               err.what(),
      64            0 :                               connection_info.value<std::string>("connection_string", "inproc://default"));
      65            0 :     }
      66              : 
      67           26 :     std::vector<std::string> resolved;
      68           26 :     try {
      69           27 :       utilities::ZmqUri this_uri(connection_info.value<std::string>("connection_string", "inproc://default"));
      70           25 :       resolved = this_uri.get_uri_ip_addresses();
      71           26 :     } catch (utilities::InvalidUri const& err) {
      72            1 :       throw ZmqOperationError(ERS_HERE,
      73              :                               "resolve connection_string",
      74              :                               "receive",
      75              :                               "An invalid URI was passed",
      76            2 :                               connection_info.value<std::string>("connection_string", "inproc://default"),
      77            4 :                               err);
      78            1 :     }
      79              : 
      80           25 :     if (resolved.size() == 0) {
      81            1 :       throw ZmqOperationError(ERS_HERE,
      82              :                               "resolve connection_string",
      83              :                               "receive",
      84              :                               "Unable to resolve connection_string",
      85            2 :                               connection_info.value<std::string>("connection_string", "inproc://default"));
      86              :     }
      87           25 :     for (auto& connection_string : resolved) {
      88           24 :       TLOG_DEBUG(TLVL_CONNECTIONSTRING) << "Connection String is " << connection_string;
      89           24 :       try {
      90           24 :         m_socket.bind(connection_string);
      91           23 :         m_connection_string = m_socket.get(zmq::sockopt::last_endpoint);
      92           23 :         m_socket_connected = true;
      93           23 :         break;
      94            1 :       } catch (zmq::error_t const& err) {
      95            1 :         ers::error(ZmqOperationError(ERS_HERE, "bind", "receive", err.what(), connection_string));
      96            1 :       }
      97              :     }
      98           24 :     if (!m_socket_connected) {
      99            1 :       throw ZmqOperationError(ERS_HERE, "bind", "receive", "Bind failed for all resolved connection strings", "");
     100              :     }
     101              : 
     102           23 :     m_callback_adapter.set_receiver(this);
     103              : 
     104           46 :     return m_connection_string;
     105           26 :   }
     106              : 
     107        20767 :   bool can_receive() const noexcept override { return m_socket_connected; }
     108              : 
     109            1 :   void register_callback(std::function<void(Response&)> callback) override
     110              :   {
     111            1 :     m_callback_adapter.set_callback(callback);
     112            1 :   }
     113           27 :   void unregister_callback() override { m_callback_adapter.clear_callback(); }
     114              : 
     115              : protected:
     116        20758 :   Receiver::Response receive_(const duration_t& timeout, bool no_tmoexcept_mode) override
     117              :   {
     118        20758 :     Receiver::Response output;
     119        20758 :     zmq::message_t hdr;
     120        20758 :     zmq::message_t msg;
     121        20758 :     zmq::recv_result_t res{};
     122              : 
     123        20758 :     auto start_time = std::chrono::steady_clock::now();
     124        34709 :     do {
     125              : 
     126        34709 :       try {
     127        34709 :         TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_HDR) << "Endpoint " << m_connection_string << ": Going to receive header";
     128        34709 :         res = m_socket.recv(hdr);
     129        34709 :         TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_HDR_2)
     130            0 :           << "Endpoint " << m_connection_string << ": Recv res=" << res.value_or(0)
     131        34709 :           << " for header (hdr.size() == " << hdr.size() << ")";
     132            0 :       } catch (zmq::error_t const& err) {
     133            0 :         throw ZmqReceiveError(ERS_HERE, err.what(), "header");
     134            0 :       }
     135        34709 :       if (res || hdr.more()) {
     136        20026 :         TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_DATA) << "Endpoint " << m_connection_string << ": Going to receive data";
     137        20026 :         output.metadata.resize(hdr.size());
     138        20026 :         memcpy(&output.metadata[0], hdr.data(), hdr.size());
     139              : 
     140              :         // ZMQ guarantees that the entire message has arrived
     141              : 
     142        20026 :         try {
     143        20026 :           res = m_socket.recv(msg);
     144            0 :         } catch (zmq::error_t const& err) {
     145            0 :           throw ZmqReceiveError(ERS_HERE, err.what(), "data");
     146            0 :         }
     147        20026 :         TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_DATA_2)
     148            0 :           << "Endpoint " << m_connection_string << ": Recv res=" << res.value_or(0)
     149        20026 :           << " for data (msg.size() == " << msg.size() << ")";
     150        20026 :         output.data.resize(msg.size());
     151        20026 :         memcpy(&output.data[0], msg.data(), msg.size());
     152        14683 :       } else if (timeout > duration_t::zero()) {
     153        14678 :         usleep(1000);
     154              :       }
     155        34709 :     } while (std::chrono::duration_cast<duration_t>(std::chrono::steady_clock::now() - start_time) < timeout &&
     156        54733 :              res.value_or(0) == 0);
     157              : 
     158        20758 :     if (res.value_or(0) == 0 && !no_tmoexcept_mode) {
     159            4 :       throw ReceiveTimeoutExpired(ERS_HERE, timeout.count());
     160              :     }
     161              : 
     162        20754 :     TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_END)
     163            0 :       << "Endpoint " << m_connection_string << ": Returning output with metadata size " << output.metadata.size()
     164        20754 :       << " and data size " << output.data.size();
     165        20754 :     return output;
     166        20766 :   }
     167              : 
     168              : private:
     169              :   zmq::socket_t m_socket;
     170              :   std::string m_connection_string;
     171              :   bool m_socket_connected{ false };
     172              :   CallbackAdapter m_callback_adapter;
     173              : };
     174              : } // namespace dunedaq::ipm
     175              : 
     176           26 : DEFINE_DUNE_IPM_RECEIVER(dunedaq::ipm::ZmqReceiver)
        

Generated by: LCOV version 2.0-1