19template<
typename Datatype>
23 TLOG() <<
"QueueReceiverModel created with DT! Addr: " <<
this;
28 TLOG() <<
"QueueReceiverModel m_queue=" <<
static_cast<void*
>(
m_queue.get());
31template<
typename Datatype>
34 , m_with_callback(other.m_with_callback.load())
35 , m_callback(
std::move(other.m_callback))
36 , m_event_loop_runner(
std::move(other.m_event_loop_runner))
37 , m_queue(
std::move(other.m_queue))
41template<
typename Datatype>
45 if (m_with_callback) {
46 TLOG() <<
"QueueReceiver model is equipped with callback! Ignoring receive call.";
47 throw ReceiveCallbackConflict(
ERS_HERE, this->
id().uid);
49 if (m_queue ==
nullptr) {
50 throw ConnectionInstanceNotFound(
ERS_HERE, this->
id().uid);
55 m_queue->pop(dt, timeout);
56 }
catch (QueueTimeoutExpired& ex) {
63template<
typename Datatype>
64inline std::optional<Datatype>
67 if (m_with_callback) {
68 TLOG() <<
"QueueReceiver model is equipped with callback! Ignoring receive call.";
72 if (m_queue ==
nullptr) {
78 auto ret = m_queue->try_pop(dt, timeout);
80 return std::make_optional(std::move(dt));
86template<
typename Datatype>
91 TLOG() <<
"Registering callback.";
92 m_callback = callback;
93 m_with_callback =
true;
95 m_event_loop_runner = std::make_unique<std::thread>([&]() {
98 while (m_with_callback.load() || ret) {
100 ret = m_queue->try_pop(dt, m_with_callback.load() ? std::chrono::milliseconds(1) : std::chrono::milliseconds(0));
108template<
typename Datatype>
112 m_with_callback =
false;
113 if (m_event_loop_runner !=
nullptr && m_event_loop_runner->joinable()) {
114 m_event_loop_runner->join();
115 m_event_loop_runner.reset(
nullptr);
116 }
else if (m_event_loop_runner !=
nullptr) {
117 TLOG() <<
"Event loop can't be closed!";
void remove_callback() override
QueueReceiverModel(ConnectionId const &request)
std::shared_ptr< Queue< Datatype > > m_queue
void add_callback(std::function< void(Datatype &)> callback) override
std::optional< Datatype > try_receive(Receiver::timeout_t timeout) override
Datatype receive(Receiver::timeout_t timeout) override
static QueueRegistry & get()
Get a handle to the QueueRegistry.
std::shared_ptr< Queue< T > > get_queue(const std::string &name)
Get a handle to a Queue.
std::chrono::milliseconds timeout_t
void error(const Issue &issue)