Line data Source code
1 : /**
2 : *
3 : * @file ZmqPublisher.cpp ZmqPublisher messaging class definitions
4 : *
5 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
6 : * Licensing/copyright details are in the COPYING file that you should have
7 : * received with this code.
8 : */
9 :
10 : #include "ipm/Sender.hpp"
11 : #include "ipm/ZmqContext.hpp"
12 :
13 : #include "logging/Logging.hpp"
14 : #include "utilities/ZmqUri.hpp"
15 : #include "zmq.hpp"
16 :
17 : #include <string>
18 : #include <vector>
19 :
20 : namespace dunedaq::ipm {
21 : class ZmqPublisher : public Sender
22 : {
23 : public:
24 12 : ZmqPublisher()
25 12 : : m_socket(ZmqContext::instance().GetContext(), zmq::socket_type::pub)
26 : {
27 12 : }
28 :
29 24 : ~ZmqPublisher()
30 12 : {
31 : // Probably (cpp)zmq does this in the socket dtor anyway, but I guess it doesn't hurt to be explicit
32 12 : if (m_connection_info.connection_string != "" && m_socket_connected) {
33 10 : try {
34 20 : TLOG_DEBUG(TLVL_ZMQPUBLISHER_DESTRUCTOR) << m_connection_info.connection_name << ": Setting socket HWM to zero";
35 10 : m_socket.set(zmq::sockopt::sndhwm, 1);
36 :
37 20 : TLOG_DEBUG(TLVL_ZMQPUBLISHER_DESTRUCTOR)
38 0 : << m_connection_info.connection_name
39 10 : << ": Waiting up to 10s for socket to become writable before disconnecting";
40 10 : auto start_time = std::chrono::steady_clock::now();
41 10 : while (
42 10 : std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count() <
43 : 10000) {
44 10 : auto events = m_socket.get(zmq::sockopt::events);
45 10 : if ((events & ZMQ_POLLOUT) != 0) {
46 : break;
47 : }
48 0 : usleep(1000);
49 : }
50 20 : TLOG_DEBUG(TLVL_ZMQPUBLISHER_DESTRUCTOR)
51 10 : << m_connection_info.connection_name << ": Disconnecting socket from " << m_connection_info.connection_string;
52 :
53 10 : m_socket.unbind(m_connection_info.connection_string);
54 10 : m_socket_connected = false;
55 0 : } catch (zmq::error_t const& err) {
56 0 : ers::error(ZmqOperationError(ERS_HERE,
57 0 : m_connection_info.connection_name,
58 : "unbind",
59 : "send",
60 0 : err.what(),
61 0 : m_connection_info.connection_string));
62 0 : }
63 : }
64 12 : m_socket.close();
65 24 : }
66 :
67 29 : bool can_send() const noexcept override { return m_socket_connected; }
68 13 : std::string connect_for_sends(const ConnectionInfo& connection_info) override
69 : {
70 13 : m_connection_info = connection_info;
71 13 : try {
72 13 : m_socket.set(zmq::sockopt::sndtimeo, 0); // Return immediately if we can't send
73 0 : } catch (zmq::error_t const& err) {
74 0 : throw ZmqOperationError(ERS_HERE,
75 0 : m_connection_info.connection_name,
76 : "set timeout",
77 : "send",
78 0 : err.what(),
79 0 : m_connection_info.connection_string);
80 0 : }
81 :
82 13 : auto hwm = connection_info.capacity;
83 13 : if (hwm > 0) {
84 0 : try {
85 0 : m_socket.set(zmq::sockopt::sndhwm, hwm);
86 0 : } catch (zmq::error_t const& err) {
87 0 : throw ZmqOperationError(ERS_HERE,
88 0 : m_connection_info.connection_name,
89 : "set hwm",
90 : "send",
91 0 : err.what(),
92 0 : m_connection_info.connection_string);
93 0 : }
94 : }
95 :
96 13 : std::vector<std::string> resolved;
97 13 : try {
98 13 : utilities::ZmqUri this_uri(m_connection_info.connection_string);
99 12 : resolved = this_uri.get_uri_ip_addresses();
100 13 : } catch (utilities::InvalidUri const& err) {
101 1 : throw ZmqOperationError(ERS_HERE,
102 1 : m_connection_info.connection_name,
103 : "resolve connection_string",
104 : "send",
105 : "An invalid URI was passed",
106 : m_connection_info.connection_string,
107 2 : err);
108 1 : }
109 :
110 12 : if (resolved.size() == 0) {
111 1 : throw ZmqOperationError(ERS_HERE,
112 1 : m_connection_info.connection_name,
113 : "resolve connection_string",
114 : "send",
115 : "Unable to resolve connection_string",
116 2 : m_connection_info.connection_string);
117 : }
118 12 : for (auto& connection_string : resolved) {
119 11 : TLOG_DEBUG(TLVL_CONNECTIONSTRING) << m_connection_info.connection_name << ": Connection String is "
120 11 : << connection_string;
121 11 : try {
122 11 : m_socket.bind(connection_string);
123 10 : m_connection_info.connection_string = m_socket.get(zmq::sockopt::last_endpoint);
124 10 : m_socket_connected = true;
125 10 : break;
126 1 : } catch (zmq::error_t const& err) {
127 1 : ers::error(ZmqOperationError(
128 2 : ERS_HERE, m_connection_info.connection_name, "bind", "send", err.what(), connection_string));
129 1 : }
130 : }
131 11 : if (!m_socket_connected) {
132 1 : throw ZmqOperationError(ERS_HERE,
133 1 : m_connection_info.connection_name,
134 : "bind",
135 : "send",
136 : "Bind failed for all resolved connection strings",
137 2 : "");
138 : }
139 :
140 20 : return m_connection_info.connection_string;
141 13 : }
142 :
143 : protected:
144 20 : bool send_(const void* message,
145 : int N,
146 : const duration_t& timeout,
147 : std::string const& topic,
148 : bool no_tmoexcept_mode) override
149 : {
150 20 : TLOG_DEBUG(TLVL_ZMQPUBLISHER_SEND_START)
151 20 : << m_connection_info.connection_name << ": Starting send of " << N << " bytes";
152 20 : auto start_time = std::chrono::steady_clock::now();
153 20 : zmq::send_result_t res{};
154 20 : do {
155 :
156 20 : zmq::message_t topic_msg(topic.c_str(), topic.size());
157 20 : try {
158 20 : res = m_socket.send(topic_msg, zmq::send_flags::sndmore);
159 0 : } catch (zmq::error_t const& err) {
160 0 : throw ZmqSendError(ERS_HERE, m_connection_info.connection_name, err.what(), topic.size(), topic);
161 0 : }
162 :
163 20 : if (!res || res != topic.size()) {
164 0 : TLOG_DEBUG(TLVL_ZMQPUBLISHER_SEND_ERR) << m_connection_info.connection_name << ": Unable to send message";
165 0 : continue;
166 0 : }
167 :
168 20 : zmq::message_t msg(message, N);
169 20 : try {
170 20 : res = m_socket.send(msg, zmq::send_flags::none);
171 0 : } catch (zmq::error_t const& err) {
172 0 : throw ZmqSendError(ERS_HERE, m_connection_info.connection_name, err.what(), N, topic);
173 0 : }
174 :
175 20 : if (!res && timeout > duration_t::zero()) {
176 0 : usleep(1000);
177 : }
178 60 : } while (std::chrono::duration_cast<duration_t>(std::chrono::steady_clock::now() - start_time) < timeout && !res);
179 :
180 20 : if (!res && !no_tmoexcept_mode) {
181 0 : throw SendTimeoutExpired(ERS_HERE, m_connection_info.connection_name, timeout.count());
182 : }
183 :
184 20 : TLOG_DEBUG(TLVL_ZMQPUBLISHER_SEND_END)
185 20 : << m_connection_info.connection_name << ": Completed send of " << N << " bytes";
186 20 : return res && res == N;
187 : }
188 :
189 : private:
190 : zmq::socket_t m_socket;
191 : bool m_socket_connected{ false };
192 : };
193 :
194 : } // namespace dunedaq::ipm
195 :
196 12 : DEFINE_DUNE_IPM_SENDER(dunedaq::ipm::ZmqPublisher)
|