DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::iomanager::NetworkSenderModel< Datatype > Class Template Reference

#include <NetworkSenderModel.hpp>

Inheritance diagram for dunedaq::iomanager::NetworkSenderModel< Datatype >:
[legend]
Collaboration diagram for dunedaq::iomanager::NetworkSenderModel< Datatype >:
[legend]

Public Member Functions

 NetworkSenderModel (ConnectionId const &conn_id)
 
 NetworkSenderModel (NetworkSenderModel &&other)
 
void send (Datatype &&data, Sender::timeout_t timeout) override
 
bool try_send (Datatype &&data, Sender::timeout_t timeout) override
 
void send_with_topic (Datatype &&data, Sender::timeout_t timeout, std::string topic) override
 
bool is_ready_for_sending (Sender::timeout_t timeout) override
 
- Public Member Functions inherited from dunedaq::iomanager::SenderConcept< Datatype >
 SenderConcept (ConnectionId const &conn_id)
 
- Public Member Functions inherited from dunedaq::iomanager::Sender
 Sender (ConnectionId const &this_conn)
 
virtual ~Sender ()=default
 
ConnectionId id () const
 
- Public Member Functions inherited from dunedaq::utilities::NamedObject
 NamedObject (const std::string &name)
 NamedObject Constructor.
 
 NamedObject (NamedObject const &)=delete
 NamedObject is not copy-constructible.
 
 NamedObject (NamedObject &&)=default
 NamedObject is move-constructible.
 
NamedObject & operator= (NamedObject const &)=delete
 NamedObject is not copy-assignable.
 
NamedObject & operator= (NamedObject &&)=default
 NamedObject is move-assignable.
 
virtual ~NamedObject ()=default
 Default virtual destructor.
 
const std::string & get_name () const final
 Get the name of this NamedObject.
 
- Public Member Functions inherited from dunedaq::utilities::Named
 Named ()=default
 Named Constructor.
 
 Named (Named const &)=delete
 Named is not copy-constructible.
 
 Named (Named &&)=default
 Named is move-constructible.
 
Named & operator= (Named const &)=delete
 Named is not copy-assignable.
 
Named & operator= (Named &&)=default
 Named is move-assignable.
 
virtual ~Named ()=default
 Default virtual destructor.
 

Private Member Functions

void get_sender (Sender::timeout_t const &timeout)
 
template<typename MessageType >
std::enable_if< serialization::is_serializable< MessageType >::value, void >::type write_network (MessageType &message, Sender::timeout_t const &timeout)
 
template<typename MessageType >
std::enable_if<!serialization::is_serializable< MessageType >::value, void >::type write_network (MessageType &, Sender::timeout_t const &)
 
template<typename MessageType >
std::enable_if< serialization::is_serializable< MessageType >::value, bool >::type try_write_network (MessageType &message, Sender::timeout_t const &timeout)
 
template<typename MessageType >
std::enable_if<!serialization::is_serializable< MessageType >::value, bool >::type try_write_network (MessageType &, Sender::timeout_t const &)
 
template<typename MessageType >
std::enable_if< serialization::is_serializable< MessageType >::value, void >::type write_network_with_topic (MessageType &message, Sender::timeout_t const &timeout, std::string topic)
 
template<typename MessageType >
std::enable_if<!serialization::is_serializable< MessageType >::value, void >::type write_network_with_topic (MessageType &, Sender::timeout_t const &, std::string)
 
Sender::timeout_t extend_first_timeout (Sender::timeout_t timeout)
 

Private Attributes

std::shared_ptr< ipm::Sender > m_network_sender_ptr
 
std::mutex m_send_mutex
 
std::string m_topic { "" }
 
std::atomic< bool > m_first { true }
 

Additional Inherited Members

- Public Types inherited from dunedaq::iomanager::Sender
using timeout_t = std::chrono::milliseconds
 
- Static Public Attributes inherited from dunedaq::iomanager::Sender
static constexpr timeout_t s_block = timeout_t::max()
 
static constexpr timeout_t s_no_block = timeout_t::zero()
 
