DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::iomanager::NetworkReceiverModel< Datatype > Class Template Reference

#include <NetworkReceiverModel.hpp>

Inheritance diagram for dunedaq::iomanager::NetworkReceiverModel< Datatype >:
[legend]
Collaboration diagram for dunedaq::iomanager::NetworkReceiverModel< Datatype >:
[legend]

Public Member Functions

 NetworkReceiverModel (ConnectionId const &conn_id)
 
 ~NetworkReceiverModel ()
 
 NetworkReceiverModel (NetworkReceiverModel &&other)
 
Datatype receive (Receiver::timeout_t timeout) override
 
std::optional< Datatype > try_receive (Receiver::timeout_t timeout) override
 
void add_callback (std::function< void(Datatype &)> callback) override
 
void remove_callback () override
 
void subscribe (std::string topic) override
 
void unsubscribe (std::string topic) override
 
- Public Member Functions inherited from dunedaq::iomanager::ReceiverConcept< Datatype >
 ReceiverConcept (ConnectionId const &conn_id)
 
- Public Member Functions inherited from dunedaq::iomanager::Receiver
 Receiver (ConnectionId const &this_conn)
 
virtual ~Receiver ()=default
 
ConnectionId id () const
 
- Public Member Functions inherited from dunedaq::utilities::NamedObject
 NamedObject (const std::string &name)
 NamedObject Constructor.
 
 NamedObject (NamedObject const &)=delete
 NamedObject is not copy-constructible.
 
 NamedObject (NamedObject &&)=default
 NamedObject is move-constructible.
 
NamedObject & operator= (NamedObject const &)=delete
 NamedObject is not copy-assignable.
 
NamedObject & operator= (NamedObject &&)=default
 NamedObject is move-assignable.
 
virtual ~NamedObject ()=default
 Default virtual destructor.
 
const std::string & get_name () const final
 Get the name of this NamedObject.
 
- Public Member Functions inherited from dunedaq::utilities::Named
 Named ()=default
 Named Constructor.
 
 Named (Named const &)=delete
 Named is not copy-constructible.
 
 Named (Named &&)=default
 Named is move-constructible.
 
Named & operator= (Named const &)=delete
 Named is not copy-assignable.
 
Named & operator= (Named &&)=default
 Named is move-assignable.
 
virtual ~Named ()=default
 Default virtual destructor.
 

Private Member Functions

void get_receiver (Receiver::timeout_t timeout)
 
template<typename MessageType >
std::enable_if< serialization::is_serializable< MessageType >::value, MessageType >::type read_network (Receiver::timeout_t const &timeout)
 
template<typename MessageType >
std::enable_if<!serialization::is_serializable< MessageType >::value, MessageType >::type read_network (Receiver::timeout_t const &)
 
template<typename MessageType >
std::enable_if< serialization::is_serializable< MessageType >::value, std::optional< MessageType > >::type try_read_network (Receiver::timeout_t const &timeout)
 
template<typename MessageType >
std::enable_if<!serialization::is_serializable< MessageType >::value, std::optional< MessageType > >::type try_read_network (Receiver::timeout_t const &)
 
template<typename MessageType >
std::enable_if< serialization::is_serializable< MessageType >::value, void >::type add_callback_impl (std::function< void(MessageType &)> callback)
 
template<typename MessageType >
std::enable_if<!serialization::is_serializable< MessageType >::value, void >::type add_callback_impl (std::function< void(MessageType &)>)
 

Private Attributes

std::atomic< bool > m_with_callback { false }
 
std::function< void(Datatype &)> m_callback
 
std::unique_ptr< std::thread > m_event_loop_runner
 
std::shared_ptr< ipm::Receiver > m_network_receiver_ptr { nullptr }
 
std::mutex m_callback_mutex
 
std::mutex m_receive_mutex
 

Additional Inherited Members

- Public Types inherited from dunedaq::iomanager::Receiver
using timeout_t = std::chrono::milliseconds
 
- Static Public Attributes inherited from dunedaq::iomanager::Receiver
static constexpr timeout_t s_block = timeout_t::max()
 
static constexpr timeout_t s_no_block = timeout_t::zero()
 
- Protected Attributes inherited from dunedaq::iomanager::Receiver
ConnectionId m_conn
 

