DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::iomanager::NetworkReceiverModel< Datatype > Class Template Reference

#include <NetworkReceiverModel.hpp>

Inheritance diagram for dunedaq::iomanager::NetworkReceiverModel< Datatype >:
[legend]
Collaboration diagram for dunedaq::iomanager::NetworkReceiverModel< Datatype >:
[legend]

Public Member Functions

 NetworkReceiverModel (ConnectionId const &conn_id)
 ~NetworkReceiverModel ()
 NetworkReceiverModel (NetworkReceiverModel &&other)
Datatype receive (Receiver::timeout_t timeout) override
std::optional< Datatype > try_receive (Receiver::timeout_t timeout) override
bool data_pending () override
void add_callback (std::function< void(Datatype &)> callback) override
void remove_callback () override
void subscribe (std::string topic) override
void unsubscribe (std::string topic) override
Public Member Functions inherited from dunedaq::iomanager::ReceiverConcept< Datatype >
 ReceiverConcept (ConnectionId const &conn_id)
Public Member Functions inherited from dunedaq::iomanager::Receiver
 Receiver (ConnectionId const &this_conn)
virtual ~Receiver ()=default
ConnectionId id () const
Public Member Functions inherited from dunedaq::utilities::NamedObject
 NamedObject (const std::string &name)
 NamedObject Constructor.
 NamedObject (NamedObject const &)=delete
 NamedObject is not copy-constructible.
 NamedObject (NamedObject &&)=default
 NamedObject is move-constructible.
NamedObject & operator= (NamedObject const &)=delete
 NamedObject is not copy-assignable.
NamedObject & operator= (NamedObject &&)=default
 NamedObject is move-assignable.
virtual ~NamedObject ()=default
 Default virtual destructor.
const std::string & get_name () const final
 Get the name of this NamedObject.
Public Member Functions inherited from dunedaq::utilities::Named
 Named ()=default
 Named Constructor.
 Named (Named const &)=delete
 Named is not copy-constructible.
 Named (Named &&)=default
 Named is move-constructible.
Named & operator= (Named const &)=delete
 Named is not copy-assignable.
Named & operator= (Named &&)=default
 Named is move-assignable.
virtual ~Named ()=default
 Default virtual destructor.

Private Member Functions

void get_receiver (Receiver::timeout_t timeout)
template<typename MessageType>
std::enable_if< serialization::is_serializable< MessageType >::value, MessageType >::type read_network (Receiver::timeout_t const &timeout)
template<typename MessageType>
std::enable_if<!serialization::is_serializable< MessageType >::value, MessageType >::type read_network (Receiver::timeout_t const &)
template<typename MessageType>
std::enable_if< serialization::is_serializable< MessageType >::value, std::optional< MessageType > >::type try_read_network (Receiver::timeout_t const &timeout)
template<typename MessageType>
std::enable_if<!serialization::is_serializable< MessageType >::value, std::optional< MessageType > >::type try_read_network (Receiver::timeout_t const &)
template<typename MessageType>
std::enable_if< serialization::is_serializable< MessageType >::value, void >::type add_callback_impl (std::function< void(MessageType &)> callback)
template<typename MessageType>
std::enable_if<!serialization::is_serializable< MessageType >::value, void >::type add_callback_impl (std::function< void(MessageType &)>)

Private Attributes

std::function< void(Datatype &)> m_callback
std::unique_ptr< std::jthread > m_event_loop_runner
std::shared_ptr< ipm::Receiver > m_network_receiver_ptr { nullptr }
std::mutex m_callback_mutex
std::mutex m_receive_mutex

Additional Inherited Members

Public Types inherited from dunedaq::iomanager::Receiver
using timeout_t = std::chrono::milliseconds
Static Public Attributes inherited from dunedaq::iomanager::Receiver
static constexpr timeout_t s_block = timeout_t::max()
static constexpr timeout_t s_no_block = timeout_t::zero()
Protected Attributes inherited from dunedaq::iomanager::Receiver
ConnectionId m_conn

Detailed Description

template<typename Datatype>
class dunedaq::iomanager::NetworkReceiverModel< Datatype >

Definition at line 23 of file NetworkReceiverModel.hpp.

Constructor & Destructor Documentation

◆ NetworkReceiverModel() [1/2]

template<typename Datatype>
dunedaq::iomanager::NetworkReceiverModel< Datatype >::NetworkReceiverModel ( ConnectionId const & conn_id)
inline explicit

Definition at line 21 of file NetworkReceiverModel.hxx.

