DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
QueueReceiverModel.hxx
Go to the documentation of this file.
1
5
6#include "logging/Logging.hpp"
8
9#include <atomic>
10#include <memory>
11#include <optional>
12#include <string>
13#include <thread>
14#include <typeinfo>
15#include <utility>
16
17namespace dunedaq::iomanager {
18
19template<typename Datatype>
21 : ReceiverConcept<Datatype>(request)
22{
23 TLOG() << "QueueReceiverModel created with DT! Addr: " << this;
24 // get queue ref from queueregistry based on conn_id
25 // std::string sink_name = conn_id to sink_name;
26 // m_source = std::make_unique<appfwk::DAQSource<Datatype>>(sink_name);
27 m_queue = QueueRegistry::get().get_queue<Datatype>(request.uid);
28 TLOG() << "QueueReceiverModel m_queue=" << static_cast<void*>(m_queue.get());
29}
30
31template<typename Datatype>
33 : ReceiverConcept<Datatype>(other.m_conn.uid)
34 , m_callback(std::move(other.m_callback))
35 , m_event_loop_runner(std::move(other.m_event_loop_runner))
36 , m_queue(std::move(other.m_queue))
37{
38}
39
40template<typename Datatype>
41inline Datatype
43{
44 if (m_event_loop_runner != nullptr) {
45 TLOG() << "QueueReceiver model is equipped with callback! Ignoring receive call.";
46 throw ReceiveCallbackConflict(ERS_HERE, this->id().uid);
47 }
48 if (m_queue == nullptr) {
49 throw ConnectionInstanceNotFound(ERS_HERE, this->id().uid);
50 }
51 // TLOG() << "Hand off data...";
52 Datatype dt;
53 try {
54 m_queue->pop(dt, timeout);
55 } catch (QueueTimeoutExpired& ex) {
56 throw TimeoutExpired(ERS_HERE, this->id().uid, "pop", timeout.count(), ex);
57 }
58 return dt;
59 // if (m_queue->write(
60}
61
62template<typename Datatype>
63inline std::optional<Datatype>
65{
66 if (m_event_loop_runner != nullptr) {
67 TLOG() << "QueueReceiver model is equipped with callback! Ignoring receive call.";
68 ers::error(ReceiveCallbackConflict(ERS_HERE, this->id().uid));
69 return std::nullopt;
70 }
71 if (m_queue == nullptr) {
72 ers::error(ConnectionInstanceNotFound(ERS_HERE, this->id().uid));
73 return std::nullopt;
74 }
75 // TLOG() << "Hand off data...";
76 Datatype dt;
77 auto ret = m_queue->try_pop(dt, timeout);
78 if (ret) {
79 return std::make_optional(std::move(dt));
80 }
81 return std::nullopt;
82 // if (m_queue->write(
83}
84
85template<typename Datatype>
86inline void
87QueueReceiverModel<Datatype>::add_callback(std::function<void(Datatype&)> callback)
88{
89 remove_callback();
90 TLOG() << "Registering callback.";
91 m_callback = callback;
92 // start event loop (thread that calls when receive happens)
93 m_event_loop_runner = std::make_unique<std::jthread>([&](std::stop_token token) {
94 Datatype dt;
95 bool ret = true;
96 while (!token.stop_requested() || ret) {
97 // TLOG() << "Take data from q then invoke callback...";
98 ret = m_queue->try_pop(dt, token.stop_requested() ? std::chrono::milliseconds(0)
99 : std::chrono::milliseconds(1));
100 if (ret) {
101 m_callback(dt);
102 }
103 }
104 });
105 auto handle = m_event_loop_runner->native_handle();
106 std::string name = "Q_" + this->id().uid;
107 name.resize(15);
108 auto rc = pthread_setname_np(handle, name.c_str());
109 if (rc != 0) {
110 std::ostringstream s;
111 s << "The name " << name << " provided for the thread is too long.";
112 ers::warning(utilities::ThreadingIssue(ERS_HERE, s.str()));
113 }
114}
115
116template<typename Datatype>
117inline void
119{
120 if (m_event_loop_runner != nullptr && m_event_loop_runner->joinable()) {
121 m_event_loop_runner->request_stop();
122 m_event_loop_runner->join();
123 m_event_loop_runner.reset(nullptr);
124 } else if (m_event_loop_runner != nullptr) {
125 TLOG() << "Event loop can't be closed!";
126 }
127 // remove function.
128}
129
130} // namespace dunedaq::iomanager
#define ERS_HERE
QueueReceiverModel(ConnectionId const &request)
std::shared_ptr< Queue< Datatype > > m_queue
void add_callback(std::function< void(Datatype &)> callback) override
std::optional< Datatype > try_receive(Receiver::timeout_t timeout) override
Datatype receive(Receiver::timeout_t timeout) override
static QueueRegistry & get()
Get a handle to the QueueRegistry.
std::shared_ptr< Queue< T > > get_queue(const std::string &name)
Get a handle to a Queue.
std::chrono::milliseconds timeout_t
Definition Receiver.hpp:27
#define TLOG(...)
Definition macro.hpp:22
void warning(const Issue &issue)
Definition ers.hpp:115
void error(const Issue &issue)
Definition ers.hpp:81