19template<
typename Datatype>
23 TLOG() <<
"QueueReceiverModel created with DT! Addr: " <<
this;
28 TLOG() <<
"QueueReceiverModel m_queue=" <<
static_cast<void*
>(
m_queue.get());
31template<
typename Datatype>
34 , m_callback(
std::move(other.m_callback))
35 , m_event_loop_runner(
std::move(other.m_event_loop_runner))
36 , m_queue(
std::move(other.m_queue))
40template<
typename Datatype>
44 if (m_event_loop_runner !=
nullptr) {
45 TLOG() <<
"QueueReceiver model is equipped with callback! Ignoring receive call.";
46 throw ReceiveCallbackConflict(
ERS_HERE, this->
id().uid);
48 if (m_queue ==
nullptr) {
49 throw ConnectionInstanceNotFound(
ERS_HERE, this->
id().uid);
54 m_queue->pop(dt, timeout);
55 }
catch (QueueTimeoutExpired& ex) {
62template<
typename Datatype>
63inline std::optional<Datatype>
66 if (m_event_loop_runner !=
nullptr) {
67 TLOG() <<
"QueueReceiver model is equipped with callback! Ignoring receive call.";
71 if (m_queue ==
nullptr) {
77 auto ret = m_queue->try_pop(dt, timeout);
79 return std::make_optional(std::move(dt));
85template<
typename Datatype>
90 TLOG() <<
"Registering callback.";
91 m_callback = callback;
93 m_event_loop_runner = std::make_unique<std::jthread>([&](std::stop_token token) {
96 while (!token.stop_requested() || ret) {
98 ret = m_queue->try_pop(dt, token.stop_requested() ? std::chrono::milliseconds(0)
99 : std::chrono::milliseconds(1));
105 auto handle = m_event_loop_runner->native_handle();
106 std::string name =
"Q_" + this->id().uid;
108 auto rc = pthread_setname_np(handle, name.c_str());
110 std::ostringstream s;
111 s <<
"The name " << name <<
" provided for the thread is too long.";
116template<
typename Datatype>
120 if (m_event_loop_runner !=
nullptr && m_event_loop_runner->joinable()) {
121 m_event_loop_runner->request_stop();
122 m_event_loop_runner->join();
123 m_event_loop_runner.reset(
nullptr);
124 }
else if (m_event_loop_runner !=
nullptr) {
125 TLOG() <<
"Event loop can't be closed!";
void remove_callback() override
QueueReceiverModel(ConnectionId const &request)
std::shared_ptr< Queue< Datatype > > m_queue
void add_callback(std::function< void(Datatype &)> callback) override
std::optional< Datatype > try_receive(Receiver::timeout_t timeout) override
Datatype receive(Receiver::timeout_t timeout) override
static QueueRegistry & get()
Get a handle to the QueueRegistry.
std::shared_ptr< Queue< T > > get_queue(const std::string &name)
Get a handle to a Queue.
std::chrono::milliseconds timeout_t
void warning(const Issue &issue)
void error(const Issue &issue)