DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
DaphneV3Interface.cpp
Go to the documentation of this file.
1
12#include "DaphneV3Interface.hpp"
13
14#include "logging/Logging.hpp"
15#include <fmt/format.h>
16
17#include <regex>
18#include <string>
19#include <utility>
20
21using namespace dunedaq::daphnemodules;
22using namespace daphne;
23
25 std::string routing,
26 std::chrono::milliseconds timeout)
27 : m_context(1)
28 , m_socket(m_context, zmq::socket_type::dealer)
29 , m_timeout(timeout) {
30
31
32 m_socket.set(zmq::sockopt::routing_id, routing);
33 auto value = (int) m_timeout.count(); // NOLINT
34 TLOG() << routing << " timeout set to " << value << " ms";
35 m_socket.set(zmq::sockopt::rcvtimeo, value);
36 m_socket.set(zmq::sockopt::sndtimeo, value);
37 m_socket.set(zmq::sockopt::immediate, 1); // Don't queue messages to incomplete connections
38
39 // find out if the address has a port with a regex
40 static const std::regex ip_with_port(R"(^([^\/\s:]+)(?::(\d{1,5}))?$)");
41 std::smatch string_values;
42 if (! std::regex_match( address, string_values, ip_with_port ) ) {
44 }
45
46 m_connection = string_values[2].matched ?
47 fmt::format("tcp://{}", address) :
48 fmt::format("tcp://{}:{}", address, s_default_control_port) ;
49 TLOG() << "Connecting to: " << m_connection;
50
51 m_socket.connect(m_connection);
52
53 auto add = string_values[1];
54 auto port = string_values[2].matched ? std::stoi(string_values[2]) : s_default_control_port;
55
56 try {
57 if ( ! validate_connection() ) {
58 auto add = string_values[1];
59 auto port = string_values[2].matched ? std::stoi(string_values[2]) : s_default_control_port;
60 throw FailedPing(ERS_HERE, add, port );
61 }
62 } catch ( const ers::Issue & e ) {
63 throw FailedPing(ERS_HERE, add, port, e );
64 }
65
66}
67
68
70
71 const std::lock_guard<std::mutex> lock(m_access_mutex);
72 m_socket.set(zmq::sockopt::linger, 0);
73 m_socket.close();
74
75}
76
77
79
80 const std::lock_guard<std::mutex> lock(m_access_mutex);
81
82 _send(std::move(message), type);
83
84 return _receive();
85}
86
87
88void DaphneV3Interface::_send( std::string && message, daphne::MessageTypeV2 type) {
89
91 env.set_version(2);
93
94 env.set_type(type);
95 env.set_payload(message);
96
97 // additional information
99 env.set_timestamp_ns(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now().time_since_epoch()).count());
100
101
102 std::string bytes = env.SerializeAsString();
103
104 if (! m_socket.send(zmq::buffer(bytes), zmq::send_flags::none)) {
105 throw FailedSend(ERS_HERE, MessageTypeV2_Name(type) );
106 }
107}
108
110
111 zmq::message_t reply;
112 if (! m_socket.recv(reply, zmq::recv_flags::none)) {
113 // timeout or EAGAIN
114 throw FailedReceive(ERS_HERE, m_connection);
115 }
116 if (reply.size() <= 0) {
117 throw EmptyPayload(ERS_HERE, m_connection);
118 }
119
121 if (!rep.ParseFromArray(reply.data(), static_cast<int>(reply.size()) )) {
122 throw FailedDecoding(ERS_HERE, rep.GetTypeName(), reply.to_string());
123 }
124
125 if (rep.version() != 2 || rep.dir() != DIR_RESPONSE) {
126 ers::warning(UnexpectedDirection(ERS_HERE, rep.version(), Direction_Name(rep.dir()), reply.to_string() ));
127 }
128
129 return rep;
130}
131
132
134{
135 static const uint64_t good_value = 0xdeadbeef; // NOLINT
136
137 TestRegRequest req; // empty
138 auto reply = send<TestRegResponse>( req.SerializeAsString(),
139 MessageTypeV2::MT2_READ_TEST_REG_REQ,
140 MessageTypeV2::MT2_READ_TEST_REG_RESP );
141
142 return reply.value() == good_value;
143}
144
145
146
#define ERS_HERE
void set_timestamp_ns(::uint64_t value)
void set_payload(Arg_ &&arg, Args_... args)
::daphne::Direction dir() const
void set_type(::daphne::MessageTypeV2 value)
void set_dir(::daphne::Direction value)
void set_version(::uint32_t value)
void set_msg_id(::uint64_t value)
DaphneV3Interface(std::string address, std::string routing, std::chrono::milliseconds timeout=std::chrono::milliseconds(500))
daphne::ControlEnvelopeV2 send(std::string &&message, daphne::MessageTypeV2)
void _send(std::string &&message, daphne::MessageTypeV2)
Base class for any user define issue.
Definition Issue.hpp:69
#define TLOG(...)
Definition macro.hpp:22
const std::string & MessageTypeV2_Name(T value)
const std::string & Direction_Name(T value)
Invalid address
void warning(const Issue &issue)
Definition ers.hpp:115