DUNE-DAQ
DUNE Trigger and Data Acquisition software
IterableQueueModel.hxx
// Declarations for IterableQueueModel

namespace dunedaq {
namespace datahandlinglibs {

// Free allocated memory; the release call depends on the alignment strategy and allocation policy
template<class T>
void
IterableQueueModel<T>::free_memory()
{
  // We need to destruct anything that may still exist in our queue.
  // (No real synchronization needed at destructor time: only one
  // thread can be doing this.)
  if (!std::is_trivially_destructible<T>::value) {
    std::size_t readIndex = readIndex_;
    std::size_t endIndex = writeIndex_;
    while (readIndex != endIndex) {
      records_[readIndex].~T();
      if (++readIndex == size_) { // NOLINT(runtime/increment_decrement)
        readIndex = 0;
      }
    }
  }
  // Different allocators require matching free functions
  if (intrinsic_allocator_) {
    _mm_free(records_);
  } else if (numa_aware_) {
#ifdef WITH_LIBNUMA_SUPPORT
    numa_free(records_, sizeof(T) * size_);
#endif
  } else {
    std::free(records_);
  }
}

// Allocate memory based on different alignment strategies and allocation policies
template<class T>
void
IterableQueueModel<T>::allocate_memory(std::size_t size,
                                       bool numa_aware,
                                       uint8_t numa_node, // NOLINT (build/unsigned)
                                       bool intrinsic_allocator,
                                       std::size_t alignment_size)
{
  assert(size >= 2);
  // TODO: check for valid alignment sizes! | July-21-2021 | Roland Sipos | rsipos@cern.ch

  if (numa_aware && numa_node < 8) { // numa allocator from libnuma; we get "numa_node >= 0" for free, given its datatype
#ifdef WITH_LIBNUMA_SUPPORT
    numa_set_preferred((unsigned)numa_node); // https://linux.die.net/man/3/numa_set_preferred
  #ifdef WITH_LIBNUMA_BIND_POLICY
    numa_set_bind_policy(WITH_LIBNUMA_BIND_POLICY); // https://linux.die.net/man/3/numa_set_bind_policy
  #endif
  #ifdef WITH_LIBNUMA_STRICT_POLICY
    numa_set_strict(WITH_LIBNUMA_STRICT_POLICY); // https://linux.die.net/man/3/numa_set_strict
  #endif
    records_ = static_cast<T*>(numa_alloc_onnode(sizeof(T) * size, numa_node));
#else
    throw GenericConfigurationError(ERS_HERE,
      "NUMA allocation was requested but program was built without USE_LIBNUMA");
#endif
  } else if (intrinsic_allocator && alignment_size > 0) { // _mm allocator
    records_ = static_cast<T*>(_mm_malloc(sizeof(T) * size, alignment_size));
  } else if (!intrinsic_allocator && alignment_size > 0) { // std aligned allocator
    records_ = static_cast<T*>(std::aligned_alloc(alignment_size, sizeof(T) * size));
  } else if (!numa_aware && !intrinsic_allocator && alignment_size == 0) {
    // Standard allocator
    records_ = static_cast<T*>(std::malloc(sizeof(T) * size));

  } else {
    // Let it fail, as the requested combination might be invalid
    // records_ = static_cast<T*>(std::malloc(sizeof(T) * size_);
  }

  size_ = size;
  numa_aware_ = numa_aware;
  numa_node_ = numa_node;
  intrinsic_allocator_ = intrinsic_allocator;
  alignment_size_ = alignment_size;
}
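
// Illustrative sketch (not part of the original file): one plausible way to call
// allocate_memory() for a plain, non-NUMA buffer via the std::aligned_alloc branch.
// The element type int, the capacity of 4096 and the 64-byte alignment are
// assumptions made only for this example.
inline void
example_allocate_aligned(IterableQueueModel<int>& queue)
{
  queue.allocate_memory(4096,  // size: number of element slots, must be >= 2
                        false, // numa_aware: no libnuma placement
                        0,     // numa_node: ignored when not NUMA-aware
                        false, // intrinsic_allocator: true would select _mm_malloc
                        64);   // alignment_size > 0 -> std::aligned_alloc branch above
}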

template<class T>
void
IterableQueueModel<T>::prefill_task()
{
  // Wait until the LB signals that it is ready
  std::unique_lock lk(prefill_mutex_);
  prefill_cv_.wait(lk, [this]{ return prefill_ready_; });

  // After the wait, force page-faults by writing default-constructed elements
  for (size_t i = 0; i < size_ - 1; ++i) {
    T element = T();
    write_(std::move(element));
  }
  flush();

  // Preallocation done
  prefill_done_ = true;

  // Manual unlock is done before notify: avoid waking up the waiting thread only to block again.
  lk.unlock();
  prefill_cv_.notify_one();
}

template<class T>
void
IterableQueueModel<T>::force_pagefault()
{
  // Local prefiller thread
  std::thread prefill_thread(&IterableQueueModel<T>::prefill_task, this);

  // Name the prefiller thread
  char tname[16];
  snprintf(tname, 16, "%s-%d", prefiller_name_.c_str(), numa_node_);
  auto handle = prefill_thread.native_handle();
  pthread_setname_np(handle, tname);

#ifdef WITH_LIBNUMA_SUPPORT
  // Pin the prefiller thread to the CPUs of the configured NUMA node
  cpu_set_t affinitymask;
  CPU_ZERO(&affinitymask);
  struct bitmask* nodecpumask = numa_allocate_cpumask();
  int ret = 0;
  // Get NODE CPU mask
  ret = numa_node_to_cpus(numa_node_, nodecpumask);
  assert(ret == 0);
  // Apply corresponding NODE CPUs to affinity mask
  for (int i = 0; i < numa_num_configured_cpus(); ++i) {
    if (numa_bitmask_isbitset(nodecpumask, i)) {
      CPU_SET(i, &affinitymask);
    }
  }
  ret = pthread_setaffinity_np(handle, sizeof(cpu_set_t), &affinitymask);
  assert(ret == 0);
  numa_free_cpumask(nodecpumask);
#endif

  // Trigger the prefiller thread
  {
    std::lock_guard lk(prefill_mutex_);
    prefill_ready_ = true;
  }
  prefill_cv_.notify_one();
  // Wait for the prefiller thread to finish
  {
    std::unique_lock lk(prefill_mutex_);
    prefill_cv_.wait(lk, [this]{ return prefill_done_; });
  }
  // Join with the prefiller thread
  prefill_thread.join();
}

// Write element into the queue
template<class T>
bool
IterableQueueModel<T>::write(T&& record)
{
  auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
  auto nextRecord = currentWrite + 1;
  if (nextRecord == size_) {
    nextRecord = 0;
  }

  if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
    new (&records_[currentWrite]) T(std::move(record));
    writeIndex_.store(nextRecord, std::memory_order_release);
    return true;
  }

  // queue is full
  ++overflow_ctr;
  return false;
}
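
// Illustrative sketch (not part of the original file): a single-producer loop built on
// write(). The element type int, the element count and the produce_next callback are
// assumptions; only one thread may call write() on a given queue.
inline void
example_producer(IterableQueueModel<int>& queue, int (*produce_next)())
{
  for (int i = 0; i < 100; ++i) {
    int item = produce_next();
    if (!queue.write(std::move(item))) {
      // The queue was full: write() returned false and overflow_ctr was incremented.
      // A real producer would decide here whether to retry, back off or drop the item.
    }
  }
}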

// Read element from the queue (move or copy the value at the front of the queue to the given variable)
template<class T>
bool
IterableQueueModel<T>::read(T& record)
{
  auto const currentRead = readIndex_.load(std::memory_order_relaxed);
  if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
    // queue is empty
    return false;
  }

  auto nextRecord = currentRead + 1;
  if (nextRecord == size_) {
    nextRecord = 0;
  }
  record = std::move(records_[currentRead]);
  records_[currentRead].~T();
  readIndex_.store(nextRecord, std::memory_order_release);
  return true;
}
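
// Illustrative sketch (not part of the original file): draining the queue from the single
// consumer thread with read(). The element type int and the consume callback are
// assumptions made only for this example.
inline void
example_consumer(IterableQueueModel<int>& queue, void (*consume)(int))
{
  int value = 0;
  while (queue.read(value)) { // returns false once the queue is empty
    consume(value);
  }
}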

// Pop the element at the front of the queue
template<class T>
void
IterableQueueModel<T>::popFront()
{
  auto const currentRead = readIndex_.load(std::memory_order_relaxed);
  assert(currentRead != writeIndex_.load(std::memory_order_acquire));

  auto nextRecord = currentRead + 1;
  if (nextRecord == size_) {
    nextRecord = 0;
  }

  records_[currentRead].~T();
  readIndex_.store(nextRecord, std::memory_order_release);
}

// Pop a number of elements (x) from the front of the queue
template<class T>
void
IterableQueueModel<T>::pop(std::size_t x)
{
  for (std::size_t i = 0; i < x; i++) {
    popFront();
  }
}

// Returns true if the queue is empty
template<class T>
bool
IterableQueueModel<T>::isEmpty() const
{
  return readIndex_.load(std::memory_order_acquire) == writeIndex_.load(std::memory_order_acquire);
}

// Returns true if the queue is full (the next write index would reach the read index)
template<class T>
bool
IterableQueueModel<T>::isFull() const
{
  auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1;
  if (nextRecord == size_) {
    nextRecord = 0;
  }
  if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
    return false;
  }
  // queue is full
  return true;
}
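
// Illustrative sketch (not part of the original file): a producer that waits instead of
// dropping, polling isFull() before write(). The element type int is an assumption;
// std::this_thread needs <thread>, which this file already relies on for std::thread.
inline bool
example_blocking_write(IterableQueueModel<int>& queue, int item)
{
  while (queue.isFull()) {
    std::this_thread::yield(); // back off until the consumer frees a slot
  }
  // Only this thread writes, so the free slot observed above cannot be taken by anyone else
  return queue.write(std::move(item));
}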

// Returns a good-enough guess on current occupancy:
// * If called by consumer, then true size may be more (because producer may
//   be adding items concurrently).
// * If called by producer, then true size may be less (because consumer may
//   be removing items concurrently).
// * It is undefined to call this from any other thread.
template<class T>
std::size_t
IterableQueueModel<T>::occupancy() const
{
  int ret = static_cast<int>(writeIndex_.load(std::memory_order_acquire)) -
            static_cast<int>(readIndex_.load(std::memory_order_acquire));
  if (ret < 0) {
    ret += static_cast<int>(size_);
  }
  return static_cast<std::size_t>(ret);
}
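
// Illustrative sketch (not part of the original file): when called from the consumer
// thread, occupancy() is a lower bound, so it can safely bound a batch drain. The element
// type int and the process callback are assumptions made only for this example.
inline void
example_bounded_drain(IterableQueueModel<int>& queue, void (*process)(int))
{
  std::size_t batch = queue.occupancy(); // at least this many elements are readable now
  int value = 0;
  for (std::size_t i = 0; i < batch; ++i) {
    if (queue.read(value)) { // expected to succeed for all batch iterations
      process(value);
    }
  }
}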

// Gives a pointer to the element at the current read index (the front of the queue)
template<class T>
const T*
IterableQueueModel<T>::front()
{
  auto const currentRead = readIndex_.load(std::memory_order_relaxed);
  if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
    return nullptr;
  }
  return &records_[currentRead];
}

// Gives a pointer to the last written element (one slot behind the current write index)
template<class T>
const T*
IterableQueueModel<T>::back()
{
  auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
  if (currentWrite == readIndex_.load(std::memory_order_acquire)) {
    return nullptr;
  }
  int currentLast = currentWrite;
  if (currentLast == 0) {
    currentLast = size_ - 1;
  } else {
    currentLast--;
  }
  return &records_[currentLast];
}
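
// Illustrative sketch (not part of the original file): in-place ("zero-copy") consumption,
// inspecting the oldest element through front() and releasing it with popFront() instead of
// moving it out via read(). The element type int and the inspect callback are assumptions.
inline void
example_inspect_in_place(IterableQueueModel<int>& queue, void (*inspect)(const int&))
{
  const int* oldest = queue.front(); // nullptr when the queue is empty
  if (oldest != nullptr) {
    inspect(*oldest); // the element stays valid until popFront() is called
    queue.popFront(); // destroys the element and advances the read index
  }
}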

// Configures the model
template<class T>
void
IterableQueueModel<T>::conf(const appmodel::LatencyBuffer* cfg)
{
  assert(cfg->get_size() >= 2);
  free_memory();

  allocate_memory(cfg->get_size(),
                  cfg->get_numa_aware(),
                  cfg->get_numa_node(),
                  cfg->get_intrinsic_allocator(),
                  cfg->get_alignment_size());
  readIndex_ = 0;
  writeIndex_ = 0;

  if (!records_) {
    throw std::bad_alloc();
  }

  if (cfg->get_preallocation()) {
    force_pagefault();
  }
}

// Unconfigures the model
template<class T>
void
IterableQueueModel<T>::scrap(const nlohmann::json& /*cfg*/)
{
  free_memory();
  numa_aware_ = false;
  numa_node_ = 0;
  intrinsic_allocator_ = false;
  alignment_size_ = 0;
  invalid_configuration_requested_ = false;
  prefill_ready_ = false;
  prefill_done_ = false;
  size_ = 2;
  records_ = static_cast<T*>(std::malloc(sizeof(T) * 2));
  readIndex_ = 0;
  writeIndex_ = 0;
}

// Hidden original write implementation with signature difference. Only used for pre-allocation
template<class T>
template<class... Args>
bool
IterableQueueModel<T>::write_(Args&&... recordArgs)
{
  // const std::lock_guard<std::mutex> lock(m_mutex);
  auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
  auto nextRecord = currentWrite + 1;
  if (nextRecord == size_) {
    nextRecord = 0;
  }
  // if (nextRecord == readIndex_.load(std::memory_order_acquire)) {
  //   std::cout << "SPSC WARNING -> Queue is full! WRITE PASSES READ!!! \n";
  // }
  // new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
  // writeIndex_.store(nextRecord, std::memory_order_release);
  // return true;

  // ORIGINAL:
  if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
    new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
    writeIndex_.store(nextRecord, std::memory_order_release);
    return true;
  }
  // queue is full
  ++overflow_ctr;
  return false;
}

template<class T>
void
IterableQueueModel<T>::generate_opmon_data()
{
  opmon::LatencyBufferInfo info;
  info.set_num_buffer_elements(this->occupancy());
  this->publish(std::move(info));
}

} // namespace datahandlinglibs
} // namespace dunedaq
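
For orientation, a minimal end-to-end sketch of driving this single-producer/single-consumer model is given below. It is not part of IterableQueueModel.hxx: the include path is left commented out because it is an assumption, the int element type and element count are arbitrary, and the queue is assumed to have been configured already (via conf() or allocate_memory()).

// Illustrative standalone sketch; not part of the original file.
// #include "datahandlinglibs/models/IterableQueueModel.hpp" // assumed include path
#include <atomic>
#include <thread>

using dunedaq::datahandlinglibs::IterableQueueModel;

void example_spsc_run(IterableQueueModel<int>& queue) // queue configured elsewhere
{
  std::atomic<bool> done{ false };

  // Exactly one thread produces via write()...
  std::thread producer([&queue, &done]() {
    for (int i = 0; i < 10000; ++i) {
      int item = i;
      while (!queue.write(std::move(item))) {
        std::this_thread::yield(); // buffer full: wait for the consumer to catch up
      }
    }
    done = true;
  });

  // ...and exactly one thread consumes via read().
  std::thread consumer([&queue, &done]() {
    int value = 0;
    while (!done || !queue.isEmpty()) {
      if (queue.read(value)) {
        // process value here
      }
    }
  });

  producer.join();
  consumer.join();
}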