DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
StdDeQueue.hxx
Go to the documentation of this file.
1
2#include "ers/ers.hpp"
3
4namespace dunedaq::iomanager {
5
6template<class T>
7StdDeQueue<T>::StdDeQueue(const std::string& name, size_t capacity)
8 : Queue<T>(name)
9 , m_deque()
10 , m_capacity(capacity)
11 , m_size(0)
12{
13 assert(m_deque.max_size() > this->get_capacity());
14}
15
16template<class T>
17void
18StdDeQueue<T>::push(value_t&& object_to_push, const duration_t& timeout)
19{
20
21 auto start_time = std::chrono::steady_clock::now();
22 std::unique_lock<std::mutex> lk(m_mutex, std::defer_lock);
23
24 this->try_lock_for(lk, timeout);
25
26 auto time_to_wait_for_space = (start_time + timeout) - std::chrono::steady_clock::now();
27
28 if (time_to_wait_for_space.count() > 0) {
29 m_no_longer_full.wait_for(lk, time_to_wait_for_space, [&]() { return this->can_push(); });
30 }
31
32 if (this->can_push()) {
33 m_deque.push_back(std::move(object_to_push));
34 m_size++;
35 m_no_longer_empty.notify_one();
36 } else {
37 throw QueueTimeoutExpired(
38 ERS_HERE, this->get_name(), "push", std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
39 }
40}
41
42template<class T>
43void
44StdDeQueue<T>::pop(T& val, const duration_t& timeout)
45{
46
47 auto start_time = std::chrono::steady_clock::now();
48 std::unique_lock<std::mutex> lk(m_mutex, std::defer_lock);
49
50 this->try_lock_for(lk, timeout);
51
52 auto time_to_wait_for_data = (start_time + timeout) - std::chrono::steady_clock::now();
53
54 if (time_to_wait_for_data.count() > 0) {
55 m_no_longer_empty.wait_for(lk, time_to_wait_for_data, [&]() { return this->can_pop(); });
56 }
57
58 if (this->can_pop()) {
59 val = std::move(m_deque.front());
60 m_size--;
61 m_deque.pop_front();
62 m_no_longer_full.notify_one();
63 } else {
64 throw QueueTimeoutExpired(
65 ERS_HERE, this->get_name(), "pop", std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
66 }
67}
68
69template<class T>
70bool
71StdDeQueue<T>::try_push(value_t&& object_to_push, const duration_t& timeout)
72{
73
74 auto start_time = std::chrono::steady_clock::now();
75 std::unique_lock<std::mutex> lk(m_mutex, std::defer_lock);
76
77 this->try_lock_for(lk, timeout);
78
79 auto time_to_wait_for_space = (start_time + timeout) - std::chrono::steady_clock::now();
80
81 if (time_to_wait_for_space.count() > 0) {
82 m_no_longer_full.wait_for(lk, time_to_wait_for_space, [&]() { return this->can_push(); });
83 }
84
85 if (this->can_push()) {
86 m_deque.push_back(std::move(object_to_push));
87 m_size++;
88 m_no_longer_empty.notify_one();
89 return true;
90 } else {
91 ers::error(QueueTimeoutExpired(
92 ERS_HERE, this->get_name(), "push", std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count()));
93 return false;
94 }
95}
96
97template<class T>
98bool
99StdDeQueue<T>::try_pop(T& val, const duration_t& timeout)
100{
101
102 auto start_time = std::chrono::steady_clock::now();
103 std::unique_lock<std::mutex> lk(m_mutex, std::defer_lock);
104
105 this->try_lock_for(lk, timeout);
106
107 auto time_to_wait_for_data = (start_time + timeout) - std::chrono::steady_clock::now();
108
109 if (time_to_wait_for_data.count() > 0) {
110 m_no_longer_empty.wait_for(lk, time_to_wait_for_data, [&]() { return this->can_pop(); });
111 }
112
113 if (this->can_pop()) {
114 val = std::move(m_deque.front());
115 m_size--;
116 m_deque.pop_front();
117 m_no_longer_full.notify_one();
118 return true;
119 } else {
120 ers::error(QueueTimeoutExpired(
121 ERS_HERE, this->get_name(), "pop", std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count()));
122 return false;
123 }
124}
125
126// This try_lock_for() function was written because while objects of
127// type std::timed_mutex have their own try_lock_for functions, the
128// std::condition_variable::wait_for functions used in this class's push
129// and pop operations require an std::mutex
130
131template<class T>
132void
133StdDeQueue<T>::try_lock_for(std::unique_lock<std::mutex>& lk, const duration_t& timeout)
134{
135 assert(!lk.owns_lock());
136
137 auto start_time = std::chrono::steady_clock::now();
138 auto ret = lk.try_lock();
139
140 if ((!ret || !lk.owns_lock()) && timeout.count() > 0) {
141
142 int approximate_number_of_retries = 5;
143 duration_t pause_between_tries = duration_t(timeout.count() / approximate_number_of_retries);
144
145 while (std::chrono::steady_clock::now() < start_time + timeout) {
146 std::this_thread::sleep_for(pause_between_tries);
147 ret = lk.try_lock();
148 if (ret && lk.owns_lock()) {
149 break;
150 }
151 }
152 }
153
154 if (!lk.owns_lock()) {
155 throw QueueTimeoutExpired(
156 ERS_HERE, this->get_name(), "lock mutex", std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
157 }
158}
159
160} // namespace dunedaq::iomanager
#define ERS_HERE
Implementations of the Queue class are responsible for relaying data between DAQModules within a DAQ ...
Definition Queue.hpp:40
void pop(value_t &val, const duration_t &) override
Pop the first value off of the queue.
std::deque< value_t > m_deque
void push(value_t &&, const duration_t &) override
Push a value onto the Queue.
void try_lock_for(std::unique_lock< std::mutex > &, const duration_t &)
StdDeQueue(const std::string &name, size_t capacity)
StdDeQueue Constructor.
Definition StdDeQueue.hxx:7
bool try_pop(value_t &val, const duration_t &) override
typename Queue< T >::duration_t duration_t
Type used for expressing timeouts.
bool try_push(value_t &&, const duration_t &) override
T value_t
Type of data stored in the StdDeQueue.
Cannot add TPSet with start_time
void error(const Issue &issue)
Definition ers.hpp:81