LCOV - code coverage report
Current view: top level - ipm/plugins - ZmqReceiver.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 75.2 % 121 91
Test Date: 2026-02-16 10:18:04 Functions: 100.0 % 15 15

            Line data    Source code
       1              : /**
       2              :  *
       3              :  * @file ZmqReceiver.cpp ZmqReceiver messaging class definitions
       4              :  *
       5              :  * This is part of the DUNE DAQ Application Framework, copyright 2020.
       6              :  * Licensing/copyright details are in the COPYING file that you should have
       7              :  * received with this code.
       8              :  */
       9              : 
      10              : #include "CallbackAdapter.hpp"
      11              : #include "ipm/Receiver.hpp"
      12              : #include "ipm/ZmqContext.hpp"
      13              : 
      14              : #include "logging/Logging.hpp"
      15              : #include "utilities/ZmqUri.hpp"
      16              : 
      17              : #include <string>
      18              : #include <vector>
      19              : 
      20              : namespace dunedaq::ipm {
      21              : 
      22              : class ZmqReceiver : public Receiver
      23              : {
      24              : public:
      25           26 :   ZmqReceiver()
      26           26 :     : m_socket(ZmqContext::instance().GetContext(), zmq::socket_type::pull)
      27              :   {
      28           26 :   }
      29              : 
      30           52 :   ~ZmqReceiver()
      31           26 :   {
      32           26 :     unregister_callback();
      33              :     // Probably (cpp)zmq does this in the socket dtor anyway, but I guess it doesn't hurt to be explicit
      34           26 :     if (m_connection_info.connection_string != "" && m_socket_connected) {
      35           24 :       try {
      36           24 :         m_socket.unbind(m_connection_info.connection_string);
      37           24 :         m_socket_connected = false;
      38            0 :       } catch (zmq::error_t const& err) {
      39            0 :         ers::error(ZmqOperationError(ERS_HERE,
      40            0 :                                      m_connection_info.connection_name,
      41              :                                      "unbind",
      42              :                                      "receive",
      43            0 :                                      err.what(),
      44            0 :                                      m_connection_info.connection_string));
      45            0 :       }
      46              :     }
      47           26 :     m_socket.close();
      48           52 :   }
      49              : 
      50           27 :   std::string connect_for_receives(const ConnectionInfo& connection_info) override
      51              :   {
      52           27 :     m_connection_info = connection_info;
      53           27 :     try {
      54           27 :       m_socket.set(zmq::sockopt::rcvtimeo, 0); // Return immediately if we can't receive
      55            0 :     } catch (zmq::error_t const& err) {
      56            0 :       throw ZmqOperationError(ERS_HERE, m_connection_info.connection_name,
      57              :                               "set timeout",
      58              :                               "receive",
      59            0 :                               err.what(),
      60            0 :                               m_connection_info.connection_string);
      61            0 :     }
      62              : 
      63           27 :     try {
      64           27 :       m_socket.set(zmq::sockopt::linger, 0); // Close connection immediately when close is called
      65            0 :     } catch (zmq::error_t const& err) {
      66            0 :       throw ZmqOperationError(ERS_HERE,
      67            0 :                               m_connection_info.connection_name,
      68              :                               "set linger",
      69              :                               "receive",
      70            0 :                               err.what(),
      71            0 :                               m_connection_info.connection_string);
      72            0 :     }
      73              : 
      74           27 :     std::vector<std::string> resolved;
      75           27 :     try {
      76           27 :       utilities::ZmqUri this_uri(m_connection_info.connection_string);
      77           26 :       resolved = this_uri.get_uri_ip_addresses();
      78           27 :     } catch (utilities::InvalidUri const& err) {
      79            1 :       throw ZmqOperationError(ERS_HERE,
      80            1 :                               m_connection_info.connection_name,
      81              :                               "resolve connection_string",
      82              :                               "receive",
      83              :                               "An invalid URI was passed",
      84              :                               m_connection_info.connection_string,
      85            2 :                               err);
      86            1 :     }
      87              : 
      88           26 :     if (resolved.size() == 0) {
      89            1 :       throw ZmqOperationError(ERS_HERE,
      90            1 :                               m_connection_info.connection_name,
      91              :                               "resolve connection_string",
      92              :                               "receive",
      93              :                               "Unable to resolve connection_string",
      94            2 :                               m_connection_info.connection_string);
      95              :     }
      96           26 :     for (auto& connection_string : resolved) {
      97           25 :       TLOG_DEBUG(TLVL_CONNECTIONSTRING) << m_connection_info.connection_name << ": Connection String is " << connection_string;
      98           25 :       try {
      99           25 :         m_socket.bind(connection_string);
     100           24 :         m_connection_info.connection_string = m_socket.get(zmq::sockopt::last_endpoint);
     101           24 :         m_socket_connected = true;
     102           24 :         break;
     103            1 :       } catch (zmq::error_t const& err) {
     104            1 :         ers::error(ZmqOperationError(
     105            2 :           ERS_HERE, m_connection_info.connection_name, "bind", "receive", err.what(), connection_string));
     106            1 :       }
     107              :     }
     108           25 :     if (!m_socket_connected) {
     109            1 :       throw ZmqOperationError(ERS_HERE,
     110            1 :                               m_connection_info.connection_name,
     111              :                               "bind",
     112              :                               "receive",
     113              :                               "Bind failed for all resolved connection strings",
     114            2 :                               "");
     115              :     }
     116              : 
     117           24 :     m_callback_adapter.set_receiver(this);
     118              : 
     119           48 :     return m_connection_info.connection_string;
     120           27 :   }
     121              : 
     122        20767 :   bool can_receive() const noexcept override { return m_socket_connected; }
     123              : 
     124            1 :   void register_callback(std::function<void(Response&)> callback) override
     125              :   {
     126            1 :     m_callback_adapter.set_callback(callback);
     127            1 :   }
     128           27 :   void unregister_callback() override { m_callback_adapter.clear_callback(); }
     129              : 
     130            4 :   bool data_pending() override
     131              :   {
     132            4 :     try {
     133            4 :       auto events = m_socket.get(zmq::sockopt::events);
     134            4 :       return (events & ZMQ_POLLIN) != 0;
     135            0 :     } catch (zmq::error_t const& err) {
     136            0 :       ers::error(ZmqOperationError(ERS_HERE, m_connection_info.connection_name, "get events sockopt", "data_pending", err.what(), m_connection_info.connection_string));
     137            0 :     }
     138            0 :     return false;
     139              :   }
     140              : 
     141              : protected:
     142        20757 :   Receiver::Response receive_(const duration_t& timeout, bool no_tmoexcept_mode) override
     143              :   {
     144        20757 :     Receiver::Response output;
     145        20757 :     zmq::message_t hdr;
     146        20757 :     zmq::message_t msg;
     147        20757 :     zmq::recv_result_t res{};
     148              : 
     149        20757 :     auto start_time = std::chrono::steady_clock::now();
     150        34623 :     do {
     151              : 
     152        34623 :       try {
     153        34623 :         TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_HDR) << m_connection_info.connection_name << ": Going to receive header";
     154        34623 :         res = m_socket.recv(hdr);
     155        34623 :         TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_HDR_2) << m_connection_info.connection_name << ": Recv res=" << res.value_or(0)
     156        34623 :           << " for header (hdr.size() == " << hdr.size() << ")";
     157            0 :       } catch (zmq::error_t const& err) {
     158            0 :         throw ZmqReceiveError(ERS_HERE, m_connection_info.connection_name, err.what(), "header");
     159            0 :       }
     160        34623 :       if (res || hdr.more()) {
     161        20026 :         TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_DATA) << m_connection_info.connection_name << ": Going to receive data";
     162        20026 :         output.metadata.resize(hdr.size());
     163        20026 :         memcpy(&output.metadata[0], hdr.data(), hdr.size());
     164              : 
     165              :         // ZMQ guarantees that the entire message has arrived
     166              : 
     167        20026 :         try {
     168        20026 :           res = m_socket.recv(msg);
     169            0 :         } catch (zmq::error_t const& err) {
     170            0 :           throw ZmqReceiveError(ERS_HERE, m_connection_info.connection_name, err.what(), "data");
     171            0 :         }
     172        20026 :         TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_DATA_2)
     173            0 :           << m_connection_info.connection_name << ": Recv res=" << res.value_or(0)
     174        20026 :           << " for data (msg.size() == " << msg.size() << ")";
     175        20026 :         output.data.resize(msg.size());
     176        20026 :         memcpy(&output.data[0], msg.data(), msg.size());
     177        14597 :       } else if (timeout > duration_t::zero()) {
     178        14590 :         usleep(1000);
     179              :       }
     180        34623 :     } while (std::chrono::duration_cast<duration_t>(std::chrono::steady_clock::now() - start_time) < timeout &&
     181        54647 :              res.value_or(0) == 0);
     182              : 
     183        20757 :     if (res.value_or(0) == 0 && !no_tmoexcept_mode) {
     184            6 :       throw ReceiveTimeoutExpired(ERS_HERE, m_connection_info.connection_name, timeout.count());
     185              :     }
     186              : 
     187        20751 :     TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_END)
     188            0 :       << m_connection_info.connection_name << ": Returning output with metadata size "
     189            0 :       << output.metadata.size()
     190        20751 :       << " and data size " << output.data.size();
     191        20751 :     return output;
     192        20769 :   }
     193              : 
     194              : private:
     195              :   zmq::socket_t m_socket;
     196              :   bool m_socket_connected{ false };
     197              :   CallbackAdapter m_callback_adapter;
     198              : };
     199              : } // namespace dunedaq::ipm
     200              : 
     201           26 : DEFINE_DUNE_IPM_RECEIVER(dunedaq::ipm::ZmqReceiver)
        

Generated by: LCOV version 2.0-1