16template<
typename Datatype>
20 TLOG(
"NetworkSenderModel") <<
"NetworkSenderModel created with DT! Addr: " <<
static_cast<void*
>(
this)
21 <<
", uid=" << conn_id.
uid <<
", data_type=" << conn_id.
data_type;
24 TLOG(
"NetworkSenderModel") <<
"Initial connection attempt failed for uid=" << conn_id.
uid
29template<
typename Datatype>
32 , m_network_sender_ptr(
std::move(other.m_network_sender_ptr))
33 , m_topic(
std::move(other.m_topic))
37template<
typename Datatype>
42 write_network<Datatype>(data, timeout);
43 }
catch (ipm::SendTimeoutExpired& ex) {
48template<
typename Datatype>
52 return try_write_network<Datatype>(data, timeout);
55template<
typename Datatype>
60 write_network_with_topic<Datatype>(data, timeout, topic);
61 }
catch (ipm::SendTimeoutExpired& ex) {
66template<
typename Datatype>
71 return (m_network_sender_ptr !=
nullptr);
74template<
typename Datatype>
78 auto start = std::chrono::steady_clock::now();
79 while (m_network_sender_ptr ==
nullptr &&
80 std::chrono::duration_cast<Sender::timeout_t>(std::chrono::steady_clock::now() - start) <= timeout) {
86 TLOG(
"NetworkSenderModel") <<
"Setting topic to " << this->id().data_type;
87 m_topic = this->id().data_type;
90 m_network_sender_ptr =
nullptr;
91 std::this_thread::sleep_for(std::chrono::milliseconds(1));
96template<
typename Datatype>
97template<
typename MessageType>
98inline typename std::enable_if<dunedaq::serialization::is_serializable<MessageType>::value,
void>::type
101 std::lock_guard<std::mutex> lk(m_send_mutex);
103 if (m_network_sender_ptr ==
nullptr) {
105 ERS_HERE, this->
id().uid,
"send", timeout.count(), ConnectionInstanceNotFound(
ERS_HERE, this->
id().uid));
114 m_network_sender_ptr->send(serialized.data(), serialized.size(), extend_first_timeout(timeout), m_topic);
115 }
catch (ipm::SendTimeoutExpired
const& ex) {
116 TLOG(
"NetworkSenderModel") <<
"Timeout detected, removing sender to re-acquire connection";
118 m_network_sender_ptr =
nullptr;
123template<
typename Datatype>
124template<
typename MessageType>
125inline typename std::enable_if<!dunedaq::serialization::is_serializable<MessageType>::value,
void>::type
128 throw NetworkMessageNotSerializable(
ERS_HERE,
typeid(MessageType).name());
131template<
typename Datatype>
132template<
typename MessageType>
133inline typename std::enable_if<dunedaq::serialization::is_serializable<MessageType>::value,
bool>::type
136 std::lock_guard<std::mutex> lk(m_send_mutex);
138 if (m_network_sender_ptr ==
nullptr) {
139 TLOG(
"NetworkSenderModel") << ConnectionInstanceNotFound(
ERS_HERE, this->
id().uid);
149 m_network_sender_ptr->send(serialized.data(), serialized.size(), extend_first_timeout(timeout), m_topic,
true);
151 TLOG(
"NetworkSenderModel") <<
"Timeout detected, removing sender to re-acquire connection";
153 m_network_sender_ptr =
nullptr;
158template<
typename Datatype>
159template<
typename MessageType>
160inline typename std::enable_if<!dunedaq::serialization::is_serializable<MessageType>::value,
bool>::type
167template<
typename Datatype>
168template<
typename MessageType>
169inline typename std::enable_if<dunedaq::serialization::is_serializable<MessageType>::value,
void>::type
174 std::lock_guard<std::mutex> lk(m_send_mutex);
176 if (m_network_sender_ptr ==
nullptr) {
178 ERS_HERE, this->
id().uid,
"send", timeout.count(), ConnectionInstanceNotFound(
ERS_HERE, this->
id().uid));
187 m_network_sender_ptr->send(serialized.data(), serialized.size(), timeout, topic);
189 m_network_sender_ptr =
nullptr;
194template<
typename Datatype>
195template<
typename MessageType>
196inline typename std::enable_if<!dunedaq::serialization::is_serializable<MessageType>::value,
void>::type
199 throw NetworkMessageNotSerializable(
ERS_HERE,
typeid(MessageType).name());
202template<
typename Datatype>
208 if (timeout > 1000ms) {
static NetworkManager & get()
std::shared_ptr< ipm::Sender > get_sender(ConnectionId const &conn_id)
void remove_sender(ConnectionId const &conn_id)
void get_sender(Sender::timeout_t const &timeout)
std::enable_if< dunedaq::serialization::is_serializable< MessageType >::value, void >::type write_network(MessageType &message, Sender::timeout_t const &timeout)
bool try_send(Datatype &&data, Sender::timeout_t timeout) override
void send(Datatype &&data, Sender::timeout_t timeout) override
NetworkSenderModel(ConnectionId const &conn_id)
std::shared_ptr< ipm::Sender > m_network_sender_ptr
bool is_ready_for_sending(Sender::timeout_t timeout) override
std::enable_if< dunedaq::serialization::is_serializable< MessageType >::value, void >::type write_network_with_topic(MessageType &message, Sender::timeout_t const &timeout, std::string topic)
void send_with_topic(Datatype &&data, Sender::timeout_t timeout, std::string topic) override
std::enable_if< dunedaq::serialization::is_serializable< MessageType >::value, bool >::type try_write_network(MessageType &message, Sender::timeout_t const &timeout)
Sender::timeout_t extend_first_timeout(Sender::timeout_t timeout)
std::chrono::milliseconds timeout_t
Base class for any user define issue.
std::vector< uint8_t > serialize(const T &obj, SerializationType stype)
Serialize object obj using serialization method stype.
void error(const Issue &issue)