22template<
typename Datatype>
26 TLOG() <<
"NetworkReceiverModel created with DT! ID: " << conn_id.
uid <<
" Addr: " <<
static_cast<void*
>(
this);
30 TLOG() <<
"Initial connection attempt failed: " << ex;
34template<
typename Datatype>
37 , m_with_callback(other.m_with_callback.load())
38 , m_callback(
std::move(other.m_callback))
39 , m_event_loop_runner(
std::move(other.m_event_loop_runner))
40 , m_network_receiver_ptr(
std::move(other.m_network_receiver_ptr))
44template<
typename Datatype>
49 return read_network<Datatype>(timeout);
50 }
catch (ipm::ReceiveTimeoutExpired& ex) {
55template<
typename Datatype>
59 std::lock_guard<std::mutex> lk(m_callback_mutex);
60 m_with_callback =
false;
61 if (m_event_loop_runner !=
nullptr && m_event_loop_runner->joinable()) {
62 m_event_loop_runner->join();
63 m_event_loop_runner.reset(
nullptr);
64 }
else if (m_event_loop_runner !=
nullptr) {
65 TLOG() <<
"Event loop can't be closed!";
70template<
typename Datatype>
75 std::dynamic_pointer_cast<ipm::Subscriber>(m_network_receiver_ptr)->subscribe(topic);
79template<
typename Datatype>
84 std::dynamic_pointer_cast<ipm::Subscriber>(m_network_receiver_ptr)->unsubscribe(topic);
88template<
typename Datatype>
93 auto start = std::chrono::steady_clock::now();
94 while (m_network_receiver_ptr ==
nullptr &&
95 std::chrono::duration_cast<Receiver::timeout_t>(std::chrono::steady_clock::now() - start) < timeout) {
100 m_network_receiver_ptr =
nullptr;
101 std::this_thread::sleep_for(std::chrono::milliseconds(10));
106template<
typename Datatype>
107template<
typename MessageType>
108inline typename std::enable_if<dunedaq::serialization::is_serializable<MessageType>::value, MessageType>::type
111 std::lock_guard<std::mutex> lk(m_receive_mutex);
112 get_receiver(timeout);
114 if (m_network_receiver_ptr ==
nullptr) {
115 throw ConnectionInstanceNotFound(
ERS_HERE, this->
id().uid);
118 auto response = m_network_receiver_ptr->receive(timeout);
119 if (response.data.size() > 0) {
124 return MessageType();
127template<
typename Datatype>
128template<
typename MessageType>
129inline typename std::enable_if<!dunedaq::serialization::is_serializable<MessageType>::value, MessageType>::type
132 throw NetworkMessageNotSerializable(
ERS_HERE,
typeid(MessageType).name());
133 return MessageType();
136template<
typename Datatype>
137template<
typename MessageType>
139 typename std::enable_if<dunedaq::serialization::is_serializable<MessageType>::value, std::optional<MessageType>>::type
142 std::lock_guard<std::mutex> lk(m_receive_mutex);
143 get_receiver(timeout);
144 if (m_network_receiver_ptr ==
nullptr) {
145 TLOG() << ConnectionInstanceNotFound(
ERS_HERE, this->
id().uid);
152 if (res.
data.size() > 0) {
159template<
typename Datatype>
160template<
typename MessageType>
161inline typename std::enable_if<!dunedaq::serialization::is_serializable<MessageType>::value,
162 std::optional<MessageType>>::type
169template<
typename Datatype>
170template<
typename MessageType>
171inline typename std::enable_if<dunedaq::serialization::is_serializable<MessageType>::value,
void>::type
176 std::lock_guard<std::mutex> lk(m_callback_mutex);
178 TLOG() <<
"Registering callback.";
179 m_callback = callback;
180 m_with_callback =
true;
182 m_event_loop_runner = std::make_unique<std::thread>([&]() {
183 std::optional<Datatype>
message;
184 while(m_with_callback.load() || message) {
187 message = try_read_network<Datatype>(m_with_callback.load() ? std::chrono::milliseconds(20) : std::chrono::milliseconds(0));
189 m_callback(*message);
198template<
typename Datatype>
199template<
typename MessageType>
200inline typename std::enable_if<!dunedaq::serialization::is_serializable<MessageType>::value,
void>::type
203 throw NetworkMessageNotSerializable(
ERS_HERE,
typeid(MessageType).name());
static NetworkManager & get()
std::shared_ptr< ipm::Receiver > get_receiver(ConnectionId const &conn_id)
std::enable_if< dunedaq::serialization::is_serializable< MessageType >::value, std::optional< MessageType > >::type try_read_network(Receiver::timeout_t const &timeout)
void get_receiver(Receiver::timeout_t timeout)
void subscribe(std::string topic) override
std::enable_if< dunedaq::serialization::is_serializable< MessageType >::value, MessageType >::type read_network(Receiver::timeout_t const &timeout)
NetworkReceiverModel(ConnectionId const &conn_id)
void unsubscribe(std::string topic) override
std::enable_if< dunedaq::serialization::is_serializable< MessageType >::value, void >::type add_callback_impl(std::function< void(MessageType &)> callback)
void remove_callback() override
Datatype receive(Receiver::timeout_t timeout) override
std::chrono::milliseconds timeout_t
static constexpr message_size_t s_any_size
Base class for any user-defined issue.
T deserialize(const std::vector< CharType > &v)
Deserialize vector of bytes v into an instance of class T.
Unknown serialization. Cannot deserialize message.
void error(const Issue &issue)