18template<
typename Datatype>
22 TLOG() <<
"QueueReceiverModel created with DT! Addr: " <<
this;
27 TLOG() <<
"QueueReceiverModel m_queue=" <<
static_cast<void*
>(
m_queue.get());
30template<
typename Datatype>
33 , m_with_callback(other.m_with_callback.load())
34 , m_callback(
std::move(other.m_callback))
35 , m_event_loop_runner(
std::move(other.m_event_loop_runner))
36 , m_queue(
std::move(other.m_queue))
40template<
typename Datatype>
44 if (m_with_callback) {
45 TLOG() <<
"QueueReceiver model is equipped with callback! Ignoring receive call.";
46 throw ReceiveCallbackConflict(
ERS_HERE, this->
id().uid);
48 if (m_queue ==
nullptr) {
49 throw ConnectionInstanceNotFound(
ERS_HERE, this->
id().uid);
54 m_queue->pop(dt, timeout);
55 }
catch (QueueTimeoutExpired& ex) {
62template<
typename Datatype>
63inline std::optional<Datatype>
66 if (m_with_callback) {
67 TLOG() <<
"QueueReceiver model is equipped with callback! Ignoring receive call.";
71 if (m_queue ==
nullptr) {
77 auto ret = m_queue->try_pop(dt, timeout);
79 return std::make_optional(std::move(dt));
85template<
typename Datatype>
90 TLOG() <<
"Registering callback.";
91 m_callback = callback;
92 m_with_callback =
true;
94 m_event_loop_runner = std::make_unique<std::thread>([&]() {
97 while (m_with_callback.load() || ret) {
99 ret = m_queue->try_pop(dt, m_with_callback.load() ? std::chrono::milliseconds(1) : std::chrono::milliseconds(0));
107template<
typename Datatype>
111 m_with_callback =
false;
112 if (m_event_loop_runner !=
nullptr && m_event_loop_runner->joinable()) {
113 m_event_loop_runner->join();
114 m_event_loop_runner.reset(
nullptr);
115 }
else if (m_event_loop_runner !=
nullptr) {
116 TLOG() <<
"Event loop can't be closed!";
void remove_callback() override
QueueReceiverModel(ConnectionId const &request)
std::shared_ptr< Queue< Datatype > > m_queue
void add_callback(std::function< void(Datatype &)> callback) override
std::optional< Datatype > try_receive(Receiver::timeout_t timeout) override
Datatype receive(Receiver::timeout_t timeout) override
static QueueRegistry & get()
Get a handle to the QueueRegistry.
std::shared_ptr< Queue< T > > get_queue(const std::string &name)
Get a handle to a Queue.
std::chrono::milliseconds timeout_t
void error(const Issue &issue)