23{
24 TLOG() << "NetworkReceiverModel created with DT! ID: " << conn_id.uid << " Addr: " << static_cast<void*>(this);
25 try {
27 } catch (ConnectionNotFound const& ex) {
28 TLOG() << "Initial connection attempt failed: " << ex;
29 }
30}
void get_receiver(Receiver::timeout_t timeout)
ReceiverConcept(ConnectionId const &conn_id)
Definition Receiver.hpp:50
#define TLOG(...)
Definition macro.hpp:22

◆ ~NetworkReceiverModel()

template<typename Datatype>
dunedaq::iomanager::NetworkReceiverModel< Datatype >::~NetworkReceiverModel ( )
inline

Definition at line 27 of file NetworkReceiverModel.hpp.

◆ NetworkReceiverModel() [2/2]

template<typename Datatype>
dunedaq::iomanager::NetworkReceiverModel< Datatype >::NetworkReceiverModel ( NetworkReceiverModel< Datatype > && other)
inline

Member Function Documentation

◆ add_callback()

template<typename Datatype>
void dunedaq::iomanager::NetworkReceiverModel< Datatype >::add_callback ( std::function< void(Datatype &)> callback)
inline override virtual

Implements dunedaq::iomanager::ReceiverConcept< Datatype >.

Definition at line 40 of file NetworkReceiverModel.hpp.

std::enable_if< serialization::is_serializable< MessageType >::value, void >::type add_callback_impl(std::function< void(MessageType &)> callback)

◆ add_callback_impl() [1/2]

template<typename Datatype>
template<typename MessageType>
std::enable_if< serialization::is_serializable< MessageType >::value, void >::type dunedaq::iomanager::NetworkReceiverModel< Datatype >::add_callback_impl ( std::function< void(MessageType &)> callback)
inline private

Definition at line 167 of file NetworkReceiverModel.hxx.

168{
170 {
171 // This ensures that add_callback_impl and remove_callback are not processing concurrently
173 }
174 TLOG() << "Registering callback.";
176 // start event loop (thread that calls when receive happens). remove_callback() is called in the destructor, so this
177 // will never go out-of-scope while this is running
180 while (!token.stop_requested() || message) {
181 try {
182 // 0 timeout when we are trying to stop
185 if (message) {
187 }
188 } catch (const ers::Issue&) {
189 // Intentionally ignoring any ers::Issues that might have been raised
190 ;
191 }
192 }
193 });
194 auto handle = m_event_loop_runner->native_handle();
195 std::string name = "N_" + this->id().uid;
196 name.resize(15);
197 auto rc = pthread_setname_np(handle, name.c_str());
198 if (rc != 0) {
200 s << "The name " << name << " provided for the thread is too long.";
202 }
203}
std::enable_if< serialization::is_serializable< MessageType >::value, std::optional< MessageType > >::type try_read_network(Receiver::timeout_t const &timeout)
ConnectionId id() const
Definition Receiver.hpp:39
void warning(const Issue &issue)
Definition ers.hpp:115

◆ add_callback_impl() [2/2]

template<typename Datatype>
template<typename MessageType>
std::enable_if<!serialization::is_serializable< MessageType >::value, void >::type dunedaq::iomanager::NetworkReceiverModel< Datatype >::add_callback_impl ( std::function< void(MessageType &)> )
private

◆ data_pending()

template<typename Datatype>
bool dunedaq::iomanager::NetworkReceiverModel< Datatype >::data_pending ( )
inline override virtual

Implements dunedaq::iomanager::ReceiverConcept< Datatype >.

Definition at line 38 of file NetworkReceiverModel.hpp.

38{ return m_network_receiver_ptr && m_network_receiver_ptr->data_pending(); }

◆ get_receiver()

template<typename Datatype>
void dunedaq::iomanager::NetworkReceiverModel< Datatype >::get_receiver ( Receiver::timeout_t timeout)
inline private

Definition at line 87 of file NetworkReceiverModel.hxx.

88{
89 // get network resources
91 while (m_network_receiver_ptr == nullptr &&
93
94 try {
96 } catch (ConnectionNotFound const& ex) {
97 m_network_receiver_ptr = nullptr;
99 }
100 }
101}
static NetworkManager & get()
std::shared_ptr< ipm::Receiver > get_receiver(ConnectionId const &conn_id)

◆ read_network() [1/2]

template<typename Datatype>
template<typename MessageType>
std::enable_if<!serialization::is_serializable< MessageType >::value, MessageType >::type dunedaq::iomanager::NetworkReceiverModel< Datatype >::read_network ( Receiver::timeout_t const & )
private

◆ read_network() [2/2]

template<typename Datatype>
template<typename MessageType>
std::enable_if< serialization::is_serializable< MessageType >::value, MessageType >::type dunedaq::iomanager::NetworkReceiverModel< Datatype >::read_network ( Receiver::timeout_t const & timeout)
inline private

