DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
NetworkSenderModel.hxx
Go to the documentation of this file.
4
5#include "ipm/Sender.hpp"
6#include "logging/Logging.hpp"
8
9#include <memory>
10#include <string>
11#include <typeinfo>
12#include <utility>
13
14namespace dunedaq::iomanager {
15
16template<typename Datatype>
18 : SenderConcept<Datatype>(conn_id)
19{
20 TLOG("NetworkSenderModel") << "NetworkSenderModel created with DT! Addr: " << static_cast<void*>(this)
21 << ", uid=" << conn_id.uid << ", data_type=" << conn_id.data_type;
22 get_sender(std::chrono::milliseconds(1000));
23 if (m_network_sender_ptr == nullptr) {
24 TLOG("NetworkSenderModel") << "Initial connection attempt failed for uid=" << conn_id.uid
25 << ", data_type=" << conn_id.data_type;
26 }
27}
28
29template<typename Datatype>
31 : SenderConcept<Datatype>(other.m_conn.uid)
32 , m_network_sender_ptr(std::move(other.m_network_sender_ptr))
33 , m_topic(std::move(other.m_topic))
34{
35}
36
37template<typename Datatype>
38inline void
39NetworkSenderModel<Datatype>::send(Datatype&& data, Sender::timeout_t timeout) // NOLINT
40{
41 try {
42 write_network<Datatype>(data, timeout);
43 } catch (ipm::SendTimeoutExpired& ex) {
44 throw TimeoutExpired(ERS_HERE, this->id().uid, "send", timeout.count(), ex);
45 }
46}
47
48template<typename Datatype>
49inline bool
51{
52 return try_write_network<Datatype>(data, timeout);
53}
54
55template<typename Datatype>
56inline void
57NetworkSenderModel<Datatype>::send_with_topic(Datatype&& data, Sender::timeout_t timeout, std::string topic) // NOLINT
58{
59 try {
60 write_network_with_topic<Datatype>(data, timeout, topic);
61 } catch (ipm::SendTimeoutExpired& ex) {
62 throw TimeoutExpired(ERS_HERE, this->id().uid, "send", timeout.count(), ex);
63 }
64}
65
66template<typename Datatype>
67inline bool
69{
70 get_sender(timeout);
71 return (m_network_sender_ptr != nullptr);
72}
73
74template<typename Datatype>
75inline void
77{
78 auto start = std::chrono::steady_clock::now();
79 while (m_network_sender_ptr == nullptr &&
80 std::chrono::duration_cast<Sender::timeout_t>(std::chrono::steady_clock::now() - start) <= timeout) {
81 // get network resources
82 try {
83 m_network_sender_ptr = NetworkManager::get().get_sender(this->id());
84
85 if (NetworkManager::get().is_pubsub_connection(this->id())) {
86 TLOG("NetworkSenderModel") << "Setting topic to " << this->id().data_type;
87 m_topic = this->id().data_type;
88 }
89 } catch (ers::Issue const& ex) {
90 m_network_sender_ptr = nullptr;
91 std::this_thread::sleep_for(std::chrono::milliseconds(1));
92 }
93 }
94}
95
96template<typename Datatype>
97template<typename MessageType>
98inline typename std::enable_if<dunedaq::serialization::is_serializable<MessageType>::value, void>::type
100{
101 std::lock_guard<std::mutex> lk(m_send_mutex);
102 get_sender(timeout);
103 if (m_network_sender_ptr == nullptr) {
104 throw TimeoutExpired(
105 ERS_HERE, this->id().uid, "send", timeout.count(), ConnectionInstanceNotFound(ERS_HERE, this->id().uid));
106 }
107
109 // TLOG("NetworkSenderModel") << "Serialized message for network sending: " << serialized.size() << ", topic=" <<
110 // m_topic << ", this="
111 // << (void*)this;
112
113 try {
114 m_network_sender_ptr->send(serialized.data(), serialized.size(), extend_first_timeout(timeout), m_topic);
115 } catch (ipm::SendTimeoutExpired const& ex) {
116 TLOG("NetworkSenderModel") << "Timeout detected, removing sender to re-acquire connection";
118 m_network_sender_ptr = nullptr;
119 throw;
120 }
121}
122
123template<typename Datatype>
124template<typename MessageType>
125inline typename std::enable_if<!dunedaq::serialization::is_serializable<MessageType>::value, void>::type
127{
128 throw NetworkMessageNotSerializable(ERS_HERE, typeid(MessageType).name()); // NOLINT(runtime/rtti)
129}
130
131template<typename Datatype>
132template<typename MessageType>
133inline typename std::enable_if<dunedaq::serialization::is_serializable<MessageType>::value, bool>::type
135{
136 std::lock_guard<std::mutex> lk(m_send_mutex);
137 get_sender(timeout);
138 if (m_network_sender_ptr == nullptr) {
139 TLOG("NetworkSenderModel") << ConnectionInstanceNotFound(ERS_HERE, this->id().uid);
140 return false;
141 }
142
144 // TLOG("NetworkSenderModel") << "Serialized message for network sending: " << serialized.size() << ", topic=" <<
145 // m_topic <<
146 // ", this=" << (void*)this;
147
148 auto res =
149 m_network_sender_ptr->send(serialized.data(), serialized.size(), extend_first_timeout(timeout), m_topic, true);
150 if (!res) {
151 TLOG("NetworkSenderModel") << "Timeout detected, removing sender to re-acquire connection";
153 m_network_sender_ptr = nullptr;
154 }
155 return res;
156}
157
158template<typename Datatype>
159template<typename MessageType>
160inline typename std::enable_if<!dunedaq::serialization::is_serializable<MessageType>::value, bool>::type
162{
163 ers::error(NetworkMessageNotSerializable(ERS_HERE, typeid(MessageType).name())); // NOLINT(runtime/rtti)
164 return false;
165}
166
167template<typename Datatype>
168template<typename MessageType>
169inline typename std::enable_if<dunedaq::serialization::is_serializable<MessageType>::value, void>::type
171 Sender::timeout_t const& timeout,
172 std::string topic)
173{
174 std::lock_guard<std::mutex> lk(m_send_mutex);
175 get_sender(timeout);
176 if (m_network_sender_ptr == nullptr) {
177 throw TimeoutExpired(
178 ERS_HERE, this->id().uid, "send", timeout.count(), ConnectionInstanceNotFound(ERS_HERE, this->id().uid));
179 }
180
182 // TLOG("NetworkSenderModel") << "Serialized message for network sending: " << serialized.size() << ", topic=" <<
183 // m_topic << ", this="
184 // << (void*)this;
185
186 try {
187 m_network_sender_ptr->send(serialized.data(), serialized.size(), timeout, topic);
188 } catch (TimeoutExpired const& ex) {
189 m_network_sender_ptr = nullptr;
190 throw;
191 }
192}
193
194template<typename Datatype>
195template<typename MessageType>
196inline typename std::enable_if<!dunedaq::serialization::is_serializable<MessageType>::value, void>::type
198{
199 throw NetworkMessageNotSerializable(ERS_HERE, typeid(MessageType).name()); // NOLINT(runtime/rtti)
200}
201
202template<typename Datatype>
205{
206 if (m_first) {
207 m_first = false;
208 if (timeout > 1000ms) {
209 return timeout;
210 }
211 return 1000ms;
212 }
213
214 return timeout;
215}
216
217} // namespace dunedaq::iomanager
#define ERS_HERE
static NetworkManager & get()
std::shared_ptr< ipm::Sender > get_sender(ConnectionId const &conn_id)
void remove_sender(ConnectionId const &conn_id)
void get_sender(Sender::timeout_t const &timeout)
std::enable_if< dunedaq::serialization::is_serializable< MessageType >::value, void >::type write_network(MessageType &message, Sender::timeout_t const &timeout)
bool try_send(Datatype &&data, Sender::timeout_t timeout) override
void send(Datatype &&data, Sender::timeout_t timeout) override
NetworkSenderModel(ConnectionId const &conn_id)
std::shared_ptr< ipm::Sender > m_network_sender_ptr
bool is_ready_for_sending(Sender::timeout_t timeout) override
std::enable_if< dunedaq::serialization::is_serializable< MessageType >::value, void >::type write_network_with_topic(MessageType &message, Sender::timeout_t const &timeout, std::string topic)
void send_with_topic(Datatype &&data, Sender::timeout_t timeout, std::string topic) override
std::enable_if< dunedaq::serialization::is_serializable< MessageType >::value, bool >::type try_write_network(MessageType &message, Sender::timeout_t const &timeout)
Sender::timeout_t extend_first_timeout(Sender::timeout_t timeout)
std::chrono::milliseconds timeout_t
Definition Sender.hpp:24
Base class for any user define issue.
Definition Issue.hpp:69
#define TLOG(...)
Definition macro.hpp:22
std::vector< uint8_t > serialize(const T &obj, SerializationType stype)
Serialize object obj using serialization method stype.
void error(const Issue &issue)
Definition ers.hpp:81