20template<
typename Datatype>
24 TLOG() <<
"NetworkReceiverModel created with DT! ID: " << conn_id.
uid <<
" Addr: " <<
static_cast<void*
>(
this);
28 TLOG() <<
"Initial connection attempt failed: " << ex;
32template<
typename Datatype>
41template<
typename Datatype>
47 }
catch (ipm::ReceiveTimeoutExpired& ex) {
52template<
typename Datatype>
61 TLOG() <<
"Event loop can't be closed!";
67template<
typename Datatype>
76template<
typename Datatype>
85template<
typename Datatype>
90 auto start = std::chrono::steady_clock::now();
92 std::chrono::duration_cast<Receiver::timeout_t>(std::chrono::steady_clock::now() - start) < timeout) {
98 std::this_thread::sleep_for(std::chrono::milliseconds(10));
103template<
typename Datatype>
104template<
typename MessageType>
105inline typename std::enable_if<serialization::is_serializable<MessageType>::value, MessageType>
::type
112 throw ConnectionInstanceNotFound(
ERS_HERE, this->
id().uid);
116 if (response.data.size() > 0) {
117 return serialization::deserialize<MessageType>(response.data);
121 return MessageType();
124template<
typename Datatype>
125template<
typename MessageType>
126inline typename std::enable_if<!serialization::is_serializable<MessageType>::value, MessageType>
::type
129 throw NetworkMessageNotSerializable(
ERS_HERE,
typeid(MessageType).name());
130 return MessageType();
133template<
typename Datatype>
134template<
typename MessageType>
135inline typename std::enable_if<serialization::is_serializable<MessageType>::value, std::optional<MessageType>>
::type
148 if (res.
data.size() > 0) {
149 return std::make_optional<MessageType>(serialization::deserialize<MessageType>(res.
data));
155template<
typename Datatype>
156template<
typename MessageType>
157inline typename std::enable_if<!serialization::is_serializable<MessageType>::value, std::optional<MessageType>>
::type
164template<
typename Datatype>
165template<
typename MessageType>
166inline typename std::enable_if<serialization::is_serializable<MessageType>::value,
void>
::type
174 TLOG() <<
"Registering callback.";
179 std::optional<Datatype> message;
180 while (!token.stop_requested() || message) {
184 : std::chrono::milliseconds(20));
195 std::string name =
"N_" + this->
id().uid;
197 auto rc = pthread_setname_np(handle, name.c_str());
199 std::ostringstream s;
200 s <<
"The name " << name <<
" provided for the thread is too long.";
205template<
typename Datatype>
206template<
typename MessageType>
207inline typename std::enable_if<!serialization::is_serializable<MessageType>::value,
void>
::type
210 throw NetworkMessageNotSerializable(
ERS_HERE,
typeid(MessageType).name());
static NetworkManager & get()
std::shared_ptr< ipm::Receiver > get_receiver(ConnectionId const &conn_id)
void get_receiver(Receiver::timeout_t timeout)
void subscribe(std::string topic) override
std::enable_if< serialization::is_serializable< MessageType >::value, void >::type add_callback_impl(std::function< void(MessageType &)> callback)
std::function< void(Datatype &)> m_callback
std::enable_if< serialization::is_serializable< MessageType >::value, MessageType >::type read_network(Receiver::timeout_t const &timeout)
NetworkReceiverModel(ConnectionId const &conn_id)
std::unique_ptr< std::jthread > m_event_loop_runner
std::mutex m_receive_mutex
void unsubscribe(std::string topic) override
std::shared_ptr< ipm::Receiver > m_network_receiver_ptr
std::enable_if< serialization::is_serializable< MessageType >::value, std::optional< MessageType > >::type try_read_network(Receiver::timeout_t const &timeout)
std::mutex m_callback_mutex
void remove_callback() override
Datatype receive(Receiver::timeout_t timeout) override
ReceiverConcept(ConnectionId const &conn_id)
std::chrono::milliseconds timeout_t
static constexpr message_size_t s_any_size
Base class for any user define issue.
#define TLOG_DEBUG(lvl,...)
void warning(const Issue &issue)
void error(const Issue &issue)