DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
NetworkReceiverModel.hxx
Go to the documentation of this file.
4
5#include "ipm/Subscriber.hpp"
6#include "logging/Logging.hpp"
9
10#include <atomic>
11#include <memory>
12#include <optional>
13#include <string>
14#include <thread>
15#include <typeinfo>
16#include <utility>
17
18namespace dunedaq::iomanager {
19
20template<typename Datatype>
22 : ReceiverConcept<Datatype>(conn_id)
23{
24 TLOG() << "NetworkReceiverModel created with DT! ID: " << conn_id.uid << " Addr: " << static_cast<void*>(this);
25 try {
26 get_receiver(std::chrono::milliseconds(1000));
27 } catch (ConnectionNotFound const& ex) {
28 TLOG() << "Initial connection attempt failed: " << ex;
29 }
30}
31
32template<typename Datatype>
34 : ReceiverConcept<Datatype>(other.m_conn.uid)
35 , m_with_callback(other.m_with_callback.load())
36 , m_callback(std::move(other.m_callback))
37 , m_event_loop_runner(std::move(other.m_event_loop_runner))
38 , m_network_receiver_ptr(std::move(other.m_network_receiver_ptr))
39{
40}
41
42template<typename Datatype>
43inline Datatype
45{
46 try {
47 return read_network<Datatype>(timeout);
48 } catch (ipm::ReceiveTimeoutExpired& ex) {
49 throw TimeoutExpired(ERS_HERE, this->id().uid, "receive", timeout.count(), ex);
50 }
51}
52
53template<typename Datatype>
54inline void
56{
57 std::lock_guard<std::mutex> lk(m_callback_mutex);
58 m_with_callback = false;
59 if (m_event_loop_runner != nullptr && m_event_loop_runner->joinable()) {
60 m_event_loop_runner->join();
61 m_event_loop_runner.reset(nullptr);
62 } else if (m_event_loop_runner != nullptr) {
63 TLOG() << "Event loop can't be closed!";
64 }
65 // remove function.
66}
67
68template<typename Datatype>
69inline void
71{
72 if (NetworkManager::get().is_pubsub_connection(this->m_conn)) {
73 std::dynamic_pointer_cast<ipm::Subscriber>(m_network_receiver_ptr)->subscribe(topic);
74 }
75}
76
77template<typename Datatype>
78inline void
80{
81 if (NetworkManager::get().is_pubsub_connection(this->m_conn)) {
82 std::dynamic_pointer_cast<ipm::Subscriber>(m_network_receiver_ptr)->unsubscribe(topic);
83 }
84}
85
86template<typename Datatype>
87inline void
89{
90 // get network resources
91 auto start = std::chrono::steady_clock::now();
92 while (m_network_receiver_ptr == nullptr &&
93 std::chrono::duration_cast<Receiver::timeout_t>(std::chrono::steady_clock::now() - start) < timeout) {
94
95 try {
96 m_network_receiver_ptr = NetworkManager::get().get_receiver(this->id());
97 } catch (ConnectionNotFound const& ex) {
98 m_network_receiver_ptr = nullptr;
99 std::this_thread::sleep_for(std::chrono::milliseconds(10));
100 }
101 }
102}
103
104template<typename Datatype>
105template<typename MessageType>
106inline typename std::enable_if<serialization::is_serializable<MessageType>::value, MessageType>::type
108{
109 std::lock_guard<std::mutex> lk(m_receive_mutex);
110 get_receiver(timeout);
111
112 if (m_network_receiver_ptr == nullptr) {
113 throw ConnectionInstanceNotFound(ERS_HERE, this->id().uid);
114 }
115
116 auto response = m_network_receiver_ptr->receive(timeout);
117 if (response.data.size() > 0) {
118 return serialization::deserialize<MessageType>(response.data);
119 }
120
121 throw TimeoutExpired(ERS_HERE, this->id().uid, "network receive", timeout.count());
122 return MessageType();
123}
124
125template<typename Datatype>
126template<typename MessageType>
127inline typename std::enable_if<!serialization::is_serializable<MessageType>::value, MessageType>::type
129{
130 throw NetworkMessageNotSerializable(ERS_HERE, typeid(MessageType).name()); // NOLINT(runtime/rtti)
131 return MessageType();
132}
133
134template<typename Datatype>
135template<typename MessageType>
136inline typename std::enable_if<serialization::is_serializable<MessageType>::value, std::optional<MessageType>>::type
138{
139 std::lock_guard<std::mutex> lk(m_receive_mutex);
140 get_receiver(timeout);
141 if (m_network_receiver_ptr == nullptr) {
142 TLOG_DEBUG(5) << ConnectionInstanceNotFound(ERS_HERE, this->id().uid);
143 return std::nullopt;
144 }
145
147 res = m_network_receiver_ptr->receive(timeout, ipm::Receiver::s_any_size, true);
148
149 if (res.data.size() > 0) {
150 return std::make_optional<MessageType>(serialization::deserialize<MessageType>(res.data));
151 }
152
153 return std::nullopt;
154}
155
156template<typename Datatype>
157template<typename MessageType>
158inline typename std::enable_if<!serialization::is_serializable<MessageType>::value, std::optional<MessageType>>::type
160{
161 ers::error(NetworkMessageNotSerializable(ERS_HERE, typeid(MessageType).name())); // NOLINT(runtime/rtti)
162 return std::nullopt;
163}
164
165template<typename Datatype>
166template<typename MessageType>
167inline typename std::enable_if<serialization::is_serializable<MessageType>::value, void>::type
168NetworkReceiverModel<Datatype>::add_callback_impl(std::function<void(MessageType&)> callback)
169{
170 remove_callback();
171 {
172 // This ensures that add_callback_impl and remove_callback are not processing concurrently
173 std::lock_guard<std::mutex> lk(m_callback_mutex);
174 }
175 TLOG() << "Registering callback.";
176 m_callback = callback;
177 m_with_callback = true;
178 // start event loop (thread that calls when receive happens). remove_callback() is called in the destructor, so this
179 // will never go out-of-scope while this is running
180 m_event_loop_runner = std::make_unique<std::thread>([&]() {
181 std::optional<Datatype> message;
182 while (m_with_callback.load() || message) {
183 try {
184 // 0 timeout when we are trying to stop
185 message = try_read_network<Datatype>(m_with_callback.load() ? std::chrono::milliseconds(20)
186 : std::chrono::milliseconds(0));
187 if (message) {
188 m_callback(*message);
189 }
190 } catch (const ers::Issue&) {
191 // Intentionally ignoring any ers::Issues that might have been raised
192 ;
193 }
194 }
195 });
196}
197
198template<typename Datatype>
199template<typename MessageType>
200inline typename std::enable_if<!serialization::is_serializable<MessageType>::value, void>::type
201NetworkReceiverModel<Datatype>::add_callback_impl(std::function<void(MessageType&)>)
202{
203 throw NetworkMessageNotSerializable(ERS_HERE, typeid(MessageType).name()); // NOLINT(runtime/rtti)
204}
205
206} // namespace dunedaq::iomanager
#define ERS_HERE
static NetworkManager & get()
std::shared_ptr< ipm::Receiver > get_receiver(ConnectionId const &conn_id)
void get_receiver(Receiver::timeout_t timeout)
void subscribe(std::string topic) override
std::enable_if< serialization::is_serializable< MessageType >::value, void >::type add_callback_impl(std::function< void(MessageType &)> callback)
std::enable_if< serialization::is_serializable< MessageType >::value, MessageType >::type read_network(Receiver::timeout_t const &timeout)
NetworkReceiverModel(ConnectionId const &conn_id)
void unsubscribe(std::string topic) override
std::enable_if< serialization::is_serializable< MessageType >::value, std::optional< MessageType > >::type try_read_network(Receiver::timeout_t const &timeout)
Datatype receive(Receiver::timeout_t timeout) override
std::chrono::milliseconds timeout_t
Definition Receiver.hpp:27
static constexpr message_size_t s_any_size
Definition Receiver.hpp:83
Base class for any user define issue.
Definition Issue.hpp:69
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
void error(const Issue &issue)
Definition ers.hpp:81