DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
ZmqContext.hpp
Go to the documentation of this file.
1#ifndef IPM_INCLUDE_IPM_ZMQCONTEXT_HPP_
2#define IPM_INCLUDE_IPM_ZMQCONTEXT_HPP_
3
13#include "ers/Issue.hpp"
14#include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG<<issue won't work.
15#include "zmq.hpp"
16
17namespace dunedaq {
18
// Issue reporting a failed operation on a ZMQ socket. The message names the
// operation, the socket direction, the underlying error text (`what`) and the
// connection string; all four are carried as issue attributes.
28 ZmqOperationError,
29 "An exception occurred while calling " << operation << " on the ZMQ " << direction << " socket: "
30 << what << " (connection_string: " << connection_string << ")",
31 ((std::string)operation)((std::string)direction)((const char*)what)(
32 (std::string)connection_string)) // NOLINT
34
// Issue reporting a failed send. The message gives the byte count `N`, the
// destination `topic` and the underlying error text `what`; all three are
// carried as issue attributes.
43 ZmqSendError,
44 "An exception occurred while sending " << N << " bytes to " << topic << ": " << what,
45 ((const char*)what)((int)N)((std::string)topic)) // NOLINT
47
// Issue reporting a failed receive. `which` identifies what was being
// received and `what` is the underlying error text; both are carried as
// issue attributes.
55 ZmqReceiveError,
56 "An exception occurred while receiving " << which << ": " << what,
57 ((const char*)what)((const char*)which)) // NOLINT
59
// Issue reporting a failed subscription. The message gives the `topic` being
// subscribed to and the underlying error text `what`; both are carried as
// issue attributes.
67 ZmqSubscribeError,
68 "An exception occurred while subscribing to " << topic << ": " << what,
69 ((const char*)what)((std::string)topic)) // NOLINT
71
// Issue reporting a failed unsubscription. The message gives the `topic`
// being unsubscribed from and the underlying error text `what`; both are
// carried as issue attributes.
79 ZmqUnsubscribeError,
80 "An exception occurred while unsubscribing from " << topic << ": " << what,
81 ((const char*)what)((std::string)topic)) // NOLINT
83
84namespace ipm {
86{
87public:
89 {
// Function-local static: the single ZmqContext object is constructed the
// first time this body runs, and a reference to that same object is returned
// on every call.
90 static ZmqContext s_ctx;
91 return s_ctx;
92 }
93
// Returns a mutable reference to the zmq::context_t member held by this object.
94 zmq::context_t& GetContext() { return m_context; }
95
// Logs the requested value at TLOG_DEBUG level 10, then sets the context's
// zmq::ctxopt::io_threads option to `nthreads`. The value is passed through
// without any range check here.
96 void set_context_threads(int nthreads) {
97 TLOG_DEBUG(10) << "Setting ZMQ Context IO thread count to " << nthreads;
98 m_context.set(zmq::ctxopt::io_threads, nthreads); }
// Logs the requested value at TLOG_DEBUG level 10, then sets the context's
// zmq::ctxopt::max_sockets option to `max_sockets`. The value is passed
// through without any range check here.
99 void set_context_maxsockets(int max_sockets) {
100 TLOG_DEBUG(10) << "Setting ZMQ Context max sockets to " << max_sockets;
101 m_context.set(zmq::ctxopt::max_sockets, max_sockets); }
102
103private:
105 {
// Optional override of the I/O thread count from the environment.
// std::atoi yields 0 for non-numeric text, so such values fail the `> 1`
// test and are ignored, as are 0, 1 and negative numbers.
106 auto threads_c = getenv("IPM_ZMQ_IO_THREADS");
107 if (threads_c != nullptr) {
108 auto threads = std::atoi(threads_c);
109 if (threads > 1) {
110 set_context_threads(threads);
111 }
112 }
113
// Optional override of the socket limit from the environment. Only values
// strictly greater than s_minimum_sockets are applied; sockets_set records
// whether that happened.
114 bool sockets_set = false;
115 auto sockets_c = getenv("IPM_ZMQ_MAX_SOCKETS");
116 if (sockets_c != nullptr) {
117 auto sockets = std::atoi(sockets_c);
118 if (sockets > s_minimum_sockets) {
119 set_context_maxsockets(sockets);
120 sockets_set = true;
121 }
122 }
123 if(!sockets_set) {
// NOTE(review): listing line 124 is absent from this rendering, so the
// fallback taken when no override was applied is not visible here.
// Presumably it applies s_minimum_sockets as the limit; confirm against the
// original header.
125 }
126
127 }
// Explicitly closes the ZMQ context when this object is destroyed.
128 ~ZmqContext() { m_context.close(); }
// The zmq::context_t wrapped by this class; GetContext() returns a reference to it.
129 zmq::context_t m_context;
// Threshold for the IPM_ZMQ_MAX_SOCKETS override: the constructor applies the
// environment value only when it is strictly greater than this constant.
130 static constexpr int s_minimum_sockets = 16636;
131
// Copying is disabled: both copy construction and copy assignment are deleted.
132 ZmqContext(ZmqContext const&) = delete;
134 ZmqContext& operator=(ZmqContext const&) = delete;
136};
137} // namespace ipm
138} // namespace dunedaq
139
140#endif // IPM_INCLUDE_IPM_ZMQCONTEXT_HPP_
#define ERS_DECLARE_ISSUE(namespace_name, class_name, message, attributes)
void set_context_threads(int nthreads)
static ZmqContext & instance()
ZmqContext & operator=(ZmqContext &&)=delete
void set_context_maxsockets(int max_sockets)
ZmqContext(ZmqContext const &)=delete
static constexpr int s_minimum_sockets
ZmqContext(ZmqContext &&)=delete
zmq::context_t & GetContext()
ZmqContext & operator=(ZmqContext const &)=delete
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
Including Qt Headers.