Line data Source code
1 : /**
2 : *
3 : * @file ZmqSender.cpp ZmqSender messaging class definitions
4 : *
5 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
6 : * Licensing/copyright details are in the COPYING file that you should have
7 : * received with this code.
8 : */
9 :
10 : #include "ipm/Sender.hpp"
11 : #include "ipm/ZmqContext.hpp"
12 :
13 : #include "logging/Logging.hpp"
14 : #include "zmq.hpp"
15 :
16 : #include <string>
17 : #include <vector>
18 :
19 : namespace dunedaq::ipm {
20 : class ZmqSender : public Sender
21 : {
22 : public:
23 26 : ZmqSender()
24 26 : : m_socket(ZmqContext::instance().GetContext(), zmq::socket_type::push)
25 : {
26 26 : }
27 :
28 52 : ~ZmqSender()
29 26 : {
30 : // Probably (cpp)zmq does this in the socket dtor anyway, but I guess it doesn't hurt to be explicit
31 26 : if (m_connection_string != "" && m_socket_connected) {
32 24 : try {
33 48 : TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) << "Setting socket HWM to zero";
34 24 : m_socket.set(zmq::sockopt::sndhwm, 1);
35 :
36 48 : TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR)
37 24 : << "Waiting up to 10s for socket to become writable before disconnecting";
38 24 : auto start_time = std::chrono::steady_clock::now();
39 24 : while (
40 104001 : std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count() <
41 : 10000) {
42 103990 : auto events = m_socket.get(zmq::sockopt::events);
43 103990 : if ((events & ZMQ_POLLOUT) != 0) {
44 : break;
45 : }
46 103977 : usleep(1000);
47 : }
48 48 : TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) << "Disconnecting socket from " << m_connection_string;
49 :
50 :
51 24 : m_socket.disconnect(m_connection_string);
52 24 : m_socket_connected = false;
53 0 : } catch (zmq::error_t const& err) {
54 0 : ers::error(ZmqOperationError(ERS_HERE, "disconnect", "send", err.what(), m_connection_string));
55 0 : }
56 : }
57 26 : m_socket.close();
58 52 : }
59 :
60 20037 : bool can_send() const noexcept override { return m_socket_connected; }
61 27 : std::string connect_for_sends(const nlohmann::json& connection_info) override
62 : {
63 27 : try {
64 27 : m_socket.set(zmq::sockopt::sndtimeo, 0); // Return immediately if we can't send
65 0 : } catch (zmq::error_t const& err) {
66 0 : throw ZmqOperationError(ERS_HERE,
67 : "set timeout",
68 : "send",
69 0 : err.what(),
70 0 : connection_info.value<std::string>("connection_string", "inproc://default"));
71 0 : }
72 :
73 27 : auto hwm = connection_info.value<int>("capacity", 0);
74 27 : if (hwm > 0) {
75 0 : try {
76 0 : m_socket.set(zmq::sockopt::sndhwm, hwm);
77 0 : } catch (zmq::error_t const& err) {
78 0 : throw ZmqOperationError(ERS_HERE,
79 : "set hwm",
80 : "send",
81 0 : err.what(),
82 0 : connection_info.value<std::string>("connection_string", "inproc://default"));
83 0 : }
84 : }
85 :
86 27 : auto connection_string = connection_info.value<std::string>("connection_string", "inproc://default");
87 :
88 27 : TLOG_DEBUG(TLVL_CONNECTIONSTRING) << "Connection String is " << connection_string;
89 27 : try {
90 27 : m_socket.set(zmq::sockopt::immediate, 1); // Don't queue messages to incomplete connections
91 0 : } catch (zmq::error_t const& err) {
92 0 : throw ZmqOperationError(ERS_HERE, "set immediate mode", "send", err.what(), connection_string);
93 0 : }
94 :
95 27 : try {
96 27 : m_socket.connect(connection_string);
97 24 : m_connection_string = m_socket.get(zmq::sockopt::last_endpoint);
98 24 : m_socket_connected = true;
99 3 : } catch (zmq::error_t const& err) {
100 3 : ers::error(ZmqOperationError(ERS_HERE, "connect", "send", err.what(), connection_string));
101 3 : }
102 :
103 27 : if (!m_socket_connected) {
104 3 : throw ZmqOperationError(ERS_HERE, "connect", "send", "Operation failed for all resolved connection strings", "");
105 : }
106 48 : return m_connection_string;
107 27 : }
108 :
109 : protected:
110 20028 : bool send_(const void* message,
111 : int N,
112 : const duration_t& timeout,
113 : std::string const& topic,
114 : bool no_tmoexcept_mode) override
115 : {
116 20028 : TLOG_DEBUG(TLVL_ZMQSENDER_SEND_START)
117 20028 : << "Endpoint " << m_connection_string << ": Starting send of " << N << " bytes";
118 20028 : auto start_time = std::chrono::steady_clock::now();
119 20028 : zmq::send_result_t res{};
120 7409368 : do {
121 :
122 7409368 : zmq::message_t topic_msg(topic.c_str(), topic.size());
123 7409368 : try {
124 7409368 : res = m_socket.send(topic_msg, zmq::send_flags::sndmore);
125 0 : } catch (zmq::error_t const& err) {
126 0 : throw ZmqSendError(ERS_HERE, err.what(), topic.size(), topic);
127 0 : }
128 :
129 7409368 : if (!res || res != topic.size()) {
130 7389341 : TLOG_DEBUG(TLVL_ZMQSENDER_SEND_ERR) << "Endpoint " << m_connection_string << ": Unable to send message";
131 7389341 : continue;
132 7389341 : }
133 :
134 20027 : zmq::message_t msg(message, N);
135 20027 : try {
136 20027 : res = m_socket.send(msg, zmq::send_flags::none);
137 0 : } catch (zmq::error_t const& err) {
138 0 : throw ZmqSendError(ERS_HERE, err.what(), N, topic);
139 0 : }
140 :
141 20027 : if (!res && timeout > duration_t::zero()) {
142 0 : usleep(1000);
143 : }
144 14838764 : } while (std::chrono::duration_cast<duration_t>(std::chrono::steady_clock::now() - start_time) < timeout && !res);
145 :
146 20028 : if (!res && !no_tmoexcept_mode) {
147 1 : throw SendTimeoutExpired(ERS_HERE, timeout.count());
148 : }
149 :
150 20027 : TLOG_DEBUG(TLVL_ZMQSENDER_SEND_END) << "Endpoint " << m_connection_string << ": Completed send of " << N
151 20027 : << " bytes";
152 20027 : return res && res == N;
153 : }
154 :
155 : private:
156 : zmq::socket_t m_socket;
157 : std::string m_connection_string;
158 : bool m_socket_connected{ false };
159 : };
160 :
161 : } // namespace dunedaq::ipm
162 :
// NOTE(review): DEFINE_DUNE_IPM_SENDER is not defined in this file (presumably it comes from ipm/Sender.hpp).
// It appears to register dunedaq::ipm::ZmqSender as a loadable Sender implementation -- confirm against the macro.
163 26 : DEFINE_DUNE_IPM_SENDER(dunedaq::ipm::ZmqSender)
|