Line data Source code
1 : /**
2 : * @file DaphneV3Interface.cpp
3 : *
4 : * Implementations of DaphneV3Interface's functions
5 : *
6 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
7 : * Licensing/copyright details are in the COPYING file that you should have
8 : * received with this code.
9 : *
10 : */
11 :
12 : #include "DaphneV3Interface.hpp"
13 :
14 : #include "logging/Logging.hpp"
15 : #include <fmt/format.h>
16 :
17 : #include <regex>
18 : #include <string>
19 : #include <utility>
20 :
21 : using namespace dunedaq::daphnemodules;
22 : using namespace daphne;
23 :
24 4 : DaphneV3Interface::DaphneV3Interface( std::string address,
25 : std::string routing,
26 4 : std::chrono::milliseconds timeout)
27 4 : : m_context(1)
28 4 : , m_socket(m_context, zmq::socket_type::dealer)
29 4 : , m_timeout(timeout) {
30 :
31 :
32 4 : m_socket.set(zmq::sockopt::routing_id, routing);
33 4 : auto value = (int) m_timeout.count(); // NOLINT
34 8 : TLOG() << routing << " timeout set to " << value << " ms";
35 4 : m_socket.set(zmq::sockopt::rcvtimeo, value);
36 4 : m_socket.set(zmq::sockopt::sndtimeo, value);
37 4 : m_socket.set(zmq::sockopt::immediate, 1); // Don't queue messages to incomplete connections
38 :
39 : // find out if the address has a port with a regex
40 4 : static const std::regex ip_with_port(R"(^([^\/\s:]+)(?::(\d{1,5}))?$)");
41 4 : std::smatch string_values;
42 4 : if (! std::regex_match( address, string_values, ip_with_port ) ) {
43 2 : throw InvalidIPAddress(ERS_HERE, address);
44 : }
45 :
46 6 : m_connection = string_values[2].matched ?
47 : fmt::format("tcp://{}", address) :
48 2 : fmt::format("tcp://{}:{}", address, s_default_control_port) ;
49 4 : TLOG() << "Connecting to: " << m_connection;
50 :
51 2 : m_socket.connect(m_connection);
52 :
53 2 : auto add = string_values[1];
54 2 : auto port = string_values[2].matched ? std::stoi(string_values[2]) : s_default_control_port;
55 :
56 2 : try {
57 2 : if ( ! validate_connection() ) {
58 0 : auto add = string_values[1];
59 0 : auto port = string_values[2].matched ? std::stoi(string_values[2]) : s_default_control_port;
60 0 : throw FailedPing(ERS_HERE, add, port );
61 : }
62 2 : } catch ( const ers::Issue & e ) {
63 2 : throw FailedPing(ERS_HERE, add, port, e );
64 2 : }
65 :
66 16 : }
67 :
68 :
69 0 : void DaphneV3Interface::close() {
70 :
71 0 : const std::lock_guard<std::mutex> lock(m_access_mutex);
72 0 : m_socket.set(zmq::sockopt::linger, 0);
73 0 : m_socket.close();
74 :
75 0 : }
76 :
77 :
78 0 : ControlEnvelopeV2 DaphneV3Interface::send( std::string && message, daphne::MessageTypeV2 type) {
79 :
80 0 : const std::lock_guard<std::mutex> lock(m_access_mutex);
81 :
82 0 : _send(std::move(message), type);
83 :
84 0 : return _receive();
85 0 : }
86 :
87 :
88 2 : void DaphneV3Interface::_send( std::string && message, daphne::MessageTypeV2 type) {
89 :
90 2 : ControlEnvelopeV2 env;
91 2 : env.set_version(2);
92 2 : env.set_dir(DIR_REQUEST);
93 :
94 2 : env.set_type(type);
95 2 : env.set_payload(message);
96 :
97 : // additional information
98 2 : env.set_msg_id( m_message_counter++ );
99 2 : env.set_timestamp_ns(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now().time_since_epoch()).count());
100 :
101 :
102 2 : std::string bytes = env.SerializeAsString();
103 :
104 2 : if (! m_socket.send(zmq::buffer(bytes), zmq::send_flags::none)) {
105 2 : throw FailedSend(ERS_HERE, MessageTypeV2_Name(type) );
106 : }
107 4 : }
108 :
109 0 : ControlEnvelopeV2 DaphneV3Interface::_receive() {
110 :
111 0 : zmq::message_t reply;
112 0 : if (! m_socket.recv(reply, zmq::recv_flags::none)) {
113 : // timeout or EAGAIN
114 0 : throw FailedReceive(ERS_HERE, m_connection);
115 : }
116 0 : if (reply.size() <= 0) {
117 0 : throw EmptyPayload(ERS_HERE, m_connection);
118 : }
119 :
120 0 : ControlEnvelopeV2 rep;
121 0 : if (!rep.ParseFromArray(reply.data(), static_cast<int>(reply.size()) )) {
122 0 : throw FailedDecoding(ERS_HERE, rep.GetTypeName(), reply.to_string());
123 : }
124 :
125 0 : if (rep.version() != 2 || rep.dir() != DIR_RESPONSE) {
126 0 : ers::warning(UnexpectedDirection(ERS_HERE, rep.version(), Direction_Name(rep.dir()), reply.to_string() ));
127 : }
128 :
129 0 : return rep;
130 0 : }
131 :
132 :
133 2 : bool DaphneV3Interface::validate_connection()
134 : {
135 2 : static const uint64_t good_value = 0xdeadbeef; // NOLINT
136 :
137 2 : TestRegRequest req; // empty
138 4 : auto reply = send<TestRegResponse>( req.SerializeAsString(),
139 : MessageTypeV2::MT2_READ_TEST_REG_REQ,
140 2 : MessageTypeV2::MT2_READ_TEST_REG_RESP );
141 :
142 0 : return reply.value() == good_value;
143 2 : }
144 :
145 :
146 :
|