Line data Source code
1 : /**
2 : *
3 : * @file ZmqPublisher.cpp ZmqPublisher messaging class definitions
4 : *
5 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
6 : * Licensing/copyright details are in the COPYING file that you should have
7 : * received with this code.
8 : */
9 :
#include "ipm/Sender.hpp"
#include "ipm/ZmqContext.hpp"

#include "logging/Logging.hpp"
#include "utilities/ZmqUri.hpp"
#include "zmq.hpp"

#include <chrono>
#include <string>
#include <thread>
#include <vector>
19 :
20 : namespace dunedaq::ipm {
21 : class ZmqPublisher : public Sender
22 : {
23 : public:
24 12 : ZmqPublisher()
25 12 : : m_socket(ZmqContext::instance().GetContext(), zmq::socket_type::pub)
26 : {
27 12 : }
28 :
29 24 : ~ZmqPublisher()
30 12 : {
31 : // Probably (cpp)zmq does this in the socket dtor anyway, but I guess it doesn't hurt to be explicit
32 12 : if (m_connection_string != "" && m_socket_connected) {
33 10 : try {
34 20 : TLOG_DEBUG(TLVL_ZMQPUBLISHER_DESTRUCTOR) << "Setting socket HWM to zero";
35 10 : m_socket.set(zmq::sockopt::sndhwm, 1);
36 :
37 20 : TLOG_DEBUG(TLVL_ZMQPUBLISHER_DESTRUCTOR) << "Waiting up to 10s for socket to become writable before disconnecting";
38 10 : auto start_time = std::chrono::steady_clock::now();
39 20 : while (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count() < 10000) {
40 10 : auto events = m_socket.get(zmq::sockopt::events);
41 10 : if ((events & ZMQ_POLLOUT) != 0) {
42 : break;
43 : }
44 0 : usleep(1000);
45 : }
46 20 : TLOG_DEBUG(TLVL_ZMQPUBLISHER_DESTRUCTOR) << "Disconnecting socket from " << m_connection_string;
47 :
48 10 : m_socket.unbind(m_connection_string);
49 10 : m_socket_connected = false;
50 0 : } catch (zmq::error_t const& err) {
51 0 : ers::error(ZmqOperationError(ERS_HERE, "unbind", "send", err.what(), m_connection_string));
52 0 : }
53 : }
54 12 : m_socket.close();
55 24 : }
56 :
57 29 : bool can_send() const noexcept override { return m_socket_connected; }
58 13 : std::string connect_for_sends(const nlohmann::json& connection_info) override
59 : {
60 13 : try {
61 13 : m_socket.set(zmq::sockopt::sndtimeo, 0); // Return immediately if we can't send
62 0 : } catch (zmq::error_t const& err) {
63 0 : throw ZmqOperationError(ERS_HERE,
64 : "set timeout",
65 : "send",
66 0 : err.what(),
67 0 : connection_info.value<std::string>("connection_string", "inproc://default"));
68 0 : }
69 :
70 13 : auto hwm = connection_info.value<int>("capacity", 0);
71 13 : if (hwm > 0) {
72 0 : try {
73 0 : m_socket.set(zmq::sockopt::sndhwm, hwm);
74 0 : } catch (zmq::error_t const& err) {
75 0 : throw ZmqOperationError(ERS_HERE,
76 : "set hwm",
77 : "send",
78 0 : err.what(),
79 0 : connection_info.value<std::string>("connection_string", "inproc://default"));
80 0 : }
81 : }
82 :
83 13 : std::vector<std::string> resolved;
84 13 : try {
85 14 : utilities::ZmqUri this_uri(connection_info.value<std::string>("connection_string", "inproc://default"));
86 12 : resolved = this_uri.get_uri_ip_addresses();
87 13 : } catch (utilities::InvalidUri const& err) {
88 1 : throw ZmqOperationError(ERS_HERE,
89 : "resolve connection_string",
90 : "send",
91 : "An invalid URI was passed",
92 2 : connection_info.value<std::string>("connection_string", "inproc://default"),
93 4 : err);
94 1 : }
95 :
96 12 : if (resolved.size() == 0) {
97 1 : throw ZmqOperationError(ERS_HERE,
98 : "resolve connection_string",
99 : "send",
100 : "Unable to resolve connection_string",
101 2 : connection_info.value<std::string>("connection_string", "inproc://default"));
102 : }
103 12 : for (auto& connection_string : resolved) {
104 11 : TLOG_DEBUG(TLVL_CONNECTIONSTRING) << "Connection String is " << connection_string;
105 11 : try {
106 11 : m_socket.bind(connection_string);
107 10 : m_connection_string = m_socket.get(zmq::sockopt::last_endpoint);
108 10 : m_socket_connected = true;
109 10 : break;
110 1 : } catch (zmq::error_t const& err) {
111 1 : ers::error(ZmqOperationError(ERS_HERE, "bind", "send", err.what(), connection_string));
112 1 : }
113 : }
114 11 : if (!m_socket_connected) {
115 1 : throw ZmqOperationError(ERS_HERE, "bind", "send", "Bind failed for all resolved connection strings", "");
116 : }
117 :
118 20 : return m_connection_string;
119 13 : }
120 :
121 : protected:
122 20 : bool send_(const void* message,
123 : int N,
124 : const duration_t& timeout,
125 : std::string const& topic,
126 : bool no_tmoexcept_mode) override
127 : {
128 20 : TLOG_DEBUG(TLVL_ZMQPUBLISHER_SEND_START)
129 20 : << "Endpoint " << m_connection_string << ": Starting send of " << N << " bytes";
130 20 : auto start_time = std::chrono::steady_clock::now();
131 20 : zmq::send_result_t res{};
132 20 : do {
133 :
134 20 : zmq::message_t topic_msg(topic.c_str(), topic.size());
135 20 : try {
136 20 : res = m_socket.send(topic_msg, zmq::send_flags::sndmore);
137 0 : } catch (zmq::error_t const& err) {
138 0 : throw ZmqSendError(ERS_HERE, err.what(), topic.size(), topic);
139 0 : }
140 :
141 20 : if (!res || res != topic.size()) {
142 0 : TLOG_DEBUG(TLVL_ZMQPUBLISHER_SEND_ERR) << "Endpoint " << m_connection_string << ": Unable to send message";
143 0 : continue;
144 0 : }
145 :
146 20 : zmq::message_t msg(message, N);
147 20 : try {
148 20 : res = m_socket.send(msg, zmq::send_flags::none);
149 0 : } catch (zmq::error_t const& err) {
150 0 : throw ZmqSendError(ERS_HERE, err.what(), N, topic);
151 0 : }
152 :
153 20 : if (!res && timeout > duration_t::zero()) {
154 0 : usleep(1000);
155 : }
156 60 : } while (std::chrono::duration_cast<duration_t>(std::chrono::steady_clock::now() - start_time) < timeout && !res);
157 :
158 20 : if (!res && !no_tmoexcept_mode) {
159 0 : throw SendTimeoutExpired(ERS_HERE, timeout.count());
160 : }
161 :
162 20 : TLOG_DEBUG(TLVL_ZMQPUBLISHER_SEND_END)
163 20 : << "Endpoint " << m_connection_string << ": Completed send of " << N << " bytes";
164 20 : return res && res == N;
165 : }
166 :
167 : private:
168 : zmq::socket_t m_socket;
169 : std::string m_connection_string;
170 : bool m_socket_connected{ false };
171 : };
172 :
173 : } // namespace dunedaq::ipm
174 :
175 12 : DEFINE_DUNE_IPM_SENDER(dunedaq::ipm::ZmqPublisher) // NOTE(review): macro is defined outside this file; presumably registers ZmqPublisher as a loadable IPM Sender plugin — confirm
|