DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
NetworkSenderModel.hxx
Go to the documentation of this file.
4
5#include "ipm/Sender.hpp"
6#include "logging/Logging.hpp"
8
9#include <memory>
10#include <string>
11#include <typeinfo>
12#include <utility>
13
14using namespace std::chrono_literals; // NOLINT
15
16namespace dunedaq::iomanager {
17
18template<typename Datatype>
20 : SenderConcept<Datatype>(conn_id)
21{
22 TLOG() << "NetworkSenderModel created with DT! Addr: " << static_cast<void*>(this)
23 << ", uid=" << conn_id.uid << ", data_type=" << conn_id.data_type;
24 get_sender(std::chrono::milliseconds(1000));
25 if (m_network_sender_ptr == nullptr) {
26 TLOG() << "Initial connection attempt failed for uid=" << conn_id.uid
27 << ", data_type=" << conn_id.data_type;
28 }
29}
30
31template<typename Datatype>
33 : SenderConcept<Datatype>(other.m_conn.uid)
34 , m_network_sender_ptr(std::move(other.m_network_sender_ptr))
35 , m_topic(std::move(other.m_topic))
36{
37}
38
39template<typename Datatype>
40inline void
41NetworkSenderModel<Datatype>::send(Datatype&& data, Sender::timeout_t timeout) // NOLINT
42{
43 try {
44 write_network<Datatype>(data, timeout);
45 } catch (ipm::SendTimeoutExpired& ex) {
46 throw TimeoutExpired(ERS_HERE, this->id().uid, "send", timeout.count(), ex);
47 }
48}
49
50template<typename Datatype>
51inline bool
53{
54 return try_write_network<Datatype>(data, timeout);
55}
56
57template<typename Datatype>
58inline void
59NetworkSenderModel<Datatype>::send_with_topic(Datatype&& data, Sender::timeout_t timeout, std::string topic) // NOLINT
60{
61 try {
62 write_network_with_topic<Datatype>(data, timeout, topic);
63 } catch (ipm::SendTimeoutExpired& ex) {
64 throw TimeoutExpired(ERS_HERE, this->id().uid, "send", timeout.count(), ex);
65 }
66}
67
68template<typename Datatype>
69inline bool
71{
72 get_sender(timeout);
73 return (m_network_sender_ptr != nullptr);
74}
75
76template<typename Datatype>
77inline void
79{
80 auto start = std::chrono::steady_clock::now();
81 while (m_network_sender_ptr == nullptr &&
82 std::chrono::duration_cast<Sender::timeout_t>(std::chrono::steady_clock::now() - start) <= timeout) {
83 // get network resources
84 try {
85 m_network_sender_ptr = NetworkManager::get().get_sender(this->id());
86
87 if (NetworkManager::get().is_pubsub_connection(this->id())) {
88 TLOG() << "Setting topic to " << this->id().data_type;
89 m_topic = this->id().data_type;
90 }
91 } catch (ers::Issue const& ex) {
92 m_network_sender_ptr = nullptr;
93 std::this_thread::sleep_for(std::chrono::milliseconds(1));
94 }
95 }
96}
97
98template<typename Datatype>
99template<typename MessageType>
100inline typename std::enable_if<dunedaq::serialization::is_serializable<MessageType>::value, void>::type
102{
103 std::lock_guard<std::mutex> lk(m_send_mutex);
104 get_sender(timeout);
105 if (m_network_sender_ptr == nullptr) {
106 throw TimeoutExpired(
107 ERS_HERE, this->id().uid, "send", timeout.count(), ConnectionInstanceNotFound(ERS_HERE, this->id().uid));
108 }
109
110 auto serialized = dunedaq::serialization::serialize(message);
111 // TLOG() << "Serialized message for network sending: " << serialized.size() << ", topic=" <<
112 // m_topic << ", this="
113 // << (void*)this;
114
115 try {
116 m_network_sender_ptr->send(serialized.data(), serialized.size(), extend_first_timeout(timeout), m_topic);
117 } catch (ipm::SendTimeoutExpired const& ex) {
118 TLOG() << "Timeout detected, removing sender to re-acquire connection";
120 m_network_sender_ptr = nullptr;
121 throw;
122 }
123}
124
125template<typename Datatype>
126template<typename MessageType>
127inline typename std::enable_if<!dunedaq::serialization::is_serializable<MessageType>::value, void>::type
129{
130 throw NetworkMessageNotSerializable(ERS_HERE, typeid(MessageType).name()); // NOLINT(runtime/rtti)
131}
132
133template<typename Datatype>
134template<typename MessageType>
135inline typename std::enable_if<dunedaq::serialization::is_serializable<MessageType>::value, bool>::type
137{
138 std::lock_guard<std::mutex> lk(m_send_mutex);
139 get_sender(timeout);
140 if (m_network_sender_ptr == nullptr) {
141 TLOG_DEBUG(5) << ConnectionInstanceNotFound(ERS_HERE, this->id().uid);
142 return false;
143 }
144
145 auto serialized = dunedaq::serialization::serialize(message);
146 // TLOG() << "Serialized message for network sending: " << serialized.size() << ", topic=" <<
147 // m_topic <<
148 // ", this=" << (void*)this;
149
150 auto res =
151 m_network_sender_ptr->send(serialized.data(), serialized.size(), extend_first_timeout(timeout), m_topic, true);
152 if (!res) {
153 TLOG() << "Timeout detected, removing sender to re-acquire connection";
155 m_network_sender_ptr = nullptr;
156 }
157 return res;
158}
159
160template<typename Datatype>
161template<typename MessageType>
162inline typename std::enable_if<!dunedaq::serialization::is_serializable<MessageType>::value, bool>::type
164{
165 ers::error(NetworkMessageNotSerializable(ERS_HERE, typeid(MessageType).name())); // NOLINT(runtime/rtti)
166 return false;
167}
168
169template<typename Datatype>
170template<typename MessageType>
171inline typename std::enable_if<dunedaq::serialization::is_serializable<MessageType>::value, void>::type
173 Sender::timeout_t const& timeout,
174 std::string topic)
175{
176 std::lock_guard<std::mutex> lk(m_send_mutex);
177 get_sender(timeout);
178 if (m_network_sender_ptr == nullptr) {
179 throw TimeoutExpired(
180 ERS_HERE, this->id().uid, "send", timeout.count(), ConnectionInstanceNotFound(ERS_HERE, this->id().uid));
181 }
182
183 auto serialized = dunedaq::serialization::serialize(message);
184 // TLOG() << "Serialized message for network sending: " << serialized.size() << ", topic=" <<
185 // m_topic << ", this="
186 // << (void*)this;
187
188 try {
189 m_network_sender_ptr->send(serialized.data(), serialized.size(), timeout, topic);
190 } catch (TimeoutExpired const& ex) {
191 m_network_sender_ptr = nullptr;
192 throw;
193 }
194}
195
196template<typename Datatype>
197template<typename MessageType>
198inline typename std::enable_if<!dunedaq::serialization::is_serializable<MessageType>::value, void>::type
200{
201 throw NetworkMessageNotSerializable(ERS_HERE, typeid(MessageType).name()); // NOLINT(runtime/rtti)
202}
203
204template<typename Datatype>
207{
208 if (m_first) {
209 m_first = false;
210 if (timeout > 1000ms) {
211 return timeout;
212 }
213 return 1000ms;
214 }
215
216 return timeout;
217}
218
219} // namespace dunedaq::iomanager
#define ERS_HERE
static NetworkManager & get()
std::shared_ptr< ipm::Sender > get_sender(ConnectionId const &conn_id)
void remove_sender(ConnectionId const &conn_id)
void get_sender(Sender::timeout_t const &timeout)
std::enable_if< serialization::is_serializable< MessageType >::value, void >::type write_network(MessageType &message, Sender::timeout_t const &timeout)
bool try_send(Datatype &&data, Sender::timeout_t timeout) override
void send(Datatype &&data, Sender::timeout_t timeout) override
NetworkSenderModel(ConnectionId const &conn_id)
std::shared_ptr< ipm::Sender > m_network_sender_ptr
bool is_ready_for_sending(Sender::timeout_t timeout) override
std::enable_if< serialization::is_serializable< MessageType >::value, void >::type write_network_with_topic(MessageType &message, Sender::timeout_t const &timeout, std::string topic)
void send_with_topic(Datatype &&data, Sender::timeout_t timeout, std::string topic) override
std::enable_if< serialization::is_serializable< MessageType >::value, bool >::type try_write_network(MessageType &message, Sender::timeout_t const &timeout)
Sender::timeout_t extend_first_timeout(Sender::timeout_t timeout)
std::chrono::milliseconds timeout_t
Definition Sender.hpp:24
Base class for any user define issue.
Definition Issue.hpp:69
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
void error(const Issue &issue)
Definition ers.hpp:81