Line data Source code
1 : /**
2 : *
3 : * @file ZmqSender.cpp ZmqSender messaging class definitions
4 : *
5 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
6 : * Licensing/copyright details are in the COPYING file that you should have
7 : * received with this code.
8 : */
9 :
10 : #include "ipm/Sender.hpp"
11 : #include "ipm/ZmqContext.hpp"
12 :
13 : #include "logging/Logging.hpp"
14 : #include "zmq.hpp"
15 :
16 : #include <string>
17 : #include <vector>
18 :
19 : namespace dunedaq::ipm {
20 : class ZmqSender : public Sender
21 : {
22 : public:
23 26 : ZmqSender()
24 26 : : m_socket(ZmqContext::instance().GetContext(), zmq::socket_type::push)
25 : {
26 26 : }
27 :
28 52 : ~ZmqSender()
29 26 : {
30 : // Probably (cpp)zmq does this in the socket dtor anyway, but I guess it doesn't hurt to be explicit
31 26 : if (m_connection_info.connection_string != "" && m_socket_connected) {
32 24 : try {
33 48 : TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) << m_connection_info.connection_name << ": Setting socket HWM to zero";
34 24 : m_socket.set(zmq::sockopt::sndhwm, 1);
35 :
36 48 : TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) << m_connection_info.connection_name
37 24 : << ": Waiting up to 10s for socket to become writable before disconnecting";
38 24 : auto start_time = std::chrono::steady_clock::now();
39 24 : while (
40 103803 : std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count() <
41 : 10000) {
42 103792 : auto events = m_socket.get(zmq::sockopt::events);
43 103792 : if ((events & ZMQ_POLLOUT) != 0) {
44 : break;
45 : }
46 103779 : usleep(1000);
47 : }
48 48 : TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR)
49 24 : << m_connection_info.connection_name << ": Disconnecting socket from " << m_connection_info.connection_string;
50 :
51 :
52 24 : m_socket.disconnect(m_connection_info.connection_string);
53 24 : m_socket_connected = false;
54 0 : } catch (zmq::error_t const& err) {
55 0 : ers::error(ZmqOperationError(ERS_HERE,
56 0 : m_connection_info.connection_name,
57 : "disconnect",
58 : "send",
59 0 : err.what(),
60 0 : m_connection_info.connection_string));
61 0 : }
62 : }
63 26 : m_socket.close();
64 52 : }
65 :
66 20037 : bool can_send() const noexcept override { return m_socket_connected; }
67 27 : std::string connect_for_sends(const ConnectionInfo& connection_info) override
68 : {
69 27 : m_connection_info = connection_info;
70 27 : try {
71 27 : m_socket.set(zmq::sockopt::sndtimeo, 0); // Return immediately if we can't send
72 0 : } catch (zmq::error_t const& err) {
73 0 : throw ZmqOperationError(ERS_HERE,
74 0 : m_connection_info.connection_name,
75 : "set timeout",
76 : "send",
77 0 : err.what(),
78 0 : m_connection_info.connection_string);
79 0 : }
80 :
81 27 : auto hwm = connection_info.capacity;
82 27 : if (hwm > 0) {
83 0 : try {
84 0 : m_socket.set(zmq::sockopt::sndhwm, hwm);
85 0 : } catch (zmq::error_t const& err) {
86 0 : throw ZmqOperationError(ERS_HERE,
87 0 : m_connection_info.connection_name,
88 : "set hwm",
89 : "send",
90 0 : err.what(),
91 0 : m_connection_info.connection_string);
92 0 : }
93 : }
94 :
95 27 : auto connection_string = m_connection_info.connection_string;
96 :
97 27 : TLOG_DEBUG(TLVL_CONNECTIONSTRING) << "Connection String is " << connection_string;
98 27 : try {
99 27 : m_socket.set(zmq::sockopt::immediate, 1); // Don't queue messages to incomplete connections
100 0 : } catch (zmq::error_t const& err) {
101 0 : throw ZmqOperationError(
102 0 : ERS_HERE, m_connection_info.connection_name, "set immediate mode", "send", err.what(), connection_string);
103 0 : }
104 :
105 27 : try {
106 27 : m_socket.connect(connection_string);
107 24 : m_connection_info.connection_string = m_socket.get(zmq::sockopt::last_endpoint);
108 24 : m_socket_connected = true;
109 3 : } catch (zmq::error_t const& err) {
110 3 : ers::error(ZmqOperationError(
111 6 : ERS_HERE, m_connection_info.connection_name, "connect", "send", err.what(), connection_string));
112 3 : }
113 :
114 27 : if (!m_socket_connected) {
115 3 : throw ZmqOperationError(ERS_HERE,
116 3 : m_connection_info.connection_name,
117 : "connect",
118 : "send",
119 : "Operation failed for all resolved connection strings",
120 6 : "");
121 : }
122 48 : return m_connection_info.connection_string;
123 27 : }
124 :
125 : protected:
126 20028 : bool send_(const void* message,
127 : int N,
128 : const duration_t& timeout,
129 : std::string const& topic,
130 : bool no_tmoexcept_mode) override
131 : {
132 20028 : TLOG_DEBUG(TLVL_ZMQSENDER_SEND_START)
133 20028 : << m_connection_info.connection_name << ": Starting send of " << N << " bytes";
134 20028 : auto start_time = std::chrono::steady_clock::now();
135 20028 : zmq::send_result_t res{};
136 7605704 : do {
137 :
138 7605704 : zmq::message_t topic_msg(topic.c_str(), topic.size());
139 7605704 : try {
140 7605704 : res = m_socket.send(topic_msg, zmq::send_flags::sndmore);
141 0 : } catch (zmq::error_t const& err) {
142 0 : throw ZmqSendError(ERS_HERE, m_connection_info.connection_name, err.what(), topic.size(), topic);
143 0 : }
144 :
145 7605704 : if (!res || res != topic.size()) {
146 7585677 : TLOG_DEBUG(TLVL_ZMQSENDER_SEND_ERR) << m_connection_info.connection_name << ": Unable to send message";
147 7585677 : continue;
148 7585677 : }
149 :
150 20027 : zmq::message_t msg(message, N);
151 20027 : try {
152 20027 : res = m_socket.send(msg, zmq::send_flags::none);
153 0 : } catch (zmq::error_t const& err) {
154 0 : throw ZmqSendError(ERS_HERE, m_connection_info.connection_name, err.what(), N, topic);
155 0 : }
156 :
157 20027 : if (!res && timeout > duration_t::zero()) {
158 0 : usleep(1000);
159 : }
160 15231436 : } while (std::chrono::duration_cast<duration_t>(std::chrono::steady_clock::now() - start_time) < timeout && !res);
161 :
162 20028 : if (!res && !no_tmoexcept_mode) {
163 1 : throw SendTimeoutExpired(ERS_HERE, m_connection_info.connection_name, timeout.count());
164 : }
165 :
166 20027 : TLOG_DEBUG(TLVL_ZMQSENDER_SEND_END) << m_connection_info.connection_name << ": Completed send of " << N
167 20027 : << " bytes";
168 20027 : return res && res == N;
169 : }
170 :
171 : private:
172 : zmq::socket_t m_socket;
173 : bool m_socket_connected{ false };
174 : };
175 :
176 : } // namespace dunedaq::ipm
177 :
178 26 : DEFINE_DUNE_IPM_SENDER(dunedaq::ipm::ZmqSender) // NOTE(review): macro is defined outside this file; presumably registers ZmqSender as a loadable ipm Sender implementation — confirm
|