Definition at line 106 of file NetworkReceiverModel.hxx.

107{
110
111 if (m_network_receiver_ptr == nullptr) {
112 throw ConnectionInstanceNotFound(ERS_HERE, this->id().uid);
113 }
114
115 auto response = m_network_receiver_ptr->receive(timeout);
116 if (response.data.size() > 0) {
118 }
119
120 throw TimeoutExpired(ERS_HERE, this->id().uid, "network receive", timeout.count());
121 return MessageType();
122}

◆ receive()

template<typename Datatype>
Datatype dunedaq::iomanager::NetworkReceiverModel< Datatype >::receive ( Receiver::timeout_t timeout)
inline override virtual

Implements dunedaq::iomanager::ReceiverConcept< Datatype >.

Definition at line 43 of file NetworkReceiverModel.hxx.

44{
45 try {
48 throw TimeoutExpired(ERS_HERE, this->id().uid, "receive", timeout.count(), ex);
49 }
50}
std::enable_if< serialization::is_serializable< MessageType >::value, MessageType >::type read_network(Receiver::timeout_t const &timeout)

◆ remove_callback()

template<typename Datatype>
void dunedaq::iomanager::NetworkReceiverModel< Datatype >::remove_callback ( )
inline override virtual

Implements dunedaq::iomanager::ReceiverConcept< Datatype >.

Definition at line 54 of file NetworkReceiverModel.hxx.

55{
57 if (m_event_loop_runner != nullptr && m_event_loop_runner->joinable()) {
58 m_event_loop_runner->request_stop();
59 m_event_loop_runner->join();
60 } else if (m_event_loop_runner != nullptr) {
61 TLOG() << "Event loop can't be closed!";
62 }
63 m_event_loop_runner.reset(nullptr);
64 // remove function.
65}

◆ subscribe()

template<typename Datatype>
void dunedaq::iomanager::NetworkReceiverModel< Datatype >::subscribe ( std::string topic)
inline override virtual

◆ try_read_network() [1/2]

template<typename Datatype>
template<typename MessageType>
std::enable_if<!serialization::is_serializable< MessageType >::value, std::optional< MessageType > >::type dunedaq::iomanager::NetworkReceiverModel< Datatype >::try_read_network ( Receiver::timeout_t const & )
private

◆ try_read_network() [2/2]

template<typename Datatype>
template<typename MessageType>
std::enable_if< serialization::is_serializable< MessageType >::value, std::optional< MessageType > >::type dunedaq::iomanager::NetworkReceiverModel< Datatype >::try_read_network ( Receiver::timeout_t const & timeout)
inline private

Definition at line 136 of file NetworkReceiverModel.hxx.

137{
140 if (m_network_receiver_ptr == nullptr) {
142 return std::nullopt;
143 }
144
147
148 if (res.data.size() > 0) {
150 }
151
152 return std::nullopt;
153}
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112

◆ try_receive()

template<typename Datatype>
std::optional< Datatype > dunedaq::iomanager::NetworkReceiverModel< Datatype >::try_receive ( Receiver::timeout_t timeout)
inline override virtual

◆ unsubscribe()

template<typename Datatype>
void dunedaq::iomanager::NetworkReceiverModel< Datatype >::unsubscribe ( std::string topic)
inline override virtual

Member Data Documentation

◆ m_callback

template<typename Datatype>
std::function<void(Datatype&)> dunedaq::iomanager::NetworkReceiverModel< Datatype >::m_callback
private

Definition at line 74 of file NetworkReceiverModel.hpp.

◆ m_callback_mutex

template<typename Datatype>
std::mutex dunedaq::iomanager::NetworkReceiverModel< Datatype >::m_callback_mutex
private

Definition at line 77 of file NetworkReceiverModel.hpp.

◆ m_event_loop_runner

template<typename Datatype>
std::unique_ptr<std::jthread> dunedaq::iomanager::NetworkReceiverModel< Datatype >::m_event_loop_runner
private

Definition at line 75 of file NetworkReceiverModel.hpp.

◆ m_network_receiver_ptr

template<typename Datatype>
std::shared_ptr<ipm::Receiver> dunedaq::iomanager::NetworkReceiverModel< Datatype >::m_network_receiver_ptr { nullptr }
private

Definition at line 76 of file NetworkReceiverModel.hpp.

76{ nullptr };

◆ m_receive_mutex

template<typename Datatype>
std::mutex dunedaq::iomanager::NetworkReceiverModel< Datatype >::m_receive_mutex
private

Definition at line 78 of file NetworkReceiverModel.hpp.


The documentation for this class was generated from the following files: