14using namespace std::chrono_literals;
18template<
typename Datatype>
22 TLOG() <<
"NetworkSenderModel created with DT! Addr: " <<
static_cast<void*
>(
this)
23 <<
", uid=" << conn_id.
uid <<
", data_type=" << conn_id.
data_type;
26 TLOG() <<
"Initial connection attempt failed for uid=" << conn_id.
uid
31template<
typename Datatype>
34 , m_network_sender_ptr(
std::move(other.m_network_sender_ptr))
35 , m_topic(
std::move(other.m_topic))
39template<
typename Datatype>
44 write_network<Datatype>(data, timeout);
45 }
catch (ipm::SendTimeoutExpired& ex) {
50template<
typename Datatype>
54 return try_write_network<Datatype>(data, timeout);
57template<
typename Datatype>
62 write_network_with_topic<Datatype>(data, timeout, topic);
63 }
catch (ipm::SendTimeoutExpired& ex) {
68template<
typename Datatype>
73 return (m_network_sender_ptr !=
nullptr);
76template<
typename Datatype>
80 auto start = std::chrono::steady_clock::now();
81 while (m_network_sender_ptr ==
nullptr &&
82 std::chrono::duration_cast<Sender::timeout_t>(std::chrono::steady_clock::now() - start) <= timeout) {
88 TLOG() <<
"Setting topic to " << this->id().data_type;
89 m_topic = this->id().data_type;
92 m_network_sender_ptr =
nullptr;
93 std::this_thread::sleep_for(std::chrono::milliseconds(1));
98template<
typename Datatype>
99template<
typename MessageType>
100inline typename std::enable_if<dunedaq::serialization::is_serializable<MessageType>::value,
void>::type
103 std::lock_guard<std::mutex> lk(m_send_mutex);
105 if (m_network_sender_ptr ==
nullptr) {
107 ERS_HERE, this->
id().uid,
"send", timeout.count(), ConnectionInstanceNotFound(
ERS_HERE, this->
id().uid));
110 auto serialized = dunedaq::serialization::serialize(message);
116 m_network_sender_ptr->send(serialized.data(), serialized.size(), extend_first_timeout(timeout), m_topic);
117 }
catch (ipm::SendTimeoutExpired
const& ex) {
118 TLOG() <<
"Timeout detected, removing sender to re-acquire connection";
120 m_network_sender_ptr =
nullptr;
125template<
typename Datatype>
126template<
typename MessageType>
127inline typename std::enable_if<!dunedaq::serialization::is_serializable<MessageType>::value,
void>::type
130 throw NetworkMessageNotSerializable(
ERS_HERE,
typeid(MessageType).name());
133template<
typename Datatype>
134template<
typename MessageType>
135inline typename std::enable_if<dunedaq::serialization::is_serializable<MessageType>::value,
bool>::type
138 std::lock_guard<std::mutex> lk(m_send_mutex);
140 if (m_network_sender_ptr ==
nullptr) {
145 auto serialized = dunedaq::serialization::serialize(message);
151 m_network_sender_ptr->send(serialized.data(), serialized.size(), extend_first_timeout(timeout), m_topic,
true);
153 TLOG() <<
"Timeout detected, removing sender to re-acquire connection";
155 m_network_sender_ptr =
nullptr;
160template<
typename Datatype>
161template<
typename MessageType>
162inline typename std::enable_if<!dunedaq::serialization::is_serializable<MessageType>::value,
bool>::type
169template<
typename Datatype>
170template<
typename MessageType>
171inline typename std::enable_if<dunedaq::serialization::is_serializable<MessageType>::value,
void>::type
176 std::lock_guard<std::mutex> lk(m_send_mutex);
178 if (m_network_sender_ptr ==
nullptr) {
180 ERS_HERE, this->
id().uid,
"send", timeout.count(), ConnectionInstanceNotFound(
ERS_HERE, this->
id().uid));
183 auto serialized = dunedaq::serialization::serialize(message);
189 m_network_sender_ptr->send(serialized.data(), serialized.size(), timeout, topic);
191 m_network_sender_ptr =
nullptr;
196template<
typename Datatype>
197template<
typename MessageType>
198inline typename std::enable_if<!dunedaq::serialization::is_serializable<MessageType>::value,
void>::type
201 throw NetworkMessageNotSerializable(
ERS_HERE,
typeid(MessageType).name());
204template<
typename Datatype>
210 if (timeout > 1000ms) {
static NetworkManager & get()
std::shared_ptr< ipm::Sender > get_sender(ConnectionId const &conn_id)
void remove_sender(ConnectionId const &conn_id)
void get_sender(Sender::timeout_t const &timeout)
std::enable_if< serialization::is_serializable< MessageType >::value, void >::type write_network(MessageType &message, Sender::timeout_t const &timeout)
bool try_send(Datatype &&data, Sender::timeout_t timeout) override
void send(Datatype &&data, Sender::timeout_t timeout) override
NetworkSenderModel(ConnectionId const &conn_id)
std::shared_ptr< ipm::Sender > m_network_sender_ptr
bool is_ready_for_sending(Sender::timeout_t timeout) override
std::enable_if< serialization::is_serializable< MessageType >::value, void >::type write_network_with_topic(MessageType &message, Sender::timeout_t const &timeout, std::string topic)
void send_with_topic(Datatype &&data, Sender::timeout_t timeout, std::string topic) override
std::enable_if< serialization::is_serializable< MessageType >::value, bool >::type try_write_network(MessageType &message, Sender::timeout_t const &timeout)
Sender::timeout_t extend_first_timeout(Sender::timeout_t timeout)
std::chrono::milliseconds timeout_t
Base class for any user define issue.
#define TLOG_DEBUG(lvl,...)
void error(const Issue &issue)