- Protected Attributes inherited from dunedaq::iomanager::Sender
ConnectionId m_conn
 

Detailed Description

template<typename Datatype>
class dunedaq::iomanager::NetworkSenderModel< Datatype >

Definition at line 25 of file NetworkSenderModel.hpp.

Constructor & Destructor Documentation

◆ NetworkSenderModel() [1/2]

template<typename Datatype >
dunedaq::iomanager::NetworkSenderModel< Datatype >::NetworkSenderModel ( ConnectionId const & conn_id)
inline explicit

Definition at line 19 of file NetworkSenderModel.hxx.

20 : SenderConcept<Datatype>(conn_id)
21{
22 TLOG() << "NetworkSenderModel created with DT! Addr: " << static_cast<void*>(this)
23 << ", uid=" << conn_id.uid << ", data_type=" << conn_id.data_type;
24 get_sender(std::chrono::milliseconds(1000));
25 if (m_network_sender_ptr == nullptr) {
26 TLOG() << "Initial connection attempt failed for uid=" << conn_id.uid
27 << ", data_type=" << conn_id.data_type;
28 }
29}
void get_sender(Sender::timeout_t const &timeout)
std::shared_ptr< ipm::Sender > m_network_sender_ptr
SenderConcept(ConnectionId const &conn_id)
Definition Sender.hpp:46
#define TLOG(...)
Definition macro.hpp:22

◆ NetworkSenderModel() [2/2]

template<typename Datatype >
dunedaq::iomanager::NetworkSenderModel< Datatype >::NetworkSenderModel ( NetworkSenderModel< Datatype > && other)
inline

Definition at line 32 of file NetworkSenderModel.hxx.

33 : SenderConcept<Datatype>(other.m_conn.uid)
34 , m_network_sender_ptr(std::move(other.m_network_sender_ptr))
35 , m_topic(std::move(other.m_topic))
36{
37}

Member Function Documentation

◆ extend_first_timeout()

template<typename Datatype >
Sender::timeout_t dunedaq::iomanager::NetworkSenderModel< Datatype >::extend_first_timeout ( Sender::timeout_t timeout)
inline private

Definition at line 206 of file NetworkSenderModel.hxx.

207{
208 if (m_first) {
209 m_first = false;
210 if (timeout > 1000ms) {
211 return timeout;
212 }
213 return 1000ms;
214 }
215
216 return timeout;
217}

◆ get_sender()

template<typename Datatype >
void dunedaq::iomanager::NetworkSenderModel< Datatype >::get_sender ( Sender::timeout_t const & timeout)
inline private

Definition at line 78 of file NetworkSenderModel.hxx.

79{
80 auto start = std::chrono::steady_clock::now();
81 while (m_network_sender_ptr == nullptr &&
82 std::chrono::duration_cast<Sender::timeout_t>(std::chrono::steady_clock::now() - start) <= timeout) {
83 // get network resources
84 try {
85 m_network_sender_ptr = NetworkManager::get().get_sender(this->id());
86
87 if (NetworkManager::get().is_pubsub_connection(this->id())) {
88 TLOG() << "Setting topic to " << this->id().data_type;
89 m_topic = this->id().data_type;
90 }
91 } catch (ers::Issue const& ex) {
92 m_network_sender_ptr = nullptr;
93 std::this_thread::sleep_for(std::chrono::milliseconds(1));
94 }
95 }
96}
static NetworkManager & get()
std::shared_ptr< ipm::Sender > get_sender(ConnectionId const &conn_id)
ConnectionId id() const
Definition Sender.hpp:35
Base class for any user-defined issue.
Definition Issue.hpp:69

◆ is_ready_for_sending()

template<typename Datatype >
bool dunedaq::iomanager::NetworkSenderModel< Datatype >::is_ready_for_sending ( Sender::timeout_t timeout)
inline override virtual

Implements dunedaq::iomanager::SenderConcept< Datatype >.

Definition at line 70 of file NetworkSenderModel.hxx.

71{
72 get_sender(timeout);
73 return (m_network_sender_ptr != nullptr);
74}

◆ send()

