// Fragment of what appears to be the StdDeQueue constructor; the signature and the
// surrounding lines are not part of this listing. The leading numbers on each line
// are the listing's own line numbers, not code.
// Member initializer: stores the constructor's capacity argument in m_capacity.
10 , m_capacity(capacity)
// Sanity check: the deque's max_size() must exceed what get_capacity() reports,
// so the configured capacity is actually reachable.
13 assert(
m_deque.max_size() > this->get_capacity());
// Fragment of the blocking push (the error text below names the operation "push").
// The signature and some brace/else lines are not part of this listing.
// Record the start time so the lock step and the wait step share one timeout budget.
21 auto start_time = std::chrono::steady_clock::now();
// Build the lock object without locking m_mutex yet (std::defer_lock).
22 std::unique_lock<std::mutex> lk(m_mutex, std::defer_lock);
// Hand the still-unlocked lock to try_lock_for together with the full timeout.
24 this->try_lock_for(lk, timeout);
// Whatever is left of the original budget after the lock step.
26 auto time_to_wait_for_space = (
start_time + timeout) - std::chrono::steady_clock::now();
// Wait only if some of the budget remains.
28 if (time_to_wait_for_space.count() > 0) {
// Wait on m_no_longer_full until can_push() holds or the remaining time runs out.
29 m_no_longer_full.wait_for(lk, time_to_wait_for_space, [&]() {
return this->can_push(); });
// Re-check the condition: the wait may have been skipped or may have timed out.
32 if (this->can_push()) {
// Move the caller's object onto the back of the deque.
33 m_deque.push_back(std::move(object_to_push));
// Wake one thread waiting on m_no_longer_empty.
35 m_no_longer_empty.notify_one();
// Throws QueueTimeoutExpired carrying the queue name, the operation name and the
// timeout converted to milliseconds.
// NOTE(review): presumably the else branch of the can_push() check above; the
// intervening line is not shown in this listing.
37 throw QueueTimeoutExpired(
38 ERS_HERE, this->get_name(),
"push", std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
// Fragment of the blocking pop (the error text below names the operation "pop").
// The signature and some brace/else lines are not part of this listing.
// Record the start time so the lock step and the wait step share one timeout budget.
47 auto start_time = std::chrono::steady_clock::now();
// Build the lock object without locking m_mutex yet (std::defer_lock).
48 std::unique_lock<std::mutex> lk(m_mutex, std::defer_lock);
// Hand the still-unlocked lock to try_lock_for together with the full timeout.
50 this->try_lock_for(lk, timeout);
// Whatever is left of the original budget after the lock step.
52 auto time_to_wait_for_data = (
start_time + timeout) - std::chrono::steady_clock::now();
// Wait only if some of the budget remains.
54 if (time_to_wait_for_data.count() > 0) {
// Wait on m_no_longer_empty until can_pop() holds or the remaining time runs out.
55 m_no_longer_empty.wait_for(lk, time_to_wait_for_data, [&]() {
return this->can_pop(); });
// Re-check the condition: the wait may have been skipped or may have timed out.
58 if (this->can_pop()) {
// Move the front element of the deque into the caller's val.
// NOTE(review): the statement that removes the moved-from front element is not
// shown in this listing; confirm it follows this line.
59 val = std::move(m_deque.front());
// Wake one thread waiting on m_no_longer_full.
62 m_no_longer_full.notify_one();
// Throws QueueTimeoutExpired carrying the queue name, the operation name and the
// timeout converted to milliseconds.
// NOTE(review): presumably the else branch of the can_pop() check above; the
// intervening line is not shown in this listing.
64 throw QueueTimeoutExpired(
65 ERS_HERE, this->get_name(),
"pop", std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
// Fragment of a push variant that shows no throw statement in this listing
// (presumably try_push, which returns bool instead of throwing; confirm against the
// full file). The signature, return statements and some brace lines are not shown.
// Record the start time so the lock step and the wait step share one timeout budget.
74 auto start_time = std::chrono::steady_clock::now();
// Build the lock object without locking m_mutex yet (std::defer_lock).
75 std::unique_lock<std::mutex> lk(m_mutex, std::defer_lock);
// Hand the still-unlocked lock to try_lock_for together with the full timeout.
// NOTE(review): try_lock_for throws QueueTimeoutExpired when it cannot lock, so a
// caller of this variant can still see an exception; confirm that is intended.
77 this->try_lock_for(lk, timeout);
// Whatever is left of the original budget after the lock step.
79 auto time_to_wait_for_space = (
start_time + timeout) - std::chrono::steady_clock::now();
// Wait only if some of the budget remains.
81 if (time_to_wait_for_space.count() > 0) {
// Wait on m_no_longer_full until can_push() holds or the remaining time runs out.
82 m_no_longer_full.wait_for(lk, time_to_wait_for_space, [&]() {
return this->can_push(); });
// Re-check the condition: the wait may have been skipped or may have timed out.
85 if (this->can_push()) {
// Move the caller's object onto the back of the deque.
86 m_deque.push_back(std::move(object_to_push));
// Wake one thread waiting on m_no_longer_empty.
88 m_no_longer_empty.notify_one();
// Arguments (source location, queue name, operation "push", timeout in milliseconds)
// of a call whose opening lines are not shown in this listing; the trailing "))"
// indicates two nested calls being closed. Presumably an error report rather than a
// throw; confirm against the full file.
92 ERS_HERE, this->get_name(),
"push", std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count()));
// Fragment of a pop variant that shows no throw statement in this listing
// (presumably try_pop, which returns bool instead of throwing; confirm against the
// full file). The signature, return statements and some brace lines are not shown.
// Record the start time so the lock step and the wait step share one timeout budget.
102 auto start_time = std::chrono::steady_clock::now();
// Build the lock object without locking m_mutex yet (std::defer_lock).
103 std::unique_lock<std::mutex> lk(m_mutex, std::defer_lock);
// Hand the still-unlocked lock to try_lock_for together with the full timeout.
// NOTE(review): try_lock_for throws QueueTimeoutExpired when it cannot lock, so a
// caller of this variant can still see an exception; confirm that is intended.
105 this->try_lock_for(lk, timeout);
// Whatever is left of the original budget after the lock step.
107 auto time_to_wait_for_data = (
start_time + timeout) - std::chrono::steady_clock::now();
// Wait only if some of the budget remains.
109 if (time_to_wait_for_data.count() > 0) {
// Wait on m_no_longer_empty until can_pop() holds or the remaining time runs out.
110 m_no_longer_empty.wait_for(lk, time_to_wait_for_data, [&]() {
return this->can_pop(); });
// Re-check the condition: the wait may have been skipped or may have timed out.
113 if (this->can_pop()) {
// Move the front element of the deque into the caller's val.
// NOTE(review): the statement that removes the moved-from front element is not
// shown in this listing; confirm it follows this line.
114 val = std::move(m_deque.front());
// Wake one thread waiting on m_no_longer_full.
117 m_no_longer_full.notify_one();
// Arguments (source location, queue name, operation "pop", timeout in milliseconds)
// of a call whose opening lines are not shown in this listing; the trailing "))"
// indicates two nested calls being closed. Presumably an error report rather than a
// throw; confirm against the full file.
121 ERS_HERE, this->get_name(),
"pop", std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count()));
// Fragment of the lock helper (the error text below names the operation "lock mutex";
// presumably try_lock_for). It tries to lock lk and throws QueueTimeoutExpired if the
// lock is still not owned at the end. The signature and several lines are not shown.
// Precondition: the caller passes a lock object that does not own the mutex yet.
135 assert(!lk.owns_lock());
// Start of the timeout window used by the loop condition below.
137 auto start_time = std::chrono::steady_clock::now();
// First, non-blocking attempt.
138 auto ret = lk.try_lock();
// Enter the retry path only if that attempt failed and the timeout is positive.
140 if ((!ret || !lk.owns_lock()) && timeout.count() > 0) {
// NOTE(review): the use of this count is on a line not shown here; presumably it
// sizes pause_between_tries as a fraction of timeout. Confirm the pause cannot round
// down to zero for timeouts shorter than five ticks, which would busy-spin the loop.
142 int approximate_number_of_retries = 5;
// Keep going until the deadline start_time + timeout has passed.
145 while (std::chrono::steady_clock::now() <
start_time + timeout) {
// Back off between attempts.
146 std::this_thread::sleep_for(pause_between_tries);
// Lock obtained. NOTE(review): the retry of lk.try_lock() and the loop exit are
// presumably on the lines omitted around this check.
148 if (ret && lk.owns_lock()) {
// Still not owning the lock after all attempts: report the timeout in milliseconds.
154 if (!lk.owns_lock()) {
155 throw QueueTimeoutExpired(
156 ERS_HERE, this->get_name(),
"lock mutex", std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
Implementations of the Queue class are responsible for relaying data between DAQModules within a DAQ Application.
void pop(value_t &val, const duration_t &) override
Pop the first value off of the queue.
std::deque< value_t > m_deque
void push(value_t &&, const duration_t &) override
Push a value onto the Queue.
void try_lock_for(std::unique_lock< std::mutex > &, const duration_t &)
StdDeQueue(const std::string &name, size_t capacity)
StdDeQueue Constructor.
bool try_pop(value_t &val, const duration_t &) override
typename Queue< T >::duration_t duration_t
Type used for expressing timeouts.
bool try_push(value_t &&, const duration_t &) override
T value_t
Type of data stored in the StdDeQueue.
Cannot add TPSet with start_time
void error(const Issue &issue)