DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
QueueReceiverModel.hxx
Go to the documentation of this file.
1
5
6#include "logging/Logging.hpp"
7
8#include <atomic>
9#include <memory>
10#include <optional>
11#include <string>
12#include <thread>
13#include <typeinfo>
14#include <utility>
15
16namespace dunedaq {
17namespace iomanager {
18
19template<typename Datatype>
21 : ReceiverConcept<Datatype>(request)
22{
23 TLOG() << "QueueReceiverModel created with DT! Addr: " << this;
24 // get queue ref from queueregistry based on conn_id
25 // std::string sink_name = conn_id to sink_name;
26 // m_source = std::make_unique<appfwk::DAQSource<Datatype>>(sink_name);
27 m_queue = QueueRegistry::get().get_queue<Datatype>(request.uid);
28 TLOG() << "QueueReceiverModel m_queue=" << static_cast<void*>(m_queue.get());
29}
30
31template<typename Datatype>
33 : ReceiverConcept<Datatype>(other.m_conn.uid)
34 , m_with_callback(other.m_with_callback.load())
35 , m_callback(std::move(other.m_callback))
36 , m_event_loop_runner(std::move(other.m_event_loop_runner))
37 , m_queue(std::move(other.m_queue))
38{
39}
40
41template<typename Datatype>
42inline Datatype
44{
45 if (m_with_callback) {
46 TLOG() << "QueueReceiver model is equipped with callback! Ignoring receive call.";
47 throw ReceiveCallbackConflict(ERS_HERE, this->id().uid);
48 }
49 if (m_queue == nullptr) {
50 throw ConnectionInstanceNotFound(ERS_HERE, this->id().uid);
51 }
52 // TLOG() << "Hand off data...";
53 Datatype dt;
54 try {
55 m_queue->pop(dt, timeout);
56 } catch (QueueTimeoutExpired& ex) {
57 throw TimeoutExpired(ERS_HERE, this->id().uid, "pop", timeout.count(), ex);
58 }
59 return dt;
60 // if (m_queue->write(
61}
62
63template<typename Datatype>
64inline std::optional<Datatype>
66{
67 if (m_with_callback) {
68 TLOG() << "QueueReceiver model is equipped with callback! Ignoring receive call.";
69 ers::error(ReceiveCallbackConflict(ERS_HERE, this->id().uid));
70 return std::nullopt;
71 }
72 if (m_queue == nullptr) {
73 ers::error(ConnectionInstanceNotFound(ERS_HERE, this->id().uid));
74 return std::nullopt;
75 }
76 // TLOG() << "Hand off data...";
77 Datatype dt;
78 auto ret = m_queue->try_pop(dt, timeout);
79 if (ret) {
80 return std::make_optional(std::move(dt));
81 }
82 return std::nullopt;
83 // if (m_queue->write(
84}
85
86template<typename Datatype>
87inline void
88QueueReceiverModel<Datatype>::add_callback(std::function<void(Datatype&)> callback)
89{
90 remove_callback();
91 TLOG() << "Registering callback.";
92 m_callback = callback;
93 m_with_callback = true;
94 // start event loop (thread that calls when receive happens)
95 m_event_loop_runner = std::make_unique<std::thread>([&]() {
96 Datatype dt;
97 bool ret = true;
98 while (m_with_callback.load() || ret) {
99 // TLOG() << "Take data from q then invoke callback...";
100 ret = m_queue->try_pop(dt, m_with_callback.load() ? std::chrono::milliseconds(1) : std::chrono::milliseconds(0));
101 if (ret) {
102 m_callback(dt);
103 }
104 }
105 });
106}
107
108template<typename Datatype>
109inline void
111{
112 m_with_callback = false;
113 if (m_event_loop_runner != nullptr && m_event_loop_runner->joinable()) {
114 m_event_loop_runner->join();
115 m_event_loop_runner.reset(nullptr);
116 } else if (m_event_loop_runner != nullptr) {
117 TLOG() << "Event loop can't be closed!";
118 }
119 // remove function.
120}
121
122} // namespace iomanager
123} // namespace dunedaq
#define ERS_HERE
QueueReceiverModel(ConnectionId const &request)
std::shared_ptr< Queue< Datatype > > m_queue
void add_callback(std::function< void(Datatype &)> callback) override
std::optional< Datatype > try_receive(Receiver::timeout_t timeout) override
Datatype receive(Receiver::timeout_t timeout) override
static QueueRegistry & get()
Get a handle to the QueueRegistry.
std::shared_ptr< Queue< T > > get_queue(const std::string &name)
Get a handle to a Queue.
std::chrono::milliseconds timeout_t
Definition Receiver.hpp:28
#define TLOG(...)
Definition macro.hpp:22
Including Qt Headers.
void error(const Issue &issue)
Definition ers.hpp:81