Line data Source code
1 :
2 : /**
3 : *
4 : * @file ZmqSubscriber.cpp ZmqSubscriber messaging class definitions
5 : *
6 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
7 : * Licensing/copyright details are in the COPYING file that you should have
8 : * received with this code.
9 : */
10 :
11 : #include "CallbackAdapter.hpp"
12 : #include "ipm/Subscriber.hpp"
13 : #include "ipm/ZmqContext.hpp"
14 :
15 : #include "logging/Logging.hpp"
16 :
17 : #include <set>
18 : #include <string>
19 : #include <vector>
20 :
21 : namespace dunedaq::ipm {
22 :
23 : class ZmqSubscriber : public Subscriber
24 : {
25 : public:
26 13 : ZmqSubscriber()
27 13 : : m_socket(ZmqContext::instance().GetContext(), zmq::socket_type::sub)
28 : {
29 13 : }
30 :
31 26 : ~ZmqSubscriber()
32 13 : {
33 13 : unregister_callback();
34 : // Probably (cpp)zmq does this in the socket dtor anyway, but I guess it doesn't hurt to be explicit
35 13 : if (!m_connection_strings.empty() && m_socket_connected) {
36 10 : m_socket_connected = false;
37 25 : for (auto& conn_string : m_connection_strings) {
38 15 : try {
39 15 : m_socket.disconnect(conn_string);
40 0 : } catch (zmq::error_t const& err) {
41 0 : ers::error(ZmqOperationError(ERS_HERE, "disconnect", "receive", err.what(), conn_string));
42 0 : }
43 : }
44 : }
45 13 : m_socket.close();
46 26 : }
47 :
48 13 : std::string connect_for_receives(const nlohmann::json& connection_info) override
49 : {
50 13 : std::set<std::string> new_connection_strings;
51 13 : if (connection_info.contains("connection_string")) {
52 5 : if (m_connection_strings.count(connection_info.value<std::string>("connection_string", "")) == 0)
53 5 : new_connection_strings.insert(connection_info.value<std::string>("connection_string", ""));
54 : }
55 :
56 26 : for (auto& conn_string : connection_info.value<std::vector<std::string>>("connection_strings", {})) {
57 13 : if (m_connection_strings.count(conn_string) == 0)
58 13 : new_connection_strings.insert(conn_string);
59 13 : }
60 :
61 13 : if (m_connection_strings.size() == 0 && new_connection_strings.size() == 0) {
62 0 : throw ZmqOperationError(ERS_HERE, "resolve connections", "receive", "No valid connection strings passed", "");
63 : }
64 :
65 13 : if (!m_socket_connected) {
66 11 : try {
67 11 : m_socket.set(zmq::sockopt::rcvtimeo, 0); // Return immediately if we can't receive
68 0 : } catch (zmq::error_t const& err) {
69 0 : throw ZmqOperationError(ERS_HERE, "set timeout", "receive", err.what(), *m_connection_strings.begin());
70 0 : }
71 : }
72 31 : for (auto& conn_string : new_connection_strings) {
73 18 : try {
74 18 : TLOG_DEBUG(TLVL_CONNECTIONSTRING) << "Connection String is " << conn_string;
75 18 : m_socket.connect(conn_string);
76 15 : m_connection_strings.insert(conn_string);
77 3 : } catch (zmq::error_t const& err) {
78 3 : ers::error(ZmqOperationError(ERS_HERE, "connect", "receive", err.what(), conn_string));
79 3 : }
80 : }
81 13 : m_socket_connected = true;
82 13 : m_callback_adapter.set_receiver(this);
83 :
84 13 : if (m_connection_strings.size() > 0) {
85 10 : return *m_connection_strings.begin();
86 : }
87 3 : return {};
88 13 : }
89 :
90 62 : bool can_receive() const noexcept override { return m_socket_connected; }
91 :
92 12 : void subscribe(std::string const& topic) override
93 : {
94 12 : try {
95 12 : m_socket.set(zmq::sockopt::subscribe, topic);
96 0 : } catch (zmq::error_t const& err) {
97 0 : throw ZmqSubscribeError(ERS_HERE, err.what(), topic);
98 0 : }
99 12 : }
100 2 : void unsubscribe(std::string const& topic) override
101 : {
102 2 : try {
103 2 : m_socket.set(zmq::sockopt::unsubscribe, topic);
104 0 : } catch (zmq::error_t const& err) {
105 0 : throw ZmqUnsubscribeError(ERS_HERE, err.what(), topic);
106 0 : }
107 2 : }
108 :
109 1 : void register_callback(std::function<void(Response&)> callback) override
110 : {
111 1 : m_callback_adapter.set_callback(callback);
112 1 : }
113 14 : void unregister_callback() override { m_callback_adapter.clear_callback(); }
114 :
115 : protected:
116 53 : Receiver::Response receive_(const duration_t& timeout, bool no_tmoexcept_mode) override
117 : {
118 53 : Receiver::Response output;
119 53 : zmq::message_t hdr;
120 53 : zmq::message_t msg;
121 53 : zmq::recv_result_t res{};
122 :
123 53 : auto start_time = std::chrono::steady_clock::now();
124 2164 : do {
125 :
126 2164 : try {
127 2164 : TLOG_DEBUG(TLVL_ZMQSUBSCRIBER_RECV_HDR) << "Subscriber: Going to receive header";
128 2164 : res = m_socket.recv(hdr);
129 2164 : TLOG_DEBUG(TLVL_ZMQSUBSCRIBER_RECV_HDR_2)
130 2164 : << "Subscriber: Recv res=" << res.value_or(0) << " for header (hdr.size() == " << hdr.size() << ")";
131 0 : } catch (zmq::error_t const& err) {
132 0 : throw ZmqReceiveError(ERS_HERE, err.what(), "header");
133 0 : }
134 2164 : if (res || hdr.more()) {
135 16 : TLOG_DEBUG(TLVL_ZMQSUBSCRIBER_RECV_DATA) << "Subscriber: Going to receive data";
136 16 : output.metadata.resize(hdr.size());
137 16 : memcpy(&output.metadata[0], hdr.data(), hdr.size());
138 :
139 : // ZMQ guarantees that the entire message has arrived
140 :
141 16 : try {
142 16 : res = m_socket.recv(msg);
143 0 : } catch (zmq::error_t const& err) {
144 0 : throw ZmqReceiveError(ERS_HERE, err.what(), "data");
145 0 : }
146 16 : TLOG_DEBUG(TLVL_ZMQSUBSCRIBER_RECV_DATA_2)
147 16 : << "Subscriber: Recv res=" << res.value_or(0) << " for data (msg.size() == " << msg.size() << ")";
148 16 : output.data.resize(msg.size());
149 16 : memcpy(&output.data[0], msg.data(), msg.size());
150 2148 : } else if (timeout > duration_t::zero()) {
151 2127 : usleep(1000);
152 : }
153 2164 : } while (std::chrono::duration_cast<duration_t>(std::chrono::steady_clock::now() - start_time) < timeout &&
154 2179 : res.value_or(0) == 0);
155 :
156 53 : if (res.value_or(0) == 0 && !no_tmoexcept_mode) {
157 37 : throw ReceiveTimeoutExpired(ERS_HERE, timeout.count());
158 : }
159 :
160 16 : TLOG_DEBUG(TLVL_ZMQSUBSCRIBER_RECV_END) << "Subscriber: Returning output with metadata size "
161 16 : << output.metadata.size() << " and data size " << output.data.size();
162 16 : return output;
163 127 : }
164 :
165 : private:
166 : zmq::socket_t m_socket;
167 : std::set<std::string> m_connection_strings{};
168 : bool m_socket_connected{ false };
169 : CallbackAdapter m_callback_adapter;
170 : };
171 : } // namespace dunedaq::ipm
172 :
173 13 : DEFINE_DUNE_IPM_RECEIVER(dunedaq::ipm::ZmqSubscriber)
|