DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
ZmqContext.hpp
Go to the documentation of this file.
1#ifndef IPM_INCLUDE_IPM_ZMQCONTEXT_HPP_
2#define IPM_INCLUDE_IPM_ZMQCONTEXT_HPP_
3
13#include "ers/Issue.hpp"
14#include "logging/Logging.hpp" // NOTE: if issues are declared before including logging/Logging.hpp, TLOG_DEBUG << issue won't work.
15#include "zmq.hpp"
16
17enum
18{
39};
40
41namespace dunedaq {
42
52 ZmqOperationError,
53 connection_name << ": An exception occured while calling " << operation << " on the ZMQ " << direction
54 << " socket: " << what << " (connection_string: " << connection_string << ")",
55 ((std::string)connection_name)((std::string)operation)((std::string)direction)((const char*)what)(
56 (std::string)connection_string)) // NOLINT
58
67 ipm,
68 ZmqSendError,
69 connection_name << ": An exception occurred while sending " << N << " bytes to " << topic << ": " << what,
70 ((std::string)connection_name)((const char*)what)((int)N)((std::string)topic)) // NOLINT
72
80 ZmqReceiveError,
81 connection_name << ": An exception occured while receiving " << which << ": " << what,
82 ((std::string)connection_name)((const char*)what)((const char*)which)) // NOLINT
84
92 ZmqSubscribeError,
93 connection_name << ": An execption occured while subscribing to " << topic << ": " << what,
94 ((std::string)connection_name)((const char*)what)((std::string)topic)) // NOLINT
96
104 ZmqUnsubscribeError,
105 connection_name << ": An execption occured while unsubscribing from " << topic << ": " << what,
106 ((std::string)connection_name)((const char*)what)((std::string)topic)) // NOLINT
108
109namespace ipm {
111{
112public:
114 {
115 static ZmqContext s_ctx;
116 return s_ctx;
117 }
118
/**
 * @brief Access the ZMQ context held by this object.
 * @return Mutable reference to the m_context member (no copy is made).
 */
119 zmq::context_t& GetContext() { return m_context; }
120
/**
 * @brief Set the io_threads option on the held ZMQ context.
 *
 * Emits a debug message at level TLVL_ZMQCONTEXT and then passes the value
 * unchanged to m_context.set(zmq::ctxopt::io_threads, ...). No range check
 * is performed in this method.
 *
 * NOTE(review): libzmq documents ZMQ_IO_THREADS as applying only before any
 * sockets are created on the context -- confirm callers invoke this early enough.
 *
 * @param nthreads Value forwarded as the io_threads context option.
 */
121 void set_context_threads(int nthreads)
122 {
123 TLOG_DEBUG(TLVL_ZMQCONTEXT) << "Setting ZMQ Context IO thread count to " << nthreads;
124 m_context.set(zmq::ctxopt::io_threads, nthreads);
125 }
/**
 * @brief Set the max_sockets option on the held ZMQ context.
 *
 * Emits a debug message at level TLVL_ZMQCONTEXT and then passes the value
 * unchanged to m_context.set(zmq::ctxopt::max_sockets, ...). This method does
 * not compare the value against s_minimum_sockets; in the visible code that
 * comparison is done by the caller that reads IPM_ZMQ_MAX_SOCKETS.
 *
 * @param max_sockets Value forwarded as the max_sockets context option.
 */
126 void set_context_maxsockets(int max_sockets)
127 {
128 TLOG_DEBUG(TLVL_ZMQCONTEXT) << "Setting ZMQ Context max sockets to " << max_sockets;
129 m_context.set(zmq::ctxopt::max_sockets, max_sockets);
130 }
131
132private:
134 {
135 auto threads_c = getenv("IPM_ZMQ_IO_THREADS");
136 if (threads_c != nullptr) {
137 auto threads = std::atoi(threads_c); // NOLINT If a conversion error occurs, we discard the result
138 if (threads > 1) {
139 set_context_threads(threads);
140 }
141 }
142
143 bool sockets_set = false;
144 auto sockets_c = getenv("IPM_ZMQ_MAX_SOCKETS");
145 if (sockets_c != nullptr) {
146 auto sockets = std::atoi(sockets_c); // NOLINT If a conversion error occurs, we discard the result
147 if (sockets > s_minimum_sockets) {
148 set_context_maxsockets(sockets);
149 sockets_set = true;
150 }
151 }
152 if (!sockets_set) {
154 }
155 }
157 {
158 TLOG_DEBUG(TLVL_ZMQCONTEXT) << "Closing ZMQ Context";
159 m_context.close();
160 TLOG_DEBUG(TLVL_ZMQCONTEXT) << "ZMQ Context closed";
161 }
162 zmq::context_t m_context;
163 static constexpr int s_minimum_sockets = 16636;
164
165 ZmqContext(ZmqContext const&) = delete;
167 ZmqContext& operator=(ZmqContext const&) = delete;
169};
170} // namespace ipm
171} // namespace dunedaq
172
173#endif // IPM_INCLUDE_IPM_ZMQCONTEXT_HPP_
@ TLVL_ZMQSUBSCRIBER_RECV_DATA_2
@ TLVL_ZMQSUBSCRIBER_RECV_HDR
@ TLVL_ZMQSUBSCRIBER_RECV_END
@ TLVL_ZMQRECEIVER_RECV_HDR_2
@ TLVL_ZMQSUBSCRIBER_RECV_DATA
@ TLVL_ZMQSENDER_SEND_END
@ TLVL_ZMQRECEIVER_RECV_DATA
@ TLVL_ZMQSUBSCRIBER_RECV_HDR_2
@ TLVL_ZMQCONTEXT
@ TLVL_ZMQSENDER_DESTRUCTOR
@ TLVL_ZMQPUBLISHER_DESTRUCTOR
@ TLVL_ZMQPUBLISHER_SEND_START
@ TLVL_ZMQPUBLISHER_SEND_ERR
@ TLVL_CONNECTIONSTRING
@ TLVL_ZMQRECEIVER_RECV_END
@ TLVL_ZMQRECEIVER_RECV_HDR
@ TLVL_ZMQRECEIVER_RECV_DATA_2
@ TLVL_ZMQSENDER_SEND_ERR
@ TLVL_ZMQPUBLISHER_SEND_END
@ TLVL_ZMQSENDER_SEND_START
void set_context_threads(int nthreads)
static ZmqContext & instance()
ZmqContext & operator=(ZmqContext &&)=delete
void set_context_maxsockets(int max_sockets)
ZmqContext(ZmqContext const &)=delete
static constexpr int s_minimum_sockets
ZmqContext(ZmqContext &&)=delete
zmq::context_t & GetContext()
ZmqContext & operator=(ZmqContext const &)=delete
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define ERS_DECLARE_ISSUE( namespace_name, class_name, message_, attributes)
Definition macro.hpp:65
Including Qt Headers.