DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::iomanager::QueueReceiverModel< Datatype > Class Template Reference

#include <QueueReceiverModel.hpp>

Inheritance diagram for dunedaq::iomanager::QueueReceiverModel< Datatype >:
[legend]
Collaboration diagram for dunedaq::iomanager::QueueReceiverModel< Datatype >:
[legend]

Public Member Functions

 QueueReceiverModel (ConnectionId const &request)
 QueueReceiverModel (QueueReceiverModel &&other)
 ~QueueReceiverModel ()
Datatype receive (Receiver::timeout_t timeout) override
std::optional< Datatype > try_receive (Receiver::timeout_t timeout) override
bool data_pending () override
void add_callback (std::function< void(Datatype &)> callback) override
void remove_callback () override
void subscribe (std::string) override
void unsubscribe (std::string) override
Public Member Functions inherited from dunedaq::iomanager::ReceiverConcept< Datatype >
 ReceiverConcept (ConnectionId const &conn_id)
Public Member Functions inherited from dunedaq::iomanager::Receiver
 Receiver (ConnectionId const &this_conn)
virtual ~Receiver ()=default
ConnectionId id () const
Public Member Functions inherited from dunedaq::utilities::NamedObject
 NamedObject (const std::string &name)
 NamedObject Constructor.
 NamedObject (NamedObject const &)=delete
 NamedObject is not copy-constructible.
 NamedObject (NamedObject &&)=default
 NamedObject is move-constructible.
NamedObject & operator= (NamedObject const &)=delete
 NamedObject is not copy-assignable.
NamedObject & operator= (NamedObject &&)=default
 NamedObject is move-assignable.
virtual ~NamedObject ()=default
 Default virtual destructor.
const std::string & get_name () const final
 Get the name of this NamedObject.
Public Member Functions inherited from dunedaq::utilities::Named
 Named ()=default
 Named Constructor.
 Named (Named const &)=delete
 Named is not copy-constructible.
 Named (Named &&)=default
 Named is move-constructible.
Named & operator= (Named const &)=delete
 Named is not copy-assignable.
Named & operator= (Named &&)=default
 Named is move-assignable.
virtual ~Named ()=default
 Default virtual destructor.

Private Attributes

std::function< void(Datatype &)> m_callback
std::unique_ptr< std::jthread > m_event_loop_runner
std::shared_ptr< Queue< Datatype > > m_queue

Additional Inherited Members

Public Types inherited from dunedaq::iomanager::Receiver
using timeout_t = std::chrono::milliseconds
Static Public Attributes inherited from dunedaq::iomanager::Receiver
static constexpr timeout_t s_block = timeout_t::max()
static constexpr timeout_t s_no_block = timeout_t::zero()
Protected Attributes inherited from dunedaq::iomanager::Receiver
ConnectionId m_conn

Detailed Description

template<typename Datatype>
class dunedaq::iomanager::QueueReceiverModel< Datatype >

Definition at line 27 of file QueueReceiverModel.hpp.

Constructor & Destructor Documentation

◆ QueueReceiverModel() [1/2]

template<typename Datatype>
dunedaq::iomanager::QueueReceiverModel< Datatype >::QueueReceiverModel ( ConnectionId const & request)
inline explicit

Definition at line 20 of file QueueReceiverModel.hxx.

22{
23 TLOG() << "QueueReceiverModel created with DT! Addr: " << this;
24 // get queue ref from queueregistry based on conn_id
25 // std::string sink_name = conn_id to sink_name;
26 // m_source = std::make_unique<appfwk::DAQSource<Datatype>>(sink_name);
28 TLOG() << "QueueReceiverModel m_queue=" << static_cast<void*>(m_queue.get());
29}
std::shared_ptr< Queue< Datatype > > m_queue
static QueueRegistry & get()
Get a handle to the QueueRegistry.
std::shared_ptr< Queue< T > > get_queue(const std::string &name)
Get a handle to a Queue.
ReceiverConcept(ConnectionId const &conn_id)
Definition Receiver.hpp:50
#define TLOG(...)
Definition macro.hpp:22

◆ QueueReceiverModel() [2/2]

template<typename Datatype>
dunedaq::iomanager::QueueReceiverModel< Datatype >::QueueReceiverModel ( QueueReceiverModel< Datatype > && other)
inline

◆ ~QueueReceiverModel()

template<typename Datatype>
dunedaq::iomanager::QueueReceiverModel< Datatype >::~QueueReceiverModel ( )
inline

Definition at line 34 of file QueueReceiverModel.hpp.

Member Function Documentation

◆ add_callback()

template<typename Datatype>
void dunedaq::iomanager::QueueReceiverModel< Datatype >::add_callback ( std::function< void(Datatype &)> callback)
inline override virtual

Implements dunedaq::iomanager::ReceiverConcept< Datatype >.

Definition at line 87 of file QueueReceiverModel.hxx.

88{
90 TLOG() << "Registering callback.";
92 // start event loop (thread that calls when receive happens)
95 bool ret = true;
96 while (!token.stop_requested() || ret) {
97 // TLOG() << "Take data from q then invoke callback...";
98 ret = m_queue->try_pop(dt, token.stop_requested() ? std::chrono::milliseconds(0)
100 if (ret) {
101 m_callback(dt);
102 }
103 }
104 });
105 auto handle = m_event_loop_runner->native_handle();
106 std::string name = "Q_" + this->id().uid;
107 name.resize(15);
108 auto rc = pthread_setname_np(handle, name.c_str());
109 if (rc != 0) {
111 s << "The name " << name << " provided for the thread is too long.";
113 }
114}
ConnectionId id() const
Definition Receiver.hpp:39
void warning(const Issue &issue)
Definition ers.hpp:115

◆ data_pending()

template<typename Datatype>
bool dunedaq::iomanager::QueueReceiverModel< Datatype >::data_pending ( )
inline override virtual

Implements dunedaq::iomanager::ReceiverConcept< Datatype >.

Definition at line 40 of file QueueReceiverModel.hpp.

40{ return m_queue && m_queue->can_pop(); }

◆ receive()

template<typename Datatype>
Datatype dunedaq::iomanager::QueueReceiverModel< Datatype >::receive ( Receiver::timeout_t timeout)
inline override virtual

Implements dunedaq::iomanager::ReceiverConcept< Datatype >.

Definition at line 42 of file QueueReceiverModel.hxx.

43{
44 if (m_event_loop_runner != nullptr) {
45 TLOG() << "QueueReceiver model is equipped with callback! Ignoring receive call.";
46 throw ReceiveCallbackConflict(ERS_HERE, this->id().uid);
47 }
48 if (m_queue == nullptr) {
49 throw ConnectionInstanceNotFound(ERS_HERE, this->id().uid);
50 }
51 // TLOG() << "Hand off data...";
53 try {
54 m_queue->pop(dt, timeout);
55 } catch (QueueTimeoutExpired& ex) {
56 throw TimeoutExpired(ERS_HERE, this->id().uid, "pop", timeout.count(), ex);
57 }
58 return dt;
59 // if (m_queue->write(
60}

◆ remove_callback()

template<typename Datatype>
void dunedaq::iomanager::QueueReceiverModel< Datatype >::remove_callback ( )
inline override virtual

Implements dunedaq::iomanager::ReceiverConcept< Datatype >.

Definition at line 118 of file QueueReceiverModel.hxx.

119{
120 if (m_event_loop_runner != nullptr && m_event_loop_runner->joinable()) {
121 m_event_loop_runner->request_stop();
122 m_event_loop_runner->join();
123 m_event_loop_runner.reset(nullptr);
124 } else if (m_event_loop_runner != nullptr) {
125 TLOG() << "Event loop can't be closed!";
126 }
127 // remove function.
128}

◆ subscribe()

template<typename Datatype>
void dunedaq::iomanager::QueueReceiverModel< Datatype >::subscribe ( std::string )
inline override virtual

Implements dunedaq::iomanager::ReceiverConcept< Datatype >.

Definition at line 47 of file QueueReceiverModel.hpp.

47{}

◆ try_receive()

template<typename Datatype>
std::optional< Datatype > dunedaq::iomanager::QueueReceiverModel< Datatype >::try_receive ( Receiver::timeout_t timeout)
inline override virtual

Implements dunedaq::iomanager::ReceiverConcept< Datatype >.

Definition at line 64 of file QueueReceiverModel.hxx.

65{
66 if (m_event_loop_runner != nullptr) {
67 TLOG() << "QueueReceiver model is equipped with callback! Ignoring receive call.";
69 return std::nullopt;
70 }
71 if (m_queue == nullptr) {
73 return std::nullopt;
74 }
75 // TLOG() << "Hand off data...";
77 auto ret = m_queue->try_pop(dt, timeout);
78 if (ret) {
80 }
81 return std::nullopt;
82 // if (m_queue->write(
83}
void error(const Issue &issue)
Definition ers.hpp:81

◆ unsubscribe()

template<typename Datatype>
void dunedaq::iomanager::QueueReceiverModel< Datatype >::unsubscribe ( std::string )
inline override virtual

Implements dunedaq::iomanager::ReceiverConcept< Datatype >.

Definition at line 48 of file QueueReceiverModel.hpp.

48{}

Member Data Documentation

◆ m_callback

template<typename Datatype>
std::function<void(Datatype&)> dunedaq::iomanager::QueueReceiverModel< Datatype >::m_callback
private

Definition at line 51 of file QueueReceiverModel.hpp.

◆ m_event_loop_runner

template<typename Datatype>
std::unique_ptr<std::jthread> dunedaq::iomanager::QueueReceiverModel< Datatype >::m_event_loop_runner
private

Definition at line 52 of file QueueReceiverModel.hpp.

◆ m_queue

template<typename Datatype>
std::shared_ptr<Queue<Datatype> > dunedaq::iomanager::QueueReceiverModel< Datatype >::m_queue
private

Definition at line 53 of file QueueReceiverModel.hpp.


The documentation for this class was generated from the following files: