DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
NetworkReceiverModel.hxx
Go to the documentation of this file.
4
5#include "ipm/Subscriber.hpp"
6#include "logging/Logging.hpp"
9
10#include <atomic>
11#include <memory>
12#include <optional>
13#include <string>
14#include <thread>
15#include <typeinfo>
16#include <utility>
17
18namespace dunedaq {
19
20namespace iomanager {
21
22template<typename Datatype>
24 : ReceiverConcept<Datatype>(conn_id)
25{
26 TLOG() << "NetworkReceiverModel created with DT! ID: " << conn_id.uid << " Addr: " << static_cast<void*>(this);
27 try {
28 get_receiver(std::chrono::milliseconds(1000));
29 } catch (ConnectionNotFound const& ex) {
30 TLOG() << "Initial connection attempt failed: " << ex;
31 }
32}
33
34template<typename Datatype>
36 : ReceiverConcept<Datatype>(other.m_conn.uid)
37 , m_with_callback(other.m_with_callback.load())
38 , m_callback(std::move(other.m_callback))
39 , m_event_loop_runner(std::move(other.m_event_loop_runner))
40 , m_network_receiver_ptr(std::move(other.m_network_receiver_ptr))
41{
42}
43
44template<typename Datatype>
45inline Datatype
47{
48 try {
49 return read_network<Datatype>(timeout);
50 } catch (ipm::ReceiveTimeoutExpired& ex) {
51 throw TimeoutExpired(ERS_HERE, this->id().uid, "receive", timeout.count(), ex);
52 }
53}
54
55template<typename Datatype>
56inline void
58{
59 std::lock_guard<std::mutex> lk(m_callback_mutex);
60 m_with_callback = false;
61 if (m_event_loop_runner != nullptr && m_event_loop_runner->joinable()) {
62 m_event_loop_runner->join();
63 m_event_loop_runner.reset(nullptr);
64 } else if (m_event_loop_runner != nullptr) {
65 TLOG() << "Event loop can't be closed!";
66 }
67 // remove function.
68}
69
70template<typename Datatype>
71inline void
73{
74 if (NetworkManager::get().is_pubsub_connection(this->m_conn)) {
75 std::dynamic_pointer_cast<ipm::Subscriber>(m_network_receiver_ptr)->subscribe(topic);
76 }
77}
78
79template<typename Datatype>
80inline void
82{
83 if (NetworkManager::get().is_pubsub_connection(this->m_conn)) {
84 std::dynamic_pointer_cast<ipm::Subscriber>(m_network_receiver_ptr)->unsubscribe(topic);
85 }
86}
87
88template<typename Datatype>
89inline void
91{
92 // get network resources
93 auto start = std::chrono::steady_clock::now();
94 while (m_network_receiver_ptr == nullptr &&
95 std::chrono::duration_cast<Receiver::timeout_t>(std::chrono::steady_clock::now() - start) < timeout) {
96
97 try {
98 m_network_receiver_ptr = NetworkManager::get().get_receiver(this->id());
99 } catch (ConnectionNotFound const& ex) {
100 m_network_receiver_ptr = nullptr;
101 std::this_thread::sleep_for(std::chrono::milliseconds(10));
102 }
103 }
104}
105
106template<typename Datatype>
107template<typename MessageType>
108inline typename std::enable_if<dunedaq::serialization::is_serializable<MessageType>::value, MessageType>::type
110{
111 std::lock_guard<std::mutex> lk(m_receive_mutex);
112 get_receiver(timeout);
113
114 if (m_network_receiver_ptr == nullptr) {
115 throw ConnectionInstanceNotFound(ERS_HERE, this->id().uid);
116 }
117
118 auto response = m_network_receiver_ptr->receive(timeout);
119 if (response.data.size() > 0) {
121 }
122
123 throw TimeoutExpired(ERS_HERE, this->id().uid, "network receive", timeout.count());
124 return MessageType();
125}
126
127template<typename Datatype>
128template<typename MessageType>
129inline typename std::enable_if<!dunedaq::serialization::is_serializable<MessageType>::value, MessageType>::type
131{
132 throw NetworkMessageNotSerializable(ERS_HERE, typeid(MessageType).name()); // NOLINT(runtime/rtti)
133 return MessageType();
134}
135
136template<typename Datatype>
137template<typename MessageType>
138inline
139 typename std::enable_if<dunedaq::serialization::is_serializable<MessageType>::value, std::optional<MessageType>>::type
141{
142 std::lock_guard<std::mutex> lk(m_receive_mutex);
143 get_receiver(timeout);
144 if (m_network_receiver_ptr == nullptr) {
145 TLOG() << ConnectionInstanceNotFound(ERS_HERE, this->id().uid);
146 return std::nullopt;
147 }
148
150 res = m_network_receiver_ptr->receive(timeout, ipm::Receiver::s_any_size, true);
151
152 if (res.data.size() > 0) {
153 return std::make_optional<MessageType>(dunedaq::serialization::deserialize<MessageType>(res.data));
154 }
155
156 return std::nullopt;
157}
158
159template<typename Datatype>
160template<typename MessageType>
161inline typename std::enable_if<!dunedaq::serialization::is_serializable<MessageType>::value,
162 std::optional<MessageType>>::type
164{
165 ers::error(NetworkMessageNotSerializable(ERS_HERE, typeid(MessageType).name())); // NOLINT(runtime/rtti)
166 return std::nullopt;
167}
168
169template<typename Datatype>
170template<typename MessageType>
171inline typename std::enable_if<dunedaq::serialization::is_serializable<MessageType>::value, void>::type
172NetworkReceiverModel<Datatype>::add_callback_impl(std::function<void(MessageType&)> callback)
173{
174 remove_callback();
175 {
176 std::lock_guard<std::mutex> lk(m_callback_mutex);
177 }
178 TLOG() << "Registering callback.";
179 m_callback = callback;
180 m_with_callback = true;
181 // start event loop (thread that calls when receive happens)
182 m_event_loop_runner = std::make_unique<std::thread>([&]() {
183 std::optional<Datatype> message;
184 while(m_with_callback.load() || message) {
185 try {
186 // 0 timeout when we are trying to stop
187 message = try_read_network<Datatype>(m_with_callback.load() ? std::chrono::milliseconds(20) : std::chrono::milliseconds(0));
188 if (message) {
189 m_callback(*message);
190 }
191 } catch (const ers::Issue&) {
192 ;
193 }
194 }
195 });
196}
197
198template<typename Datatype>
199template<typename MessageType>
200inline typename std::enable_if<!dunedaq::serialization::is_serializable<MessageType>::value, void>::type
201NetworkReceiverModel<Datatype>::add_callback_impl(std::function<void(MessageType&)>)
202{
203 throw NetworkMessageNotSerializable(ERS_HERE, typeid(MessageType).name()); // NOLINT(runtime/rtti)
204}
205
206} // namespace iomanager
207} // namespace dunedaq
#define ERS_HERE
static NetworkManager & get()
std::shared_ptr< ipm::Receiver > get_receiver(ConnectionId const &conn_id)
std::enable_if< dunedaq::serialization::is_serializable< MessageType >::value, std::optional< MessageType > >::type try_read_network(Receiver::timeout_t const &timeout)
void get_receiver(Receiver::timeout_t timeout)
void subscribe(std::string topic) override
std::enable_if< dunedaq::serialization::is_serializable< MessageType >::value, MessageType >::type read_network(Receiver::timeout_t const &timeout)
NetworkReceiverModel(ConnectionId const &conn_id)
void unsubscribe(std::string topic) override
std::enable_if< dunedaq::serialization::is_serializable< MessageType >::value, void >::type add_callback_impl(std::function< void(MessageType &)> callback)
Datatype receive(Receiver::timeout_t timeout) override
std::chrono::milliseconds timeout_t
Definition Receiver.hpp:28
static constexpr message_size_t s_any_size
Definition Receiver.hpp:80
Base class for any user define issue.
Definition Issue.hpp:69
#define TLOG(...)
Definition macro.hpp:22
T deserialize(const std::vector< CharType > &v)
Deserialize vector of bytes v into an instance of class T.
Including Qt Headers.
Unknown serialization Cannot deserialize message
void error(const Issue &issue)
Definition ers.hpp:81