Line data Source code
1 : /**
2 : *
3 : * @file ZmqReceiver.cpp ZmqReceiver messaging class definitions
4 : *
5 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
6 : * Licensing/copyright details are in the COPYING file that you should have
7 : * received with this code.
8 : */
9 :
10 : #include "CallbackAdapter.hpp"
11 : #include "ipm/Receiver.hpp"
12 : #include "ipm/ZmqContext.hpp"
13 :
14 : #include "logging/Logging.hpp"
15 : #include "utilities/ZmqUri.hpp"
16 :
17 : #include <string>
18 : #include <vector>
19 :
20 : namespace dunedaq::ipm {
21 :
22 : class ZmqReceiver : public Receiver
23 : {
24 : public:
25 26 : ZmqReceiver()
26 26 : : m_socket(ZmqContext::instance().GetContext(), zmq::socket_type::pull)
27 : {
28 26 : }
29 :
30 52 : ~ZmqReceiver()
31 26 : {
32 26 : unregister_callback();
33 : // Probably (cpp)zmq does this in the socket dtor anyway, but I guess it doesn't hurt to be explicit
34 26 : if (m_connection_string != "" && m_socket_connected) {
35 23 : try {
36 23 : m_socket.unbind(m_connection_string);
37 23 : m_socket_connected = false;
38 0 : } catch (zmq::error_t const& err) {
39 0 : ers::error(ZmqOperationError(ERS_HERE, "unbind", "receive", err.what(), m_connection_string));
40 0 : }
41 : }
42 26 : m_socket.close();
43 52 : }
44 :
45 26 : std::string connect_for_receives(const nlohmann::json& connection_info) override
46 : {
47 26 : try {
48 26 : m_socket.set(zmq::sockopt::rcvtimeo, 0); // Return immediately if we can't receive
49 0 : } catch (zmq::error_t const& err) {
50 0 : throw ZmqOperationError(ERS_HERE,
51 : "set timeout",
52 : "receive",
53 0 : err.what(),
54 0 : connection_info.value<std::string>("connection_string", "inproc://default"));
55 0 : }
56 :
57 26 : try {
58 26 : m_socket.set(zmq::sockopt::linger, 0); // Close connection immediately when close is called
59 0 : } catch (zmq::error_t const& err) {
60 0 : throw ZmqOperationError(ERS_HERE,
61 : "set linger",
62 : "receive",
63 0 : err.what(),
64 0 : connection_info.value<std::string>("connection_string", "inproc://default"));
65 0 : }
66 :
67 26 : std::vector<std::string> resolved;
68 26 : try {
69 27 : utilities::ZmqUri this_uri(connection_info.value<std::string>("connection_string", "inproc://default"));
70 25 : resolved = this_uri.get_uri_ip_addresses();
71 26 : } catch (utilities::InvalidUri const& err) {
72 1 : throw ZmqOperationError(ERS_HERE,
73 : "resolve connection_string",
74 : "receive",
75 : "An invalid URI was passed",
76 2 : connection_info.value<std::string>("connection_string", "inproc://default"),
77 4 : err);
78 1 : }
79 :
80 25 : if (resolved.size() == 0) {
81 1 : throw ZmqOperationError(ERS_HERE,
82 : "resolve connection_string",
83 : "receive",
84 : "Unable to resolve connection_string",
85 2 : connection_info.value<std::string>("connection_string", "inproc://default"));
86 : }
87 25 : for (auto& connection_string : resolved) {
88 24 : TLOG_DEBUG(TLVL_CONNECTIONSTRING) << "Connection String is " << connection_string;
89 24 : try {
90 24 : m_socket.bind(connection_string);
91 23 : m_connection_string = m_socket.get(zmq::sockopt::last_endpoint);
92 23 : m_socket_connected = true;
93 23 : break;
94 1 : } catch (zmq::error_t const& err) {
95 1 : ers::error(ZmqOperationError(ERS_HERE, "bind", "receive", err.what(), connection_string));
96 1 : }
97 : }
98 24 : if (!m_socket_connected) {
99 1 : throw ZmqOperationError(ERS_HERE, "bind", "receive", "Bind failed for all resolved connection strings", "");
100 : }
101 :
102 23 : m_callback_adapter.set_receiver(this);
103 :
104 46 : return m_connection_string;
105 26 : }
106 :
107 20767 : bool can_receive() const noexcept override { return m_socket_connected; }
108 :
109 1 : void register_callback(std::function<void(Response&)> callback) override
110 : {
111 1 : m_callback_adapter.set_callback(callback);
112 1 : }
113 27 : void unregister_callback() override { m_callback_adapter.clear_callback(); }
114 :
115 : protected:
116 20758 : Receiver::Response receive_(const duration_t& timeout, bool no_tmoexcept_mode) override
117 : {
118 20758 : Receiver::Response output;
119 20758 : zmq::message_t hdr;
120 20758 : zmq::message_t msg;
121 20758 : zmq::recv_result_t res{};
122 :
123 20758 : auto start_time = std::chrono::steady_clock::now();
124 34709 : do {
125 :
126 34709 : try {
127 34709 : TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_HDR) << "Endpoint " << m_connection_string << ": Going to receive header";
128 34709 : res = m_socket.recv(hdr);
129 34709 : TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_HDR_2)
130 0 : << "Endpoint " << m_connection_string << ": Recv res=" << res.value_or(0)
131 34709 : << " for header (hdr.size() == " << hdr.size() << ")";
132 0 : } catch (zmq::error_t const& err) {
133 0 : throw ZmqReceiveError(ERS_HERE, err.what(), "header");
134 0 : }
135 34709 : if (res || hdr.more()) {
136 20026 : TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_DATA) << "Endpoint " << m_connection_string << ": Going to receive data";
137 20026 : output.metadata.resize(hdr.size());
138 20026 : memcpy(&output.metadata[0], hdr.data(), hdr.size());
139 :
140 : // ZMQ guarantees that the entire message has arrived
141 :
142 20026 : try {
143 20026 : res = m_socket.recv(msg);
144 0 : } catch (zmq::error_t const& err) {
145 0 : throw ZmqReceiveError(ERS_HERE, err.what(), "data");
146 0 : }
147 20026 : TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_DATA_2)
148 0 : << "Endpoint " << m_connection_string << ": Recv res=" << res.value_or(0)
149 20026 : << " for data (msg.size() == " << msg.size() << ")";
150 20026 : output.data.resize(msg.size());
151 20026 : memcpy(&output.data[0], msg.data(), msg.size());
152 14683 : } else if (timeout > duration_t::zero()) {
153 14678 : usleep(1000);
154 : }
155 34709 : } while (std::chrono::duration_cast<duration_t>(std::chrono::steady_clock::now() - start_time) < timeout &&
156 54733 : res.value_or(0) == 0);
157 :
158 20758 : if (res.value_or(0) == 0 && !no_tmoexcept_mode) {
159 4 : throw ReceiveTimeoutExpired(ERS_HERE, timeout.count());
160 : }
161 :
162 20754 : TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_END)
163 0 : << "Endpoint " << m_connection_string << ": Returning output with metadata size " << output.metadata.size()
164 20754 : << " and data size " << output.data.size();
165 20754 : return output;
166 20766 : }
167 :
168 : private:
169 : zmq::socket_t m_socket;
170 : std::string m_connection_string;
171 : bool m_socket_connected{ false };
172 : CallbackAdapter m_callback_adapter;
173 : };
174 : } // namespace dunedaq::ipm
175 :
176 26 : DEFINE_DUNE_IPM_RECEIVER(dunedaq::ipm::ZmqReceiver)
|