Detailed Description

template<typename Datatype>
class dunedaq::iomanager::NetworkReceiverModel< Datatype >

Definition at line 23 of file NetworkReceiverModel.hpp.

Constructor & Destructor Documentation

◆ NetworkReceiverModel() [1/2]

template<typename Datatype >
dunedaq::iomanager::NetworkReceiverModel< Datatype >::NetworkReceiverModel ( ConnectionId const & conn_id)
inline explicit

Definition at line 21 of file NetworkReceiverModel.hxx.

23{
24 TLOG() << "NetworkReceiverModel created with DT! ID: " << conn_id.uid << " Addr: " << static_cast<void*>(this);
25 try {
26 get_receiver(std::chrono::milliseconds(1000));
27 } catch (ConnectionNotFound const& ex) {
28 TLOG() << "Initial connection attempt failed: " << ex;
29 }
30}
void get_receiver(Receiver::timeout_t timeout)
ReceiverConcept(ConnectionId const &conn_id)
Definition Receiver.hpp:50
#define TLOG(...)
Definition macro.hpp:22

◆ ~NetworkReceiverModel()

template<typename Datatype >
dunedaq::iomanager::NetworkReceiverModel< Datatype >::~NetworkReceiverModel ( )
inline

Definition at line 27 of file NetworkReceiverModel.hpp.

◆ NetworkReceiverModel() [2/2]

template<typename Datatype >
dunedaq::iomanager::NetworkReceiverModel< Datatype >::NetworkReceiverModel ( NetworkReceiverModel< Datatype > && other)
inline

Definition at line 33 of file NetworkReceiverModel.hxx.

34 : ReceiverConcept<Datatype>(other.m_conn.uid)
35 , m_with_callback(other.m_with_callback.load())
36 , m_callback(std::move(other.m_callback))
37 , m_event_loop_runner(std::move(other.m_event_loop_runner))
38 , m_network_receiver_ptr(std::move(other.m_network_receiver_ptr))
39{
40}
std::function< void(Datatype &)> m_callback
std::shared_ptr< ipm::Receiver > m_network_receiver_ptr
std::unique_ptr< std::thread > m_event_loop_runner

Member Function Documentation

◆ add_callback()

template<typename Datatype >
void dunedaq::iomanager::NetworkReceiverModel< Datatype >::add_callback ( std::function< void(Datatype &)> callback)
inline override virtual

Implements dunedaq::iomanager::ReceiverConcept< Datatype >.

Definition at line 37 of file NetworkReceiverModel.hpp.

37{ add_callback_impl<Datatype>(callback); }
std::enable_if< serialization::is_serializable< MessageType >::value, void >::type add_callback_impl(std::function< void(MessageType &)> callback)

◆ add_callback_impl() [1/2]

template<typename Datatype >
template<typename MessageType >
std::enable_if< serialization::is_serializable< MessageType >::value, void >::type dunedaq::iomanager::NetworkReceiverModel< Datatype >::add_callback_impl ( std::function< void(MessageType &)> callback)
inline private

Definition at line 168 of file NetworkReceiverModel.hxx.

169{
171 {
172 // This ensures that add_callback_impl and remove_callback are not processing concurrently
173 std::lock_guard<std::mutex> lk(m_callback_mutex);
174 }
175 TLOG() << "Registering callback.";
176 m_callback = callback;
177 m_with_callback = true;
178 // start event loop (thread that calls when receive happens). remove_callback() is called in the destructor, so this
179 // will never go out-of-scope while this is running
180 m_event_loop_runner = std::make_unique<std::thread>([&]() {
181 std::optional<Datatype> message;
182 while (m_with_callback.load() || message) {
183 try {
184 // 0 timeout when we are trying to stop
185 message = try_read_network<Datatype>(m_with_callback.load() ? std::chrono::milliseconds(20)
186 : std::chrono::milliseconds(0));
187 if (message) {
188 m_callback(*message);
189 }
190 } catch (const ers::Issue&) {
191 // Intentionally ignoring any ers::Issues that might have been raised
192 ;
193 }
194 }
195 });
196}
std::enable_if< serialization::is_serializable< MessageType >::value, std::optional< MessageType > >::type try_read_network(Receiver::timeout_t const &timeout)
Base class for any user define issue.
Definition Issue.hpp:69

