// IterableQueueModel.hpp
// DUNE-DAQ: DUNE Trigger and Data Acquisition software

// @author Bo Hu (bhu@fb.com)
// @author Jordan DeLong (delong.j@fb.com)

// Modification by Roland Sipos and Florian Till Groetschla
// for the DUNE-DAQ software framework

#ifndef DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_ITERABLEQUEUEMODEL_HPP_
#define DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_ITERABLEQUEUEMODEL_HPP_

#include "datahandlinglibs/concepts/LatencyBufferConcept.hpp" // base-class header; exact path assumed

#include "logging/Logging.hpp"

#include <folly/lang/Align.h>

#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdlib>
#include <cxxabi.h>
#include <iomanip>
#include <iostream>
#include <limits>
#include <memory>
#include <mutex>
#include <new>
#include <stdexcept>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>

#include <xmmintrin.h>

#ifdef WITH_LIBNUMA_SUPPORT
#include <numa.h>
#endif

namespace dunedaq {
namespace datahandlinglibs {

// Iterable, single-producer single-consumer (SPSC) queue over a contiguous
// ring buffer, adapted from folly's ProducerConsumerQueue for use as a
// DUNE-DAQ latency buffer (LB).
template<class T>
struct IterableQueueModel : public LatencyBufferConcept<T>
{
  typedef T value_type;

  IterableQueueModel(const IterableQueueModel&) = delete;
  IterableQueueModel& operator=(const IterableQueueModel&) = delete;

  // Default constructor
  IterableQueueModel()
    : LatencyBufferConcept<T>()
    , numa_aware_(false)
    , numa_node_(0)
    , intrinsic_allocator_(false)
    , alignment_size_(0)
    , invalid_configuration_requested_(false)
    , prefill_ready_(false)
    , prefill_done_(false)
    , size_(2)
    , records_(static_cast<T*>(std::malloc(sizeof(T) * 2)))
    , readIndex_(0)
    , writeIndex_(0)
  {}

  // Explicit constructor with size
  explicit IterableQueueModel(std::size_t size) // size must be >= 2
    : LatencyBufferConcept<T>() // NOLINT(build/unsigned)
    , numa_aware_(false)
    , numa_node_(0)
    , intrinsic_allocator_(false)
    , alignment_size_(0)
    , invalid_configuration_requested_(false)
    , prefill_ready_(false)
    , prefill_done_(false)
    , size_(size)
    , records_(static_cast<T*>(std::malloc(sizeof(T) * size)))
    , readIndex_(0)
    , writeIndex_(0)
  {
    assert(size >= 2);
    if (!records_) {
      throw std::bad_alloc();
    }
#if 0
    // Debug-only pointer logger: periodically prints the buffer bounds,
    // the read/write indices and the overflow counter.
    ptrlogger = std::thread([&](){
      while(true) {
        auto const currentRead = readIndex_.load(std::memory_order_relaxed);
        auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
        TLOG() << "BEG:" << std::hex << &records_[0] << " END:" << &records_[size] << std::dec
               << " R:" << currentRead << " - W:" << currentWrite
               << " OFLOW:" << overflow_ctr;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
      }
    });
#endif
  }

  // Constructor with alignment strategies
  IterableQueueModel(std::size_t size, // size must be >= 2
                     bool numa_aware = false,
                     uint8_t numa_node = 0, // NOLINT (build/unsigned)
                     bool intrinsic_allocator = false,
                     std::size_t alignment_size = 0)
    : LatencyBufferConcept<T>() // NOLINT(build/unsigned)
    , numa_aware_(numa_aware)
    , numa_node_(numa_node)
    , intrinsic_allocator_(intrinsic_allocator)
    , alignment_size_(alignment_size)
    , invalid_configuration_requested_(false)
    , prefill_ready_(false)
    , prefill_done_(false)
    , size_(size)
    , readIndex_(0)
    , writeIndex_(0)
  {
    assert(size >= 2);
    allocate_memory(size, numa_aware, numa_node, intrinsic_allocator, alignment_size);

    if (!records_) {
      throw std::bad_alloc();
    }
#if 0
    // Same debug-only pointer logger as in the explicit constructor above.
    ptrlogger = std::thread([&](){
      while(true) {
        auto const currentRead = readIndex_.load(std::memory_order_relaxed);
        auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
        TLOG() << "BEG:" << std::hex << &records_[0] << " END:" << &records_[size] << std::dec
               << " R:" << currentRead << " - W:" << currentWrite
               << " OFLOW:" << overflow_ctr;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
      }
    });
#endif
  }
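
  // Construction sketch (illustrative only; the element type and the values
  // below are assumptions for the example, not part of the interface):
  //
  //   // 1024-slot buffer backed by plain std::malloc:
  //   IterableQueueModel<int> queue(1024);
  //
  //   // NUMA-aware buffer on node 0 with a 4 KiB alignment request:
  //   IterableQueueModel<int> numa_queue(1024, /*numa_aware=*/true, /*numa_node=*/0,
  //                                      /*intrinsic_allocator=*/false, /*alignment_size=*/4096);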

  // Destructor
  ~IterableQueueModel() { free_memory(); }

  // Free allocated memory; behavior differs by alignment strategy and allocation policy
  void free_memory();

  // Allocate memory based on the selected alignment strategy and allocation policy
  void allocate_memory(std::size_t size,
                       bool numa_aware /*= false*/,
                       uint8_t numa_node = 0, // NOLINT (build/unsigned)
                       bool intrinsic_allocator = false,
                       std::size_t alignment_size = 0);

  void allocate_memory(std::size_t size) override { allocate_memory(size, false); }

  // Task that pre-fills the LB
  void prefill_task();

  // Issue the pre-fill task (forces page faults up front)
  void force_pagefault();

  // Write an element into the queue; returns false if the queue is full
  bool write(T&& record) override;

  // Read an element from the queue (move or copy the value at the front into
  // the given variable); returns false if the queue is empty
  bool read(T& record) override;
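
  // Usage sketch (illustrative only; `q` and the int payload are assumptions):
  // single producer, single consumer.
  //
  //   IterableQueueModel<int> q(128);
  //   // producer thread:
  //   if (!q.write(42)) { /* queue full, element rejected */ }
  //   // consumer thread:
  //   int value;
  //   if (q.read(value)) { /* value now holds the former front element */ }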

  // Pop the element at the front of the queue
  void popFront();

  // Pop a number of elements (x) from the front of the queue
  void pop(std::size_t x);

  // Returns true if the queue is empty
  bool isEmpty() const;

  // Returns true if the queue is full (the write index has caught up with the read index)
  bool isFull() const;

  // Returns a good-enough guess on current occupancy:
  // * If called by consumer, then true size may be more (because producer may
  //   be adding items concurrently).
  // * If called by producer, then true size may be less (because consumer may
  //   be removing items concurrently).
  // * It is undefined to call this from any other thread.
  std::size_t occupancy() const override;
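
  // Calling-pattern sketch (illustrative; `q` and T stand for a hypothetical
  // instance and its element type): on the consumer side occupancy() is a
  // lower bound, so that many reads are guaranteed to succeed.
  //
  //   std::size_t n = q.occupancy();
  //   for (std::size_t i = 0; i < n; ++i) {
  //     T item;
  //     q.read(item); // succeeds for all n items in SPSC use
  //   }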

  // The size of the underlying buffer, not the number of usable slots
  std::size_t size() const { return size_; }

  // Maximum number of items in the queue (one slot is kept free to
  // distinguish a full queue from an empty one)
  std::size_t capacity() const { return size_ - 1; }

  // Gives a pointer to the element at the current read index
  const T* front() override;

  // Gives a pointer to the element at the current write index
  const T* back() override;

  // Gives a pointer to the first slot of the underlying buffer
  T* start_of_buffer() { return &records_[0]; }

  // Gives a past-the-end pointer of the underlying buffer
  T* end_of_buffer() { return &records_[size_]; }
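
  // Buffer-bounds sketch (illustrative; `q` is a hypothetical instance):
  // start_of_buffer()/end_of_buffer() delimit the raw storage independent of
  // occupancy, e.g. for bounds checks on externally produced pointers.
  //
  //   T* begin_ptr = q.start_of_buffer();
  //   T* end_ptr = q.end_of_buffer();
  //   std::size_t n_slots = end_ptr - begin_ptr; // equals q.size()
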
  // Configures the model
  void conf(const appmodel::LatencyBuffer* cfg) override;

  // Unconfigures the model
  void scrap(const nlohmann::json& /*cfg*/) override;

  // Flushes all elements from the queue
  void flush() override { pop(occupancy()); }

  // Returns the current memory alignment size
  std::size_t get_alignment_size() { return alignment_size_; }

  // Forward iterator over the live elements of the queue
  struct Iterator
  {
    using iterator_category = std::forward_iterator_tag;
    using difference_type = std::ptrdiff_t;
    using value_type = T;
    using pointer = T*;
    using reference = T&;

    Iterator(IterableQueueModel<T>& queue, uint32_t index) // NOLINT(build/unsigned)
      : m_queue(queue)
      , m_index(index)
    {}

    reference operator*() const { return m_queue.records_[m_index]; }
    pointer operator->() { return &m_queue.records_[m_index]; }

    // Pre-increment: advance with wrap-around; an iterator that leaves the
    // valid region is turned into end() via the max-index sentinel
    Iterator& operator++() // NOLINT(runtime/increment_decrement) :)
    {
      if (good()) {
        m_index++;
        if (m_index == m_queue.size_) {
          m_index = 0;
        }
      }
      if (!good()) {
        m_index = std::numeric_limits<uint32_t>::max(); // NOLINT(build/unsigned)
      }
      return *this;
    }

    // Note: non-standard use of the post-increment signature; advances by
    // `amount` steps and returns the original position
    Iterator operator++(int amount) // NOLINT(runtime/increment_decrement) :)
    {
      Iterator tmp = *this;
      for (int i = 0; i < amount; ++i) {
        ++(*this);
      }
      return tmp;
    }

    friend bool operator==(const Iterator& a, const Iterator& b) { return a.m_index == b.m_index; }
    friend bool operator!=(const Iterator& a, const Iterator& b) { return a.m_index != b.m_index; }

    // True while the iterator points into the live [read, write) region,
    // taking wrap-around of the circular buffer into account
    bool good()
    {
      auto const currentRead = m_queue.readIndex_.load(std::memory_order_relaxed);
      auto const currentWrite = m_queue.writeIndex_.load(std::memory_order_relaxed);
      return (*this != m_queue.end()) &&
             ((m_index >= currentRead && m_index < currentWrite) ||
              (m_index >= currentRead && currentWrite < currentRead) ||
              (currentWrite < currentRead && m_index < currentRead && m_index < currentWrite));
    }

    uint32_t get_index() { return m_index; } // NOLINT(build/unsigned)

  private:
    IterableQueueModel<T>& m_queue;
    uint32_t m_index; // NOLINT(build/unsigned)
  };

  // Iterator to the oldest live element, or end() if the queue is empty
  Iterator begin()
  {
    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
      // queue is empty
      return end();
    }
    return Iterator(*this, currentRead);
  }

  // Past-the-end iterator, marked with the max-index sentinel
  Iterator end()
  {
    return Iterator(*this, std::numeric_limits<uint32_t>::max()); // NOLINT(build/unsigned)
  }

protected:
  virtual void generate_opmon_data() override;

  // Hidden original write implementation with a different signature; only used for pre-allocation
  template<class... Args>
  bool write_(Args&&... recordArgs);

  // Counter for writes that failed because the queue was full
  std::atomic<int> overflow_ctr{ 0 };

  // NUMA awareness and aligned allocator usage configuration
  bool numa_aware_;
  uint8_t numa_node_; // NOLINT (build/unsigned)
  bool intrinsic_allocator_;
  std::size_t alignment_size_;
  bool invalid_configuration_requested_;

  // Pre-fill and page-fault internal thread control
  std::string prefiller_name_{"lbpfn"};
  std::mutex prefill_mutex_;
  std::condition_variable prefill_cv_;
  bool prefill_ready_;
  bool prefill_done_;

  // Pointer logger for debugging
  std::thread ptrlogger;

  // Underlying buffer with padding:
  // * hardware_destructive_interference_size is set to 128.
  // * (Assuming a cache line size of 64, we use a cache-line-pair size of 128.)
  char pad0_[folly::hardware_destructive_interference_size]; // NOLINT(runtime/arrays)
  uint32_t size_; // NOLINT(build/unsigned)
  T* records_;
  alignas(
    folly::hardware_destructive_interference_size) std::atomic<unsigned int> readIndex_; // NOLINT(build/unsigned)
  alignas(
    folly::hardware_destructive_interference_size) std::atomic<unsigned int> writeIndex_; // NOLINT(build/unsigned)
  char pad1_[folly::hardware_destructive_interference_size - sizeof(writeIndex_)]; // NOLINT(runtime/arrays)
};

} // namespace datahandlinglibs
} // namespace dunedaq

// Declarations
#include "detail/IterableQueueModel.hpp" // out-of-line template definitions; path assumed

#endif // DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_ITERABLEQUEUEMODEL_HPP_