template<typename Datatype >
void dunedaq::iomanager::NetworkSenderModel< Datatype >::send ( Datatype && data,
Sender::timeout_t timeout )
inline override virtual

Implements dunedaq::iomanager::SenderConcept< Datatype >.

Definition at line 41 of file NetworkSenderModel.hxx.

42{
43 try {
44 write_network<Datatype>(data, timeout);
45 } catch (ipm::SendTimeoutExpired& ex) {
46 throw TimeoutExpired(ERS_HERE, this->id().uid, "send", timeout.count(), ex);
47 }
48}
#define ERS_HERE
std::enable_if< serialization::is_serializable< MessageType >::value, void >::type write_network(MessageType &message, Sender::timeout_t const &timeout)

◆ send_with_topic()

template<typename Datatype >
void dunedaq::iomanager::NetworkSenderModel< Datatype >::send_with_topic ( Datatype && data,
Sender::timeout_t timeout,
std::string topic )
inline override virtual

Implements dunedaq::iomanager::SenderConcept< Datatype >.

Definition at line 59 of file NetworkSenderModel.hxx.

60{
61 try {
62 write_network_with_topic<Datatype>(data, timeout, topic);
63 } catch (ipm::SendTimeoutExpired& ex) {
64 throw TimeoutExpired(ERS_HERE, this->id().uid, "send", timeout.count(), ex);
65 }
66}
std::enable_if< serialization::is_serializable< MessageType >::value, void >::type write_network_with_topic(MessageType &message, Sender::timeout_t const &timeout, std::string topic)

◆ try_send()

template<typename Datatype >
bool dunedaq::iomanager::NetworkSenderModel< Datatype >::try_send ( Datatype && data,
Sender::timeout_t timeout )
inline override virtual

Implements dunedaq::iomanager::SenderConcept< Datatype >.

Definition at line 52 of file NetworkSenderModel.hxx.

53{
54 return try_write_network<Datatype>(data, timeout);
55}
std::enable_if< serialization::is_serializable< MessageType >::value, bool >::type try_write_network(MessageType &message, Sender::timeout_t const &timeout)

◆ try_write_network() [1/2]

template<typename Datatype >
template<typename MessageType >
std::enable_if<!serialization::is_serializable< MessageType >::value, bool >::type dunedaq::iomanager::NetworkSenderModel< Datatype >::try_write_network ( MessageType & ,
Sender::timeout_t const &  )
private

◆ try_write_network() [2/2]

template<typename Datatype >
template<typename MessageType >
std::enable_if< dunedaq::serialization::is_serializable< MessageType >::value, bool >::type dunedaq::iomanager::NetworkSenderModel< Datatype >::try_write_network ( MessageType & message,
Sender::timeout_t const & timeout )
inline private

Definition at line 136 of file NetworkSenderModel.hxx.

137{
138 std::lock_guard<std::mutex> lk(m_send_mutex);
139 get_sender(timeout);
140 if (m_network_sender_ptr == nullptr) {
141 TLOG_DEBUG(5) << ConnectionInstanceNotFound(ERS_HERE, this->id().uid);
142 return false;
143 }
144
145 auto serialized = dunedaq::serialization::serialize(message);
146 // TLOG() << "Serialized message for network sending: " << serialized.size() << ", topic=" <<
147 // m_topic <<
148 // ", this=" << (void*)this;
149
150 auto res =
151 m_network_sender_ptr->send(serialized.data(), serialized.size(), extend_first_timeout(timeout), m_topic, true);
152 if (!res) {
153 TLOG() << "Timeout detected, removing sender to re-acquire connection";
154 NetworkManager::get().remove_sender(this->id());
155 m_network_sender_ptr = nullptr;
156 }
157 return res;
158}
void remove_sender(ConnectionId const &conn_id)
Sender::timeout_t extend_first_timeout(Sender::timeout_t timeout)
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112

◆ write_network() [1/2]

template<typename Datatype >
template<typename MessageType >
std::enable_if<!serialization::is_serializable< MessageType >::value, void >::type dunedaq::iomanager::NetworkSenderModel< Datatype >::write_network ( MessageType & ,
Sender::timeout_t const &  )
private