◆ add_callback_impl() [2/2]

template<typename Datatype >
template<typename MessageType >
std::enable_if<!serialization::is_serializable< MessageType >::value, void >::type dunedaq::iomanager::NetworkReceiverModel< Datatype >::add_callback_impl ( std::function< void(MessageType &)> )
private

◆ get_receiver()

template<typename Datatype >
void dunedaq::iomanager::NetworkReceiverModel< Datatype >::get_receiver ( Receiver::timeout_t timeout)
inline private

Definition at line 88 of file NetworkReceiverModel.hxx.

89{
90 // get network resources
91 auto start = std::chrono::steady_clock::now();
92 while (m_network_receiver_ptr == nullptr &&
93 std::chrono::duration_cast<Receiver::timeout_t>(std::chrono::steady_clock::now() - start) < timeout) {
94
95 try {
97 } catch (ConnectionNotFound const& ex) {
98 m_network_receiver_ptr = nullptr;
99 std::this_thread::sleep_for(std::chrono::milliseconds(10));
100 }
101 }
102}
static NetworkManager & get()
std::shared_ptr< ipm::Receiver > get_receiver(ConnectionId const &conn_id)

◆ read_network() [1/2]

template<typename Datatype >
template<typename MessageType >
std::enable_if<!serialization::is_serializable< MessageType >::value, MessageType >::type dunedaq::iomanager::NetworkReceiverModel< Datatype >::read_network ( Receiver::timeout_t const & )
private

◆ read_network() [2/2]

template<typename Datatype >
template<typename MessageType >
std::enable_if< serialization::is_serializable< MessageType >::value, MessageType >::type dunedaq::iomanager::NetworkReceiverModel< Datatype >::read_network ( Receiver::timeout_t const & timeout)
inline private

Definition at line 107 of file NetworkReceiverModel.hxx.

108{
109 std::lock_guard<std::mutex> lk(m_receive_mutex);
110 get_receiver(timeout);
111
112 if (m_network_receiver_ptr == nullptr) {
113 throw ConnectionInstanceNotFound(ERS_HERE, this->id().uid);
114 }
115
116 auto response = m_network_receiver_ptr->receive(timeout);
117 if (response.data.size() > 0) {
118 return serialization::deserialize<MessageType>(response.data);
119 }
120
121 throw TimeoutExpired(ERS_HERE, this->id().uid, "network receive", timeout.count());
122 return MessageType();
123}
#define ERS_HERE

◆ receive()

template<typename Datatype >
Datatype dunedaq::iomanager::NetworkReceiverModel< Datatype >::receive ( Receiver::timeout_t timeout)
inline override virtual

Implements dunedaq::iomanager::ReceiverConcept< Datatype >.

Definition at line 44 of file NetworkReceiverModel.hxx.

45{
46 try {
47 return read_network<Datatype>(timeout);
48 } catch (ipm::ReceiveTimeoutExpired& ex) {
49 throw TimeoutExpired(ERS_HERE, this->id().uid, "receive", timeout.count(), ex);
50 }
51}
std::enable_if< serialization::is_serializable< MessageType >::value, MessageType >::type read_network(Receiver::timeout_t const &timeout)

◆ remove_callback()

template<typename Datatype >
void dunedaq::iomanager::NetworkReceiverModel< Datatype >::remove_callback ( )
inline override virtual

Implements dunedaq::iomanager::ReceiverConcept< Datatype >.

Definition at line 55 of file NetworkReceiverModel.hxx.

56{
57 std::lock_guard<std::mutex> lk(m_callback_mutex);
58 m_with_callback = false;
59 if (m_event_loop_runner != nullptr && m_event_loop_runner->joinable()) {
60 m_event_loop_runner->join();
61 m_event_loop_runner.reset(nullptr);
62 } else if (m_event_loop_runner != nullptr) {
63 TLOG() << "Event loop can't be closed!";
64 }
65 // remove function.
66}

◆ subscribe()

template<typename Datatype >
void dunedaq::iomanager::NetworkReceiverModel< Datatype >::subscribe ( std::string topic)
inline override virtual

Implements dunedaq::iomanager::ReceiverConcept< Datatype >.

Definition at line 70 of file NetworkReceiverModel.hxx.

