Line data Source code
1 :
2 : /**
3 : *
4 : * @file ZmqSubscriber.cpp ZmqSubscriber messaging class definitions
5 : *
6 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
7 : * Licensing/copyright details are in the COPYING file that you should have
8 : * received with this code.
9 : */
10 :
11 : #include "CallbackAdapter.hpp"
12 : #include "ipm/Subscriber.hpp"
13 : #include "ipm/ZmqContext.hpp"
14 :
15 : #include "logging/Logging.hpp"
16 :
17 : #include <set>
18 : #include <string>
19 : #include <vector>
20 :
21 : namespace dunedaq::ipm {
22 :
23 : class ZmqSubscriber : public Subscriber
24 : {
25 : public:
26 13 : ZmqSubscriber()
27 13 : : m_socket(ZmqContext::instance().GetContext(), zmq::socket_type::sub)
28 : {
29 13 : }
30 :
31 26 : ~ZmqSubscriber()
32 13 : {
33 13 : unregister_callback();
34 : // Probably (cpp)zmq does this in the socket dtor anyway, but I guess it doesn't hurt to be explicit
35 13 : if (!m_connection_strings.empty() && m_socket_connected) {
36 10 : m_socket_connected = false;
37 25 : for (auto& conn_string : m_connection_strings) {
38 15 : try {
39 15 : m_socket.disconnect(conn_string);
40 0 : } catch (zmq::error_t const& err) {
41 0 : ers::error(ZmqOperationError(
42 0 : ERS_HERE, m_connection_info.connection_name, "disconnect", "receive", err.what(), conn_string));
43 0 : }
44 : }
45 : }
46 13 : m_socket.close();
47 26 : }
48 :
49 13 : std::string connect_for_receives(const ConnectionInfo& connection_info) override
50 : {
51 13 : m_connection_info = connection_info;
52 13 : std::set<std::string> new_connection_strings;
53 13 : if (connection_info.connection_string != "") {
54 5 : new_connection_strings.insert(connection_info.connection_string);
55 : }
56 :
57 26 : for (auto& conn_string : connection_info.connection_strings) {
58 13 : if (m_connection_strings.count(conn_string) == 0)
59 13 : new_connection_strings.insert(conn_string);
60 : }
61 :
62 13 : if (m_connection_strings.size() == 0 && new_connection_strings.size() == 0) {
63 0 : throw ZmqOperationError(ERS_HERE,
64 0 : m_connection_info.connection_name,
65 : "resolve connections",
66 : "receive",
67 : "No valid connection strings passed",
68 0 : "");
69 : }
70 :
71 13 : if (!m_socket_connected) {
72 11 : try {
73 11 : m_socket.set(zmq::sockopt::rcvtimeo, 0); // Return immediately if we can't receive
74 0 : } catch (zmq::error_t const& err) {
75 0 : throw ZmqOperationError(ERS_HERE,
76 0 : m_connection_info.connection_name,
77 : "set timeout",
78 : "receive",
79 0 : err.what(),
80 0 : *m_connection_strings.begin());
81 0 : }
82 : }
83 31 : for (auto& conn_string : new_connection_strings) {
84 18 : try {
85 18 : TLOG_DEBUG(TLVL_CONNECTIONSTRING)
86 18 : << m_connection_info.connection_name << ": Connection String is " << conn_string;
87 18 : m_socket.connect(conn_string);
88 15 : m_connection_strings.insert(conn_string);
89 3 : } catch (zmq::error_t const& err) {
90 3 : ers::error(ZmqOperationError(
91 6 : ERS_HERE, m_connection_info.connection_name, "connect", "receive", err.what(), conn_string));
92 3 : }
93 : }
94 13 : m_socket_connected = true;
95 13 : m_callback_adapter.set_receiver(this);
96 :
97 13 : if (m_connection_strings.size() > 0) {
98 10 : return *m_connection_strings.begin();
99 : }
100 3 : return {};
101 13 : }
102 :
103 63 : bool can_receive() const noexcept override { return m_socket_connected; }
104 :
105 12 : void subscribe(std::string const& topic) override
106 : {
107 12 : try {
108 12 : m_socket.set(zmq::sockopt::subscribe, topic);
109 0 : } catch (zmq::error_t const& err) {
110 0 : throw ZmqSubscribeError(ERS_HERE, m_connection_info.connection_name, err.what(), topic);
111 0 : }
112 12 : }
113 2 : void unsubscribe(std::string const& topic) override
114 : {
115 2 : try {
116 2 : m_socket.set(zmq::sockopt::unsubscribe, topic);
117 0 : } catch (zmq::error_t const& err) {
118 0 : throw ZmqUnsubscribeError(ERS_HERE, m_connection_info.connection_name, err.what(), topic);
119 0 : }
120 2 : }
121 :
122 6 : bool data_pending() override
123 : {
124 6 : try {
125 6 : auto events = m_socket.get(zmq::sockopt::events);
126 6 : return (events & ZMQ_POLLIN) != 0;
127 0 : } catch (zmq::error_t const& err) {
128 0 : ers::error(
129 0 : ZmqOperationError(ERS_HERE, m_connection_info.connection_name, "get events sockopt", "data_pending", err.what(), *m_connection_strings.begin()));
130 0 : }
131 0 : return false;
132 : }
133 :
134 1 : void register_callback(std::function<void(Response&)> callback) override
135 : {
136 1 : m_callback_adapter.set_callback(callback);
137 1 : }
138 14 : void unregister_callback() override { m_callback_adapter.clear_callback(); }
139 :
140 : protected:
141 54 : Receiver::Response receive_(const duration_t& timeout, bool no_tmoexcept_mode) override
142 : {
143 54 : Receiver::Response output;
144 54 : zmq::message_t hdr;
145 54 : zmq::message_t msg;
146 54 : zmq::recv_result_t res{};
147 :
148 54 : auto start_time = std::chrono::steady_clock::now();
149 2159 : do {
150 :
151 2159 : try {
152 2159 : TLOG_DEBUG(TLVL_ZMQSUBSCRIBER_RECV_HDR) << m_connection_info.connection_name << ": Going to receive header";
153 2159 : res = m_socket.recv(hdr);
154 2159 : TLOG_DEBUG(TLVL_ZMQSUBSCRIBER_RECV_HDR_2)
155 0 : << m_connection_info.connection_name << ": Recv res=" << res.value_or(0)
156 2159 : << " for header (hdr.size() == " << hdr.size() << ")";
157 0 : } catch (zmq::error_t const& err) {
158 0 : throw ZmqReceiveError(ERS_HERE, m_connection_info.connection_name, err.what(), "header");
159 0 : }
160 2159 : if (res || hdr.more()) {
161 16 : TLOG_DEBUG(TLVL_ZMQSUBSCRIBER_RECV_DATA) << m_connection_info.connection_name << ": Going to receive data";
162 16 : output.metadata.resize(hdr.size());
163 16 : memcpy(&output.metadata[0], hdr.data(), hdr.size());
164 :
165 : // ZMQ guarantees that the entire message has arrived
166 :
167 16 : try {
168 16 : res = m_socket.recv(msg);
169 0 : } catch (zmq::error_t const& err) {
170 0 : throw ZmqReceiveError(ERS_HERE, m_connection_info.connection_name, err.what(), "data");
171 0 : }
172 16 : TLOG_DEBUG(TLVL_ZMQSUBSCRIBER_RECV_DATA_2)
173 0 : << m_connection_info.connection_name << ": Recv res=" << res.value_or(0)
174 16 : << " for data (msg.size() == " << msg.size() << ")";
175 16 : output.data.resize(msg.size());
176 16 : memcpy(&output.data[0], msg.data(), msg.size());
177 2143 : } else if (timeout > duration_t::zero()) {
178 2121 : usleep(1000);
179 : }
180 2159 : } while (std::chrono::duration_cast<duration_t>(std::chrono::steady_clock::now() - start_time) < timeout &&
181 2174 : res.value_or(0) == 0);
182 :
183 54 : if (res.value_or(0) == 0 && !no_tmoexcept_mode) {
184 38 : throw ReceiveTimeoutExpired(ERS_HERE, m_connection_info.connection_name, timeout.count());
185 : }
186 :
187 16 : TLOG_DEBUG(TLVL_ZMQSUBSCRIBER_RECV_END)
188 0 : << m_connection_info.connection_name << ": Returning output with metadata size " << output.metadata.size()
189 16 : << " and data size " << output.data.size();
190 16 : return output;
191 130 : }
192 :
193 : private:
194 : zmq::socket_t m_socket;
195 : std::set<std::string> m_connection_strings{};
196 : bool m_socket_connected{ false };
197 : CallbackAdapter m_callback_adapter;
198 : };
199 : } // namespace dunedaq::ipm
200 :
201 13 : DEFINE_DUNE_IPM_RECEIVER(dunedaq::ipm::ZmqSubscriber)
|