Line data Source code
1 : /**
2 : *
3 : * @file ZmqReceiver.cpp ZmqReceiver messaging class definitions
4 : *
5 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
6 : * Licensing/copyright details are in the COPYING file that you should have
7 : * received with this code.
8 : */
9 :
#include "CallbackAdapter.hpp"
#include "ipm/Receiver.hpp"
#include "ipm/ZmqContext.hpp"

#include "logging/Logging.hpp"
#include "utilities/ZmqUri.hpp"

#include <chrono>
#include <cstring>
#include <functional>
#include <string>
#include <thread>
#include <utility>
#include <vector>
19 :
20 : namespace dunedaq::ipm {
21 :
22 : class ZmqReceiver : public Receiver
23 : {
24 : public:
25 26 : ZmqReceiver()
26 26 : : m_socket(ZmqContext::instance().GetContext(), zmq::socket_type::pull)
27 : {
28 26 : }
29 :
30 52 : ~ZmqReceiver()
31 26 : {
32 26 : unregister_callback();
33 : // Probably (cpp)zmq does this in the socket dtor anyway, but I guess it doesn't hurt to be explicit
34 26 : if (m_connection_info.connection_string != "" && m_socket_connected) {
35 24 : try {
36 24 : m_socket.unbind(m_connection_info.connection_string);
37 24 : m_socket_connected = false;
38 0 : } catch (zmq::error_t const& err) {
39 0 : ers::error(ZmqOperationError(ERS_HERE,
40 0 : m_connection_info.connection_name,
41 : "unbind",
42 : "receive",
43 0 : err.what(),
44 0 : m_connection_info.connection_string));
45 0 : }
46 : }
47 26 : m_socket.close();
48 52 : }
49 :
50 27 : std::string connect_for_receives(const ConnectionInfo& connection_info) override
51 : {
52 27 : m_connection_info = connection_info;
53 27 : try {
54 27 : m_socket.set(zmq::sockopt::rcvtimeo, 0); // Return immediately if we can't receive
55 0 : } catch (zmq::error_t const& err) {
56 0 : throw ZmqOperationError(ERS_HERE, m_connection_info.connection_name,
57 : "set timeout",
58 : "receive",
59 0 : err.what(),
60 0 : m_connection_info.connection_string);
61 0 : }
62 :
63 27 : try {
64 27 : m_socket.set(zmq::sockopt::linger, 0); // Close connection immediately when close is called
65 0 : } catch (zmq::error_t const& err) {
66 0 : throw ZmqOperationError(ERS_HERE,
67 0 : m_connection_info.connection_name,
68 : "set linger",
69 : "receive",
70 0 : err.what(),
71 0 : m_connection_info.connection_string);
72 0 : }
73 :
74 27 : std::vector<std::string> resolved;
75 27 : try {
76 27 : utilities::ZmqUri this_uri(m_connection_info.connection_string);
77 26 : resolved = this_uri.get_uri_ip_addresses();
78 27 : } catch (utilities::InvalidUri const& err) {
79 1 : throw ZmqOperationError(ERS_HERE,
80 1 : m_connection_info.connection_name,
81 : "resolve connection_string",
82 : "receive",
83 : "An invalid URI was passed",
84 : m_connection_info.connection_string,
85 2 : err);
86 1 : }
87 :
88 26 : if (resolved.size() == 0) {
89 1 : throw ZmqOperationError(ERS_HERE,
90 1 : m_connection_info.connection_name,
91 : "resolve connection_string",
92 : "receive",
93 : "Unable to resolve connection_string",
94 2 : m_connection_info.connection_string);
95 : }
96 26 : for (auto& connection_string : resolved) {
97 25 : TLOG_DEBUG(TLVL_CONNECTIONSTRING) << m_connection_info.connection_name << ": Connection String is " << connection_string;
98 25 : try {
99 25 : m_socket.bind(connection_string);
100 24 : m_connection_info.connection_string = m_socket.get(zmq::sockopt::last_endpoint);
101 24 : m_socket_connected = true;
102 24 : break;
103 1 : } catch (zmq::error_t const& err) {
104 1 : ers::error(ZmqOperationError(
105 2 : ERS_HERE, m_connection_info.connection_name, "bind", "receive", err.what(), connection_string));
106 1 : }
107 : }
108 25 : if (!m_socket_connected) {
109 1 : throw ZmqOperationError(ERS_HERE,
110 1 : m_connection_info.connection_name,
111 : "bind",
112 : "receive",
113 : "Bind failed for all resolved connection strings",
114 2 : "");
115 : }
116 :
117 24 : m_callback_adapter.set_receiver(this);
118 :
119 48 : return m_connection_info.connection_string;
120 27 : }
121 :
122 20767 : bool can_receive() const noexcept override { return m_socket_connected; }
123 :
124 1 : void register_callback(std::function<void(Response&)> callback) override
125 : {
126 1 : m_callback_adapter.set_callback(callback);
127 1 : }
128 27 : void unregister_callback() override { m_callback_adapter.clear_callback(); }
129 :
130 4 : bool data_pending() override
131 : {
132 4 : try {
133 4 : auto events = m_socket.get(zmq::sockopt::events);
134 4 : return (events & ZMQ_POLLIN) != 0;
135 0 : } catch (zmq::error_t const& err) {
136 0 : ers::error(ZmqOperationError(ERS_HERE, m_connection_info.connection_name, "get events sockopt", "data_pending", err.what(), m_connection_info.connection_string));
137 0 : }
138 0 : return false;
139 : }
140 :
141 : protected:
142 20757 : Receiver::Response receive_(const duration_t& timeout, bool no_tmoexcept_mode) override
143 : {
144 20757 : Receiver::Response output;
145 20757 : zmq::message_t hdr;
146 20757 : zmq::message_t msg;
147 20757 : zmq::recv_result_t res{};
148 :
149 20757 : auto start_time = std::chrono::steady_clock::now();
150 34623 : do {
151 :
152 34623 : try {
153 34623 : TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_HDR) << m_connection_info.connection_name << ": Going to receive header";
154 34623 : res = m_socket.recv(hdr);
155 34623 : TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_HDR_2) << m_connection_info.connection_name << ": Recv res=" << res.value_or(0)
156 34623 : << " for header (hdr.size() == " << hdr.size() << ")";
157 0 : } catch (zmq::error_t const& err) {
158 0 : throw ZmqReceiveError(ERS_HERE, m_connection_info.connection_name, err.what(), "header");
159 0 : }
160 34623 : if (res || hdr.more()) {
161 20026 : TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_DATA) << m_connection_info.connection_name << ": Going to receive data";
162 20026 : output.metadata.resize(hdr.size());
163 20026 : memcpy(&output.metadata[0], hdr.data(), hdr.size());
164 :
165 : // ZMQ guarantees that the entire message has arrived
166 :
167 20026 : try {
168 20026 : res = m_socket.recv(msg);
169 0 : } catch (zmq::error_t const& err) {
170 0 : throw ZmqReceiveError(ERS_HERE, m_connection_info.connection_name, err.what(), "data");
171 0 : }
172 20026 : TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_DATA_2)
173 0 : << m_connection_info.connection_name << ": Recv res=" << res.value_or(0)
174 20026 : << " for data (msg.size() == " << msg.size() << ")";
175 20026 : output.data.resize(msg.size());
176 20026 : memcpy(&output.data[0], msg.data(), msg.size());
177 14597 : } else if (timeout > duration_t::zero()) {
178 14590 : usleep(1000);
179 : }
180 34623 : } while (std::chrono::duration_cast<duration_t>(std::chrono::steady_clock::now() - start_time) < timeout &&
181 54647 : res.value_or(0) == 0);
182 :
183 20757 : if (res.value_or(0) == 0 && !no_tmoexcept_mode) {
184 6 : throw ReceiveTimeoutExpired(ERS_HERE, m_connection_info.connection_name, timeout.count());
185 : }
186 :
187 20751 : TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_END)
188 0 : << m_connection_info.connection_name << ": Returning output with metadata size "
189 0 : << output.metadata.size()
190 20751 : << " and data size " << output.data.size();
191 20751 : return output;
192 20769 : }
193 :
194 : private:
195 : zmq::socket_t m_socket;
196 : bool m_socket_connected{ false };
197 : CallbackAdapter m_callback_adapter;
198 : };
199 : } // namespace dunedaq::ipm
200 :
// NOTE(review): DEFINE_DUNE_IPM_RECEIVER is defined outside this file; presumably it
// exposes ZmqReceiver to the IPM plugin/factory mechanism — confirm against ipm/Receiver.hpp.
201 26 : DEFINE_DUNE_IPM_RECEIVER(dunedaq::ipm::ZmqReceiver)
|