71{
72 if (NetworkManager::get().is_pubsub_connection(this->m_conn)) {
73 std::dynamic_pointer_cast<ipm::Subscriber>(m_network_receiver_ptr)->subscribe(topic);
74 }
75}

◆ try_read_network() [1/2]

template<typename Datatype >
template<typename MessageType >
std::enable_if<!serialization::is_serializable< MessageType >::value, std::optional< MessageType > >::type dunedaq::iomanager::NetworkReceiverModel< Datatype >::try_read_network ( Receiver::timeout_t const & )
private

◆ try_read_network() [2/2]

template<typename Datatype >
template<typename MessageType >
std::enable_if< serialization::is_serializable< MessageType >::value, std::optional< MessageType > >::type dunedaq::iomanager::NetworkReceiverModel< Datatype >::try_read_network ( Receiver::timeout_t const & timeout)
inline private

Definition at line 137 of file NetworkReceiverModel.hxx.

138{
139 std::lock_guard<std::mutex> lk(m_receive_mutex);
140 get_receiver(timeout);
141 if (m_network_receiver_ptr == nullptr) {
142 TLOG_DEBUG(5) << ConnectionInstanceNotFound(ERS_HERE, this->id().uid);
143 return std::nullopt;
144 }
145
146 ipm::Receiver::Response res;
147 res = m_network_receiver_ptr->receive(timeout, ipm::Receiver::s_any_size, true);
148
149 if (res.data.size() > 0) {
150 return std::make_optional<MessageType>(serialization::deserialize<MessageType>(res.data));
151 }
152
153 return std::nullopt;
154}
static constexpr message_size_t s_any_size
Definition Receiver.hpp:83
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112

◆ try_receive()

template<typename Datatype >
std::optional< Datatype > dunedaq::iomanager::NetworkReceiverModel< Datatype >::try_receive ( Receiver::timeout_t timeout)
inline override virtual

Implements dunedaq::iomanager::ReceiverConcept< Datatype >.

Definition at line 33 of file NetworkReceiverModel.hpp.

34 {
35 return try_read_network<Datatype>(timeout);
36 }

◆ unsubscribe()

template<typename Datatype >
void dunedaq::iomanager::NetworkReceiverModel< Datatype >::unsubscribe ( std::string topic)
inline override virtual

Implements dunedaq::iomanager::ReceiverConcept< Datatype >.

Definition at line 79 of file NetworkReceiverModel.hxx.

80{
81 if (NetworkManager::get().is_pubsub_connection(this->m_conn)) {
82 std::dynamic_pointer_cast<ipm::Subscriber>(m_network_receiver_ptr)->unsubscribe(topic);
83 }
84}

Member Data Documentation

◆ m_callback

template<typename Datatype >
std::function<void(Datatype&)> dunedaq::iomanager::NetworkReceiverModel< Datatype >::m_callback
private

Definition at line 72 of file NetworkReceiverModel.hpp.

◆ m_callback_mutex

template<typename Datatype >
std::mutex dunedaq::iomanager::NetworkReceiverModel< Datatype >::m_callback_mutex
private

Definition at line 75 of file NetworkReceiverModel.hpp.

◆ m_event_loop_runner

template<typename Datatype >
std::unique_ptr<std::thread> dunedaq::iomanager::NetworkReceiverModel< Datatype >::m_event_loop_runner
private

Definition at line 73 of file NetworkReceiverModel.hpp.

◆ m_network_receiver_ptr

template<typename Datatype >
std::shared_ptr<ipm::Receiver> dunedaq::iomanager::NetworkReceiverModel< Datatype >::m_network_receiver_ptr { nullptr }
private

Definition at line 74 of file NetworkReceiverModel.hpp.

74{ nullptr };

◆ m_receive_mutex

template<typename Datatype >
std::mutex dunedaq::iomanager::NetworkReceiverModel< Datatype >::m_receive_mutex
private

Definition at line 76 of file NetworkReceiverModel.hpp.

◆ m_with_callback

template<typename Datatype >
std::atomic<bool> dunedaq::iomanager::NetworkReceiverModel< Datatype >::m_with_callback { false }
private

Definition at line 71 of file NetworkReceiverModel.hpp.

71{ false };

The documentation for this class was generated from the following files: