15#include <fmt/format.h>
26 std::chrono::milliseconds timeout)
28 , m_socket(m_context, zmq::socket_type::dealer)
29 , m_timeout(timeout) {
32 m_socket.set(zmq::sockopt::routing_id, routing);
34 TLOG() << routing <<
" timeout set to " << value <<
" ms";
35 m_socket.set(zmq::sockopt::rcvtimeo, value);
36 m_socket.set(zmq::sockopt::sndtimeo, value);
37 m_socket.set(zmq::sockopt::immediate, 1);
40 static const std::regex ip_with_port(R
"(^([^\/\s:]+)(?::(\d{1,5}))?$)");
41 std::smatch string_values;
42 if (! std::regex_match(
address, string_values, ip_with_port ) ) {
47 fmt::format(
"tcp://{}",
address) :
53 auto add = string_values[1];
58 auto add = string_values[1];
72 m_socket.set(zmq::sockopt::linger, 0);
82 _send(std::move(message), type);
99 env.
set_timestamp_ns(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now().time_since_epoch()).count());
102 std::string bytes = env.SerializeAsString();
104 if (!
m_socket.send(zmq::buffer(bytes), zmq::send_flags::none)) {
111 zmq::message_t reply;
112 if (!
m_socket.recv(reply, zmq::recv_flags::none)) {
116 if (reply.size() <= 0) {
121 if (!rep.ParseFromArray(reply.data(),
static_cast<int>(reply.size()) )) {
122 throw FailedDecoding(
ERS_HERE, rep.GetTypeName(), reply.to_string());
135 static const uint64_t good_value = 0xdeadbeef;
139 MessageTypeV2::MT2_READ_TEST_REG_REQ,
140 MessageTypeV2::MT2_READ_TEST_REG_RESP );
142 return reply.value() == good_value;
void set_timestamp_ns(::uint64_t value)
::uint32_t version() const
void set_payload(Arg_ &&arg, Args_... args)
::daphne::Direction dir() const
void set_type(::daphne::MessageTypeV2 value)
void set_dir(::daphne::Direction value)
void set_version(::uint32_t value)
void set_msg_id(::uint64_t value)
static const size_t s_default_control_port
std::mutex m_access_mutex
DaphneV3Interface(std::string address, std::string routing, std::chrono::milliseconds timeout=std::chrono::milliseconds(500))
daphne::ControlEnvelopeV2 send(std::string &&message, daphne::MessageTypeV2)
std::chrono::milliseconds m_timeout
void _send(std::string &&message, daphne::MessageTypeV2)
bool validate_connection()
uint64_t m_message_counter
daphne::ControlEnvelopeV2 _receive()
Base class for any user define issue.
const std::string & MessageTypeV2_Name(T value)
const std::string & Direction_Name(T value)
void warning(const Issue &issue)