◆ write_network() [2/2]

template<typename Datatype >
template<typename MessageType >
std::enable_if< dunedaq::serialization::is_serializable< MessageType >::value, void >::type dunedaq::iomanager::NetworkSenderModel< Datatype >::write_network ( MessageType & message,
Sender::timeout_t const & timeout )
inline private

Definition at line 101 of file NetworkSenderModel.hxx.

102{
103 std::lock_guard<std::mutex> lk(m_send_mutex);
104 get_sender(timeout);
105 if (m_network_sender_ptr == nullptr) {
106 throw TimeoutExpired(
107 ERS_HERE, this->id().uid, "send", timeout.count(), ConnectionInstanceNotFound(ERS_HERE, this->id().uid));
108 }
109
110 auto serialized = dunedaq::serialization::serialize(message);
111 // TLOG() << "Serialized message for network sending: " << serialized.size() << ", topic=" <<
112 // m_topic << ", this="
113 // << (void*)this;
114
115 try {
116 m_network_sender_ptr->send(serialized.data(), serialized.size(), extend_first_timeout(timeout), m_topic);
117 } catch (ipm::SendTimeoutExpired const& ex) {
118 TLOG() << "Timeout detected, removing sender to re-acquire connection";
119 NetworkManager::get().remove_sender(this->id());
120 m_network_sender_ptr = nullptr;
121 throw;
122 }
123}

◆ write_network_with_topic() [1/2]

template<typename Datatype >
template<typename MessageType >
std::enable_if<!serialization::is_serializable< MessageType >::value, void >::type dunedaq::iomanager::NetworkSenderModel< Datatype >::write_network_with_topic ( MessageType & ,
Sender::timeout_t const & ,
std::string  )
private

◆ write_network_with_topic() [2/2]

template<typename Datatype >
template<typename MessageType >
std::enable_if< dunedaq::serialization::is_serializable< MessageType >::value, void >::type dunedaq::iomanager::NetworkSenderModel< Datatype >::write_network_with_topic ( MessageType & message,
Sender::timeout_t const & timeout,
std::string topic )
inline private

Definition at line 172 of file NetworkSenderModel.hxx.

175{
176 std::lock_guard<std::mutex> lk(m_send_mutex);
177 get_sender(timeout);
178 if (m_network_sender_ptr == nullptr) {
179 throw TimeoutExpired(
180 ERS_HERE, this->id().uid, "send", timeout.count(), ConnectionInstanceNotFound(ERS_HERE, this->id().uid));
181 }
182
183 auto serialized = dunedaq::serialization::serialize(message);
184 // TLOG() << "Serialized message for network sending: " << serialized.size() << ", topic=" <<
185 // m_topic << ", this="
186 // << (void*)this;
187
188 try {
189 m_network_sender_ptr->send(serialized.data(), serialized.size(), timeout, topic);
190 } catch (TimeoutExpired const& ex) {
191 m_network_sender_ptr = nullptr;
192 throw;
193 }
194}

Member Data Documentation

◆ m_first

template<typename Datatype >
std::atomic<bool> dunedaq::iomanager::NetworkSenderModel< Datatype >::m_first { true }
private

Definition at line 78 of file NetworkSenderModel.hpp.

78{ true };

◆ m_network_sender_ptr

template<typename Datatype >
std::shared_ptr<ipm::Sender> dunedaq::iomanager::NetworkSenderModel< Datatype >::m_network_sender_ptr
private

Definition at line 75 of file NetworkSenderModel.hpp.

◆ m_send_mutex

template<typename Datatype >
std::mutex dunedaq::iomanager::NetworkSenderModel< Datatype >::m_send_mutex
private

Definition at line 76 of file NetworkSenderModel.hpp.

◆ m_topic

template<typename Datatype >
std::string dunedaq::iomanager::NetworkSenderModel< Datatype >::m_topic { "" }
private

Definition at line 77 of file NetworkSenderModel.hpp.

77{ "" };

The documentation for this class was generated from the following files: