DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::iomanager::StdDeQueue< T > Class Template Reference

A Queue Implementation that uses a std::deque as its backend. More...

#include <StdDeQueue.hpp>

Inheritance diagram for dunedaq::iomanager::StdDeQueue< T >:
[legend]
Collaboration diagram for dunedaq::iomanager::StdDeQueue< T >:
[legend]

Public Types

using value_t = T
 Type of data stored in the StdDeQueue.
 
using duration_t = typename Queue<T>::duration_t
 Type used for expressing timeouts.
 
- Public Types inherited from dunedaq::iomanager::Queue< T >
using value_t = T
 Type stored in the Queue.
 
using duration_t = std::chrono::milliseconds
 Base duration type for timeouts.
 
- Public Types inherited from dunedaq::opmonlib::MonitorableObject
using NodePtr = std::weak_ptr<MonitorableObject>
 
using NewNodePtr = std::shared_ptr<MonitorableObject>
 
using ElementId = std::string
 

Public Member Functions

 StdDeQueue (const std::string &name, size_t capacity)
 StdDeQueue Constructor.
 
bool can_pop () const noexcept override
 Determine whether the Queue may be popped from.
 
void pop (value_t &val, const duration_t &) override
 Pop the first value off of the queue.
 
bool try_pop (value_t &val, const duration_t &) override
 
bool can_push () const noexcept override
 Determine whether the Queue may be pushed onto.
 
void push (value_t &&, const duration_t &) override
 Push a value onto the Queue.
 
bool try_push (value_t &&, const duration_t &) override
 
size_t get_capacity () const override
 Get the capacity (max size) of the queue.
 
size_t get_num_elements () const override
 
 StdDeQueue (const StdDeQueue &)=delete
 StdDeQueue is not copy-constructible.
 
StdDeQueueoperator= (const StdDeQueue &)=delete
 StdDeQueue is not copy-assignable.
 
 StdDeQueue (StdDeQueue &&)=delete
 StdDeQueue is not move-constructible.
 
StdDeQueueoperator= (StdDeQueue &&)=delete
 StdDeQueue is not move-assignable.
 
- Public Member Functions inherited from dunedaq::iomanager::Queue< T >
 Queue (const std::string &name)
 Queue Constructor.
 
- Public Member Functions inherited from dunedaq::iomanager::QueueBase
 QueueBase (const std::string &name)
 QueueBase Constructor.
 
- Public Member Functions inherited from dunedaq::utilities::NamedObject
 NamedObject (const std::string &name)
 NamedObject Constructor.
 
 NamedObject (NamedObject const &)=delete
 NamedObject is not copy-constructible.
 
 NamedObject (NamedObject &&)=default
 NamedObject is move-constructible.
 
NamedObjectoperator= (NamedObject const &)=delete
 NamedObject is not copy-assignable.
 
NamedObjectoperator= (NamedObject &&)=default
 NamedObject is move-assignable.
 
virtual ~NamedObject ()=default
 Default virtual destructor.
 
const std::string & get_name () const final
 Get the name of this NamedObejct.
 
- Public Member Functions inherited from dunedaq::utilities::Named
 Named ()=default
 Named Constructor.
 
 Named (Named const &)=delete
 Named is not copy-constructible.
 
 Named (Named &&)=default
 Named is move-constructible.
 
Namedoperator= (Named const &)=delete
 Named is not copy-assignable.
 
Namedoperator= (Named &&)=default
 Named is move-assignable.
 
virtual ~Named ()=default
 Default virtual destructor.
 
- Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject (const MonitorableObject &)=delete
 
MonitorableObjectoperator= (const MonitorableObject &)=delete
 
 MonitorableObject (MonitorableObject &&)=delete
 
MonitorableObjectoperator= (MonitorableObject &&)=delete
 
virtual ~MonitorableObject ()=default
 
auto get_opmon_id () const noexcept
 
auto get_opmon_level () const noexcept
 

Private Member Functions

void try_lock_for (std::unique_lock< std::mutex > &, const duration_t &)
 

Private Attributes

std::deque< value_tm_deque
 
size_t m_capacity
 
std::atomic< size_t > m_size = 0
 
std::mutex m_mutex
 
std::condition_variable m_no_longer_full
 
std::condition_variable m_no_longer_empty
 

Additional Inherited Members

- Static Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
static bool publishable_metric (OpMonLevel entry, OpMonLevel system) noexcept
 
- Protected Member Functions inherited from dunedaq::iomanager::QueueBase
void generate_opmon_data () override
 Method to retrieve information (occupancy) from queues.
 
- Protected Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject ()=default
 
void register_node (ElementId name, NewNodePtr)
 
void publish (google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
 

Detailed Description

template<class T>
class dunedaq::iomanager::StdDeQueue< T >

A Queue Implementation that uses a std::deque as its backend.

Template Parameters
TData Type to be stored in the std::deque

Definition at line 37 of file StdDeQueue.hpp.

Member Typedef Documentation

◆ duration_t

template<class T >
using dunedaq::iomanager::StdDeQueue< T >::duration_t = typename Queue<T>::duration_t

Type used for expressing timeouts.

Definition at line 41 of file StdDeQueue.hpp.

◆ value_t

template<class T >
using dunedaq::iomanager::StdDeQueue< T >::value_t = T

Type of data stored in the StdDeQueue.

Definition at line 40 of file StdDeQueue.hpp.

Constructor & Destructor Documentation

◆ StdDeQueue() [1/3]

template<class T >
dunedaq::iomanager::StdDeQueue< T >::StdDeQueue ( const std::string & name,
size_t capacity )
explicit

StdDeQueue Constructor.

Parameters
nameName of this StdDeQueue instance

Definition at line 7 of file StdDeQueue.hxx.

8 : Queue<T>(name)
9 , m_deque()
10 , m_capacity(capacity)
11 , m_size(0)
12{
13 assert(m_deque.max_size() > this->get_capacity());
14}
Queue(const std::string &name)
Queue Constructor.
Definition Queue.hpp:49
std::deque< value_t > m_deque
std::atomic< size_t > m_size

◆ StdDeQueue() [2/3]

template<class T >
dunedaq::iomanager::StdDeQueue< T >::StdDeQueue ( const StdDeQueue< T > & )
delete

StdDeQueue is not copy-constructible.

◆ StdDeQueue() [3/3]

template<class T >
dunedaq::iomanager::StdDeQueue< T >::StdDeQueue ( StdDeQueue< T > && )
delete

StdDeQueue is not move-constructible.

Member Function Documentation

◆ can_pop()

template<class T >
bool dunedaq::iomanager::StdDeQueue< T >::can_pop ( ) const
inlineoverridevirtualnoexcept

Determine whether the Queue may be popped from.

Returns
True if the queue is not empty, false if it is

Reimplemented from dunedaq::iomanager::Queue< T >.

Definition at line 49 of file StdDeQueue.hpp.

49{ return this->get_num_elements() > 0; }
size_t get_num_elements() const override

◆ can_push()

template<class T >
bool dunedaq::iomanager::StdDeQueue< T >::can_push ( ) const
inlineoverridevirtualnoexcept

Determine whether the Queue may be pushed onto.

Returns
True if the queue is not full, false if it is

Reimplemented from dunedaq::iomanager::Queue< T >.

Definition at line 53 of file StdDeQueue.hpp.

53{ return this->get_num_elements() < this->get_capacity(); }
size_t get_capacity() const override
Get the capacity (max size) of the queue.

◆ get_capacity()

template<class T >
size_t dunedaq::iomanager::StdDeQueue< T >::get_capacity ( ) const
inlineoverridevirtual

Get the capacity (max size) of the queue.

Returns
size_t capacity

Implements dunedaq::iomanager::QueueBase.

Definition at line 57 of file StdDeQueue.hpp.

57{ return m_capacity; }

◆ get_num_elements()

template<class T >
size_t dunedaq::iomanager::StdDeQueue< T >::get_num_elements ( ) const
inlineoverridevirtual

Implements dunedaq::iomanager::QueueBase.

Definition at line 59 of file StdDeQueue.hpp.

59{ return m_size.load(std::memory_order_acquire); }

◆ operator=() [1/2]

template<class T >
StdDeQueue & dunedaq::iomanager::StdDeQueue< T >::operator= ( const StdDeQueue< T > & )
delete

StdDeQueue is not copy-assignable.

◆ operator=() [2/2]

template<class T >
StdDeQueue & dunedaq::iomanager::StdDeQueue< T >::operator= ( StdDeQueue< T > && )
delete

StdDeQueue is not move-assignable.

◆ pop()

template<class T >
void dunedaq::iomanager::StdDeQueue< T >::pop ( value_t & val,
const duration_t & timeout )
overridevirtual

Pop the first value off of the queue.

Parameters
valReference to the value that is popped from the queue
timeoutTimeout for the pop operation

This is a pure virtual function If pop takes longer than the timeout, implementations should throw an exception

Implements dunedaq::iomanager::Queue< T >.

Definition at line 44 of file StdDeQueue.hxx.

45{
46
47 auto start_time = std::chrono::steady_clock::now();
48 std::unique_lock<std::mutex> lk(m_mutex, std::defer_lock);
49
50 this->try_lock_for(lk, timeout);
51
52 auto time_to_wait_for_data = (start_time + timeout) - std::chrono::steady_clock::now();
53
54 if (time_to_wait_for_data.count() > 0) {
55 m_no_longer_empty.wait_for(lk, time_to_wait_for_data, [&]() { return this->can_pop(); });
56 }
57
58 if (this->can_pop()) {
59 val = std::move(m_deque.front());
60 m_size--;
61 m_deque.pop_front();
62 m_no_longer_full.notify_one();
63 } else {
64 throw QueueTimeoutExpired(
65 ERS_HERE, this->get_name(), "pop", std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
66 }
67}
#define ERS_HERE
void try_lock_for(std::unique_lock< std::mutex > &, const duration_t &)
bool can_pop() const noexcept override
Determine whether the Queue may be popped from.
std::condition_variable m_no_longer_full
std::condition_variable m_no_longer_empty
const std::string & get_name() const final
Get the name of this NamedObejct.
Cannot add TPSet with start_time

◆ push()

template<class T >
void dunedaq::iomanager::StdDeQueue< T >::push ( value_t && val,
const duration_t & timeout )
overridevirtual

Push a value onto the Queue.

Parameters
valValue to push (rvalue)
timeoutTimeout for the push operation.

This is a pure virtual function. If push takes longer than the timeout, implementations should throw an exception.

Implements dunedaq::iomanager::Queue< T >.

Definition at line 18 of file StdDeQueue.hxx.

19{
20
21 auto start_time = std::chrono::steady_clock::now();
22 std::unique_lock<std::mutex> lk(m_mutex, std::defer_lock);
23
24 this->try_lock_for(lk, timeout);
25
26 auto time_to_wait_for_space = (start_time + timeout) - std::chrono::steady_clock::now();
27
28 if (time_to_wait_for_space.count() > 0) {
29 m_no_longer_full.wait_for(lk, time_to_wait_for_space, [&]() { return this->can_push(); });
30 }
31
32 if (this->can_push()) {
33 m_deque.push_back(std::move(object_to_push));
34 m_size++;
35 m_no_longer_empty.notify_one();
36 } else {
37 throw QueueTimeoutExpired(
38 ERS_HERE, this->get_name(), "push", std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
39 }
40}
bool can_push() const noexcept override
Determine whether the Queue may be pushed onto.

◆ try_lock_for()

template<class T >
void dunedaq::iomanager::StdDeQueue< T >::try_lock_for ( std::unique_lock< std::mutex > & lk,
const duration_t & timeout )
private

Definition at line 133 of file StdDeQueue.hxx.

134{
135 assert(!lk.owns_lock());
136
137 auto start_time = std::chrono::steady_clock::now();
138 auto ret = lk.try_lock();
139
140 if ((!ret || !lk.owns_lock()) && timeout.count() > 0) {
141
142 int approximate_number_of_retries = 5;
143 duration_t pause_between_tries = duration_t(timeout.count() / approximate_number_of_retries);
144
145 while (std::chrono::steady_clock::now() < start_time + timeout) {
146 std::this_thread::sleep_for(pause_between_tries);
147 ret = lk.try_lock();
148 if (ret && lk.owns_lock()) {
149 break;
150 }
151 }
152 }
153
154 if (!lk.owns_lock()) {
155 throw QueueTimeoutExpired(
156 ERS_HERE, this->get_name(), "lock mutex", std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
157 }
158}
typename Queue< T >::duration_t duration_t
Type used for expressing timeouts.

◆ try_pop()

template<class T >
bool dunedaq::iomanager::StdDeQueue< T >::try_pop ( value_t & val,
const duration_t & timeout )
overridevirtual

Implements dunedaq::iomanager::Queue< T >.

Definition at line 99 of file StdDeQueue.hxx.

100{
101
102 auto start_time = std::chrono::steady_clock::now();
103 std::unique_lock<std::mutex> lk(m_mutex, std::defer_lock);
104
105 this->try_lock_for(lk, timeout);
106
107 auto time_to_wait_for_data = (start_time + timeout) - std::chrono::steady_clock::now();
108
109 if (time_to_wait_for_data.count() > 0) {
110 m_no_longer_empty.wait_for(lk, time_to_wait_for_data, [&]() { return this->can_pop(); });
111 }
112
113 if (this->can_pop()) {
114 val = std::move(m_deque.front());
115 m_size--;
116 m_deque.pop_front();
117 m_no_longer_full.notify_one();
118 return true;
119 } else {
120 ers::error(QueueTimeoutExpired(
121 ERS_HERE, this->get_name(), "pop", std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count()));
122 return false;
123 }
124}
void error(const Issue &issue)
Definition ers.hpp:81

◆ try_push()

template<class T >
bool dunedaq::iomanager::StdDeQueue< T >::try_push ( value_t && object_to_push,
const duration_t & timeout )
overridevirtual

Implements dunedaq::iomanager::Queue< T >.

Definition at line 71 of file StdDeQueue.hxx.

72{
73
74 auto start_time = std::chrono::steady_clock::now();
75 std::unique_lock<std::mutex> lk(m_mutex, std::defer_lock);
76
77 this->try_lock_for(lk, timeout);
78
79 auto time_to_wait_for_space = (start_time + timeout) - std::chrono::steady_clock::now();
80
81 if (time_to_wait_for_space.count() > 0) {
82 m_no_longer_full.wait_for(lk, time_to_wait_for_space, [&]() { return this->can_push(); });
83 }
84
85 if (this->can_push()) {
86 m_deque.push_back(std::move(object_to_push));
87 m_size++;
88 m_no_longer_empty.notify_one();
89 return true;
90 } else {
91 ers::error(QueueTimeoutExpired(
92 ERS_HERE, this->get_name(), "push", std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count()));
93 return false;
94 }
95}

Member Data Documentation

◆ m_capacity

template<class T >
size_t dunedaq::iomanager::StdDeQueue< T >::m_capacity
private

Definition at line 73 of file StdDeQueue.hpp.

◆ m_deque

template<class T >
std::deque<value_t> dunedaq::iomanager::StdDeQueue< T >::m_deque
private

Definition at line 72 of file StdDeQueue.hpp.

◆ m_mutex

template<class T >
std::mutex dunedaq::iomanager::StdDeQueue< T >::m_mutex
private

Definition at line 76 of file StdDeQueue.hpp.

◆ m_no_longer_empty

template<class T >
std::condition_variable dunedaq::iomanager::StdDeQueue< T >::m_no_longer_empty
private

Definition at line 78 of file StdDeQueue.hpp.

◆ m_no_longer_full

template<class T >
std::condition_variable dunedaq::iomanager::StdDeQueue< T >::m_no_longer_full
private

Definition at line 77 of file StdDeQueue.hpp.

◆ m_size

template<class T >
std::atomic<size_t> dunedaq::iomanager::StdDeQueue< T >::m_size = 0
private

Definition at line 74 of file StdDeQueue.hpp.


The documentation for this class was generated from the following files: