DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::daphnemodules::DaphneV3Interface Class Reference

#include <DaphneV3Interface.hpp>

Public Member Functions

 DaphneV3Interface (std::string address, std::string routing, std::chrono::milliseconds timeout=std::chrono::milliseconds(500))
 
 ~DaphneV3Interface ()
 
 DaphneV3Interface (const DaphneV3Interface &)=delete
 
DaphneV3Interfaceoperator= (const DaphneV3Interface &)=delete
 
 DaphneV3Interface (DaphneV3Interface &&)=delete
 
DaphneV3Interfaceoperator= (DaphneV3Interface &&)=delete
 
daphne::ControlEnvelopeV2 send (std::string &&message, daphne::MessageTypeV2)
 
template<class T >
send (std::string &&message, daphne::MessageTypeV2 sent_type, daphne::MessageTypeV2 received_type)
 
bool validate_connection ()
 

Protected Member Functions

void _send (std::string &&message, daphne::MessageTypeV2)
 
daphne::ControlEnvelopeV2 _receive ()
 
void close ()
 

Private Attributes

zmq::context_t m_context
 
zmq::socket_t m_socket
 
std::mutex m_access_mutex
 
std::string m_connection
 
std::chrono::milliseconds m_timeout {1000}
 
uint64_t m_message_counter = 0
 

Static Private Attributes

static const size_t s_default_control_port = 40001
 

Detailed Description

Definition at line 88 of file DaphneV3Interface.hpp.

Constructor & Destructor Documentation

◆ DaphneV3Interface() [1/3]

DaphneV3Interface::DaphneV3Interface ( std::string address,
std::string routing,
std::chrono::milliseconds timeout = std::chrono::milliseconds(500) )

Definition at line 24 of file DaphneV3Interface.cpp.

27 : m_context(1)
28 , m_socket(m_context, zmq::socket_type::dealer)
29 , m_timeout(timeout) {
30
31
32 m_socket.set(zmq::sockopt::routing_id, routing);
33 auto value = (int) m_timeout.count(); // NOLINT
34 TLOG() << routing << " timeout set to " << value << " ms";
35 m_socket.set(zmq::sockopt::rcvtimeo, value);
36 m_socket.set(zmq::sockopt::sndtimeo, value);
37 m_socket.set(zmq::sockopt::immediate, 1); // Don't queue messages to incomplete connections
38
39 // find out if the address has a port with a regex
40 static const std::regex ip_with_port(R"(^([^\/\s:]+)(?::(\d{1,5}))?$)");
41 std::smatch string_values;
42 if (! std::regex_match( address, string_values, ip_with_port ) ) {
44 }
45
46 m_connection = string_values[2].matched ?
47 fmt::format("tcp://{}", address) :
48 fmt::format("tcp://{}:{}", address, s_default_control_port) ;
49 TLOG() << "Connecting to: " << m_connection;
50
51 m_socket.connect(m_connection);
52
53 auto add = string_values[1];
54 auto port = string_values[2].matched ? std::stoi(string_values[2]) : s_default_control_port;
55
56 try {
57 if ( ! validate_connection() ) {
58 auto add = string_values[1];
59 auto port = string_values[2].matched ? std::stoi(string_values[2]) : s_default_control_port;
60 throw FailedPing(ERS_HERE, add, port );
61 }
62 } catch ( const ers::Issue & e ) {
63 throw FailedPing(ERS_HERE, add, port, e );
64 }
65
66}
#define ERS_HERE
Base class for any user define issue.
Definition Issue.hpp:69
#define TLOG(...)
Definition macro.hpp:22
void add(QString const &db, QString const &fn)
Invalid address

◆ ~DaphneV3Interface()

dunedaq::daphnemodules::DaphneV3Interface::~DaphneV3Interface ( )
inline

◆ DaphneV3Interface() [2/3]

dunedaq::daphnemodules::DaphneV3Interface::DaphneV3Interface ( const DaphneV3Interface & )
delete

◆ DaphneV3Interface() [3/3]

dunedaq::daphnemodules::DaphneV3Interface::DaphneV3Interface ( DaphneV3Interface && )
delete

Member Function Documentation

◆ _receive()

ControlEnvelopeV2 DaphneV3Interface::_receive ( )
protected

Definition at line 109 of file DaphneV3Interface.cpp.

109 {
110
111 zmq::message_t reply;
112 if (! m_socket.recv(reply, zmq::recv_flags::none)) {
113 // timeout or EAGAIN
114 throw FailedReceive(ERS_HERE, m_connection);
115 }
116 if (reply.size() <= 0) {
117 throw EmptyPayload(ERS_HERE, m_connection);
118 }
119
121 if (!rep.ParseFromArray(reply.data(), static_cast<int>(reply.size()) )) {
122 throw FailedDecoding(ERS_HERE, rep.GetTypeName(), reply.to_string());
123 }
124
125 if (rep.version() != 2 || rep.dir() != DIR_RESPONSE) {
126 ers::warning(UnexpectedDirection(ERS_HERE, rep.version(), Direction_Name(rep.dir()), reply.to_string() ));
127 }
128
129 return rep;
130}
::daphne::Direction dir() const
const std::string & Direction_Name(T value)
void warning(const Issue &issue)
Definition ers.hpp:115

◆ _send()

void DaphneV3Interface::_send ( std::string && message,
daphne::MessageTypeV2 type )
protected

Definition at line 88 of file DaphneV3Interface.cpp.

88 {
89
91 env.set_version(2);
93
94 env.set_type(type);
95 env.set_payload(message);
96
97 // additional information
99 env.set_timestamp_ns(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now().time_since_epoch()).count());
100
101
102 std::string bytes = env.SerializeAsString();
103
104 if (! m_socket.send(zmq::buffer(bytes), zmq::send_flags::none)) {
105 throw FailedSend(ERS_HERE, MessageTypeV2_Name(type) );
106 }
107}
void set_timestamp_ns(::uint64_t value)
void set_payload(Arg_ &&arg, Args_... args)
void set_type(::daphne::MessageTypeV2 value)
void set_dir(::daphne::Direction value)
void set_version(::uint32_t value)
void set_msg_id(::uint64_t value)
const std::string & MessageTypeV2_Name(T value)

◆ close()

void DaphneV3Interface::close ( )
protected

Definition at line 69 of file DaphneV3Interface.cpp.

69 {
70
71 const std::lock_guard<std::mutex> lock(m_access_mutex);
72 m_socket.set(zmq::sockopt::linger, 0);
73 m_socket.close();
74
75}

◆ operator=() [1/2]

DaphneV3Interface & dunedaq::daphnemodules::DaphneV3Interface::operator= ( const DaphneV3Interface & )
delete

◆ operator=() [2/2]

DaphneV3Interface & dunedaq::daphnemodules::DaphneV3Interface::operator= ( DaphneV3Interface && )
delete

◆ send() [1/2]

template<class T >
T dunedaq::daphnemodules::DaphneV3Interface::send ( std::string && message,
daphne::MessageTypeV2 sent_type,
daphne::MessageTypeV2 received_type )

Implementations of DaphneV3Interface's functions

This is part of the DUNE DAQ Software Suite, copyright 2020.
Licensing/copyright details are in the COPYING file that you should have

Definition at line 11 of file DaphneV3Interface.hxx.

11 {
12
13 std::unique_lock<std::mutex> lock(m_access_mutex);
14
15 _send(std::move(message), sent_type);
16
17 auto ret = _receive();
18
19 lock.unlock();
20
21 const auto ty = ret.type();
22 T out;
23 if ( ty != received_type ) {
24 throw FailedDecoding(ERS_HERE, out.GetTypeName(), ret.payload(),
25 TypeMismatch(ERS_HERE, MessageTypeV2_Name(ty), MessageTypeV2_Name(received_type)) );
26 }
27
28 if (!out.ParseFromString(ret.payload())) {
29 throw FailedDecoding(ERS_HERE, out.GetTypeName(), ret.payload());
30 }
31
32 return out;
33}
void _send(std::string &&message, daphne::MessageTypeV2)
FELIX Initialization std::string initerror FELIX queue timed out

◆ send() [2/2]

ControlEnvelopeV2 DaphneV3Interface::send ( std::string && message,
daphne::MessageTypeV2 type )

Definition at line 78 of file DaphneV3Interface.cpp.

78 {
79
80 const std::lock_guard<std::mutex> lock(m_access_mutex);
81
82 _send(std::move(message), type);
83
84 return _receive();
85}

◆ validate_connection()

bool DaphneV3Interface::validate_connection ( )

Definition at line 133 of file DaphneV3Interface.cpp.

134{
135 static const uint64_t good_value = 0xdeadbeef; // NOLINT
136
137 TestRegRequest req; // empty
138 auto reply = send<TestRegResponse>( req.SerializeAsString(),
139 MessageTypeV2::MT2_READ_TEST_REG_REQ,
140 MessageTypeV2::MT2_READ_TEST_REG_RESP );
141
142 return reply.value() == good_value;
143}
daphne::ControlEnvelopeV2 send(std::string &&message, daphne::MessageTypeV2)

Member Data Documentation

◆ m_access_mutex

std::mutex dunedaq::daphnemodules::DaphneV3Interface::m_access_mutex
mutableprivate

Definition at line 125 of file DaphneV3Interface.hpp.

◆ m_connection

std::string dunedaq::daphnemodules::DaphneV3Interface::m_connection
private

Definition at line 127 of file DaphneV3Interface.hpp.

◆ m_context

zmq::context_t dunedaq::daphnemodules::DaphneV3Interface::m_context
private

Definition at line 122 of file DaphneV3Interface.hpp.

◆ m_message_counter

uint64_t dunedaq::daphnemodules::DaphneV3Interface::m_message_counter = 0
private

Definition at line 131 of file DaphneV3Interface.hpp.

◆ m_socket

zmq::socket_t dunedaq::daphnemodules::DaphneV3Interface::m_socket
private

Definition at line 124 of file DaphneV3Interface.hpp.

◆ m_timeout

std::chrono::milliseconds dunedaq::daphnemodules::DaphneV3Interface::m_timeout {1000}
private

Definition at line 129 of file DaphneV3Interface.hpp.

129{1000};

◆ s_default_control_port

const size_t dunedaq::daphnemodules::DaphneV3Interface::s_default_control_port = 40001
inlinestaticprivate

Definition at line 133 of file DaphneV3Interface.hpp.


The documentation for this class was generated from the following files: