DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::datahandlinglibs::IterableQueueModel< T > Struct Template Reference

#include <IterableQueueModel.hpp>

Inheritance diagram for dunedaq::datahandlinglibs::IterableQueueModel< T >:
[legend]
Collaboration diagram for dunedaq::datahandlinglibs::IterableQueueModel< T >:
[legend]

Classes

struct  Iterator
 

Public Types

typedef T value_type
 
- Public Types inherited from dunedaq::opmonlib::MonitorableObject
using NodePtr = std::weak_ptr<MonitorableObject>
 
using NewNodePtr = std::shared_ptr<MonitorableObject>
 
using ElementId = std::string
 

Public Member Functions

 IterableQueueModel (const IterableQueueModel &)=delete
 
IterableQueueModel & operator= (const IterableQueueModel &)=delete
 
 IterableQueueModel ()
 
 IterableQueueModel (std::size_t size)
 
 IterableQueueModel (std::size_t size, bool numa_aware=false, uint8_t numa_node=0, bool intrinsic_allocator=false, std::size_t alignment_size=0)
 
 ~IterableQueueModel ()
 
void free_memory ()
 
void allocate_memory (std::size_t size, bool numa_aware, uint8_t numa_node=0, bool intrinsic_allocator=false, std::size_t alignment_size=0)
 
void allocate_memory (std::size_t size) override
 
void prefill_task ()
 
void force_pagefault ()
 
bool write (T &&record) override
 Move referenced object into LB.
 
bool read (T &record) override
 Move object from LB to referenced.
 
void popFront ()
 
void pop (std::size_t x)
 Pop specified amount of elements from LB.
 
bool isEmpty () const
 
bool isFull () const
 
std::size_t occupancy () const override
 Occupancy of LB.
 
std::size_t size () const
 
std::size_t capacity () const
 
const T * front () override
 Get pointer to the front of the LB.
 
const T * back () override
 Get pointer to the back of the LB.
 
T * start_of_buffer ()
 
T * end_of_buffer ()
 
void conf (const appmodel::LatencyBuffer *cfg) override
 Configure the LB.
 
void scrap (const nlohmann::json &) override
 Unconfigure the LB.
 
void flush () override
 Flush all elements from the latency buffer.
 
std::size_t get_alignment_size ()
 
Iterator begin ()
 
Iterator end ()
 
- Public Member Functions inherited from dunedaq::datahandlinglibs::LatencyBufferConcept< T >
 LatencyBufferConcept ()
 
virtual ~LatencyBufferConcept ()
 
 LatencyBufferConcept (const LatencyBufferConcept &)=delete
 LatencyBufferConcept is not copy-constructible.
 
LatencyBufferConcept & operator= (const LatencyBufferConcept &)=delete
 LatencyBufferConcept is not copy-assignable.
 
 LatencyBufferConcept (LatencyBufferConcept &&)=delete
 LatencyBufferConcept is not move-constructible.
 
LatencyBufferConcept & operator= (LatencyBufferConcept &&)=delete
 LatencyBufferConcept is not move-assignable.
 
virtual void allocate_memory (size_t)=0
 Whether or not the buffer is allocatable. false by default.
 
- Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject (const MonitorableObject &)=delete
 
MonitorableObject & operator= (const MonitorableObject &)=delete
 
 MonitorableObject (MonitorableObject &&)=delete
 
MonitorableObject & operator= (MonitorableObject &&)=delete
 
virtual ~MonitorableObject ()=default
 
auto get_opmon_id () const noexcept
 
auto get_opmon_level () const noexcept
 

Protected Member Functions

virtual void generate_opmon_data () override
 
template<class... Args>
bool write_ (Args &&... recordArgs)
 
- Protected Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject ()=default
 
void register_node (ElementId name, NewNodePtr)
 
void publish (google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
 

Protected Attributes

std::atomic< int > overflow_ctr { 0 }
 
bool numa_aware_
 
uint8_t numa_node_
 
bool intrinsic_allocator_
 
std::size_t alignment_size_
 
bool invalid_configuration_requested_
 
std::string prefiller_name_ {"lbpfn"}
 
std::mutex prefill_mutex_
 
std::condition_variable prefill_cv_
 
bool prefill_ready_
 
bool prefill_done_
 
std::thread ptrlogger
 
char pad0_ [folly::hardware_destructive_interference_size]
 
uint32_t size_
 
T * records_
 
std::atomic< unsigned int > readIndex_
 
std::atomic< unsigned int > writeIndex_
 
char pad1_ [folly::hardware_destructive_interference_size - sizeof(writeIndex_)]
 

Additional Inherited Members

- Static Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
static bool publishable_metric (OpMonLevel entry, OpMonLevel system) noexcept
 

Detailed Description

template<class T>
struct dunedaq::datahandlinglibs::IterableQueueModel< T >

IterableQueueModel is a one producer and one consumer queue without locks. Modified version of the folly::ProducerConsumerQueue via adding a readPtr function. Requires well defined and followed constraints on the consumer side.

Also, note that the number of usable slots in the queue at any given time is actually (size-1), so if you start with an empty queue, isFull() will return true after size-1 insertions.

Definition at line 71 of file IterableQueueModel.hpp.

Member Typedef Documentation

◆ value_type

template<class T >
T dunedaq::datahandlinglibs::IterableQueueModel< T >::value_type

Definition at line 73 of file IterableQueueModel.hpp.

Constructor & Destructor Documentation

◆ IterableQueueModel() [1/4]

template<class T >
dunedaq::datahandlinglibs::IterableQueueModel< T >::IterableQueueModel ( const IterableQueueModel< T > & )
delete

◆ IterableQueueModel() [2/4]

template<class T >
dunedaq::datahandlinglibs::IterableQueueModel< T >::IterableQueueModel ( )
inline

Definition at line 79 of file IterableQueueModel.hpp.

81 , numa_aware_(false)
82 , numa_node_(0)
86 , prefill_ready_(false)
87 , prefill_done_(false)
88 , size_(2)
89 , records_(static_cast<T*>(std::malloc(sizeof(T) * 2)))
90 , readIndex_(0)
91 , writeIndex_(0)
92 {}

◆ IterableQueueModel() [3/4]

template<class T >
dunedaq::datahandlinglibs::IterableQueueModel< T >::IterableQueueModel ( std::size_t size)
inline explicit

Definition at line 95 of file IterableQueueModel.hpp.

96 : LatencyBufferConcept<T>() // NOLINT(build/unsigned)
97 , numa_aware_(false)
98 , numa_node_(0)
100 , alignment_size_(0)
102 , prefill_ready_(false)
103 , prefill_done_(false)
104 , size_(size)
105 , records_(static_cast<T*>(std::malloc(sizeof(T) * size)))
106 , readIndex_(0)
107 , writeIndex_(0)
108 {
109 assert(size >= 2);
110 if (!records_) {
111 throw std::bad_alloc();
112 }
113#if 0
114 ptrlogger = std::thread([&](){
115 while(true) {
116 auto const currentRead = readIndex_.load(std::memory_order_relaxed);
117 auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
118 TLOG() << "BEG:" << std::hex << &records_[0] << " END:" << &records_[size] << std::dec
119 << " R:" << currentRead << " - W:" << currentWrite
120 << " OFLOW:" << overflow_ctr;
121 std::this_thread::sleep_for(std::chrono::milliseconds(100));
122 }
123 });
124#endif
125 }
#define TLOG(...)
Definition macro.hpp:22

◆ IterableQueueModel() [4/4]

template<class T >
dunedaq::datahandlinglibs::IterableQueueModel< T >::IterableQueueModel ( std::size_t size,
bool numa_aware = false,
uint8_t numa_node = 0,
bool intrinsic_allocator = false,
std::size_t alignment_size = 0 )
inline

Definition at line 128 of file IterableQueueModel.hpp.

133 : LatencyBufferConcept<T>() // NOLINT(build/unsigned)
134 , numa_aware_(numa_aware)
135 , numa_node_(numa_node)
136 , intrinsic_allocator_(intrinsic_allocator)
137 , alignment_size_(alignment_size)
139 , prefill_ready_(false)
140 , prefill_done_(false)
141 , size_(size)
142 , readIndex_(0)
143 , writeIndex_(0)
144 {
145 assert(size >= 2);
146 allocate_memory(size, numa_aware, numa_node, intrinsic_allocator, alignment_size);
147
148 if (!records_) {
149 throw std::bad_alloc();
150 }
151#if 0
152 ptrlogger = std::thread([&](){
153 while(true) {
154 auto const currentRead = readIndex_.load(std::memory_order_relaxed);
155 auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
156 TLOG() << "BEG:" << std::hex << &records_[0] << " END:" << &records_[size] << std::dec
157 << " R:" << currentRead << " - W:" << currentWrite
158 << " OFLOW:" << overflow_ctr;
159 std::this_thread::sleep_for(std::chrono::milliseconds(100));
160 }
161 });
162#endif
163 }
void allocate_memory(std::size_t size, bool numa_aware, uint8_t numa_node=0, bool intrinsic_allocator=false, std::size_t alignment_size=0)

◆ ~IterableQueueModel()

Member Function Documentation

◆ allocate_memory() [1/2]

template<class T >
void dunedaq::datahandlinglibs::IterableQueueModel< T >::allocate_memory ( std::size_t size)
inline override

Definition at line 178 of file IterableQueueModel.hpp.

178{ allocate_memory(size,false); }

◆ allocate_memory() [2/2]

template<class T >
void dunedaq::datahandlinglibs::IterableQueueModel< T >::allocate_memory ( std::size_t size,
bool numa_aware,
uint8_t numa_node = 0,
bool intrinsic_allocator = false,
std::size_t alignment_size = 0 )

Definition at line 39 of file IterableQueueModel.hxx.

44{
45 assert(size >= 2);
46 // TODO: check for valid alignment sizes! | July-21-2021 | Roland Sipos | rsipos@cern.ch
47
48 if (numa_aware && numa_node < 8) { // numa allocator from libnuma; we get "numa_node >= 0" for free, given its datatype
49#ifdef WITH_LIBNUMA_SUPPORT
50 numa_set_preferred((unsigned)numa_node); // https://linux.die.net/man/3/numa_set_preferred
51 #ifdef WITH_LIBNUMA_BIND_POLICY
52 numa_set_bind_policy(WITH_LIBNUMA_BIND_POLICY); // https://linux.die.net/man/3/numa_set_bind_policy
53 #endif
54 #ifdef WITH_LIBNUMA_STRICT_POLICY
55 numa_set_strict(WITH_LIBNUMA_STRICT_POLICY); // https://linux.die.net/man/3/numa_set_strict
56 #endif
57 records_ = static_cast<T*>(numa_alloc_onnode(sizeof(T) * size, numa_node));
58#else
60 "NUMA allocation was requested but program was built without USE_LIBNUMA");
61#endif
62 } else if (intrinsic_allocator && alignment_size > 0) { // _mm allocator
63 records_ = static_cast<T*>(_mm_malloc(sizeof(T) * size, alignment_size));
64 } else if (!intrinsic_allocator && alignment_size > 0) { // std aligned allocator
65 records_ = static_cast<T*>(std::aligned_alloc(alignment_size, sizeof(T) * size));
66 } else if (!numa_aware && !intrinsic_allocator && alignment_size == 0) {
67 // Standard allocator
68 records_ = static_cast<T*>(std::malloc(sizeof(T) * size));
69
70 } else {
71 // Let it fail, as expected combination might be invalid
72 // records_ = static_cast<T*>(std::malloc(sizeof(T) * size_);
73 }
74
75 size_ = size;
76 numa_aware_ = numa_aware;
77 numa_node_ = numa_node;
78 intrinsic_allocator_ = intrinsic_allocator;
79 alignment_size_ = alignment_size;
80}
#define ERS_HERE
SourceID[" << sourceid << "] Command daqdataformats::SourceID Readout Initialization std::string initerror Configuration std::string conferror GenericConfigurationError

◆ back()

template<class T >
const T * dunedaq::datahandlinglibs::IterableQueueModel< T >::back ( )
override virtual

Get pointer to the back of the LB.

Implements dunedaq::datahandlinglibs::LatencyBufferConcept< T >.

Definition at line 279 of file IterableQueueModel.hxx.

280{
281 auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
282 if (currentWrite == readIndex_.load(std::memory_order_acquire)) {
283 return nullptr;
284 }
285 int currentLast = currentWrite;
286 if (currentLast == 0) {
287 currentLast = size_ - 1;
288 } else {
289 currentLast--;
290 }
291 return &records_[currentLast];
292}

◆ begin()

template<class T >
Iterator dunedaq::datahandlinglibs::IterableQueueModel< T >::begin ( )
inline

Definition at line 299 of file IterableQueueModel.hpp.

300 {
301 auto const currentRead = readIndex_.load(std::memory_order_relaxed);
302 if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
303 // queue is empty
304 return end();
305 }
306 return Iterator(*this, currentRead);
307 }

◆ capacity()

template<class T >
std::size_t dunedaq::datahandlinglibs::IterableQueueModel< T >::capacity ( ) const
inline

Definition at line 216 of file IterableQueueModel.hpp.

216{ return size_ - 1; }

◆ conf()

template<class T >
void dunedaq::datahandlinglibs::IterableQueueModel< T >::conf ( const appmodel::LatencyBuffer * conf)
overridevirtual

Configure the LB.

Implements dunedaq::datahandlinglibs::LatencyBufferConcept< T >.

Definition at line 297 of file IterableQueueModel.hxx.

298{
299 assert(cfg->get_size() >= 2);
300 free_memory();
301
302 allocate_memory(cfg->get_size(),
303 cfg->get_numa_aware(),
304 cfg->get_numa_node(),
305 cfg->get_intrinsic_allocator(),
306 cfg->get_alignment_size());
307 readIndex_ = 0;
308 writeIndex_ = 0;
309
310 if (!records_) {
311 throw std::bad_alloc();
312 }
313
314 if (cfg->get_preallocation()) {
316 }
317}

◆ end()

template<class T >
Iterator dunedaq::datahandlinglibs::IterableQueueModel< T >::end ( )
inline

Definition at line 309 of file IterableQueueModel.hpp.

310 {
311 return Iterator(*this, std::numeric_limits<uint32_t>::max()); // NOLINT(build/unsigned)
312 }

◆ end_of_buffer()

template<class T >
T * dunedaq::datahandlinglibs::IterableQueueModel< T >::end_of_buffer ( )
inline

Definition at line 228 of file IterableQueueModel.hpp.

228{ return &records_[size_]; }

◆ flush()

template<class T >
void dunedaq::datahandlinglibs::IterableQueueModel< T >::flush ( )
inline override virtual

Flush all elements from the latency buffer.

Implements dunedaq::datahandlinglibs::LatencyBufferConcept< T >.

Definition at line 237 of file IterableQueueModel.hpp.

237{ pop(occupancy()); }
std::size_t occupancy() const override
Occupancy of LB.
void pop(std::size_t x)
Pop specified amount of elements from LB.

◆ force_pagefault()

template<class T >
void dunedaq::datahandlinglibs::IterableQueueModel< T >::force_pagefault ( )

Definition at line 107 of file IterableQueueModel.hxx.

108{
109 // Local prefiller thread
110 std::thread prefill_thread(&IterableQueueModel<T>::prefill_task, this);
111
112 // Tweak prefiller thread
113 char tname[16];
114 snprintf(tname, 16, "%s-%d", prefiller_name_.c_str(), numa_node_);
115 auto handle = prefill_thread.native_handle();
116 pthread_setname_np(handle, tname);
117
118#ifdef WITH_LIBNUMA_SUPPORT
119 cpu_set_t affinitymask;
120 CPU_ZERO(&affinitymask);
121 struct bitmask *nodecpumask = numa_allocate_cpumask();
122 int ret = 0;
123 // Get NODE CPU mask
124 ret = numa_node_to_cpus(numa_node_, nodecpumask);
125 assert(ret == 0);
126 // Apply corresponding NODE CPUs to affinity mask
127 for (int i=0; i< numa_num_configured_cpus(); ++i) {
128 if (numa_bitmask_isbitset(nodecpumask, i)) {
129 CPU_SET(i, &affinitymask);
130 }
131 }
132 ret = pthread_setaffinity_np(handle, sizeof(cpu_set_t), &affinitymask);
133 assert(ret == 0);
134 numa_free_cpumask(nodecpumask);
135#endif
136
137 // Trigger prefiller thread
138 {
139 std::lock_guard lk(prefill_mutex_);
140 prefill_ready_ = true;
141 }
142 prefill_cv_.notify_one();
143 // Wait for prefiller thread to finish
144 {
145 std::unique_lock lk(prefill_mutex_);
146 prefill_cv_.wait(lk, [this]{ return prefill_done_; });
147 }
148 // Join with prefiller thread
149 prefill_thread.join();
150}

◆ free_memory()

template<class T >
void dunedaq::datahandlinglibs::IterableQueueModel< T >::free_memory ( )

Definition at line 9 of file IterableQueueModel.hxx.

10{
11 // We need to destruct anything that may still exist in our queue.
12 // (No real synchronization needed at destructor time: only one
13 // thread can be doing this.)
14 if (!std::is_trivially_destructible<T>::value) {
15 std::size_t readIndex = readIndex_;
16 std::size_t endIndex = writeIndex_;
17 while (readIndex != endIndex) {
18 records_[readIndex].~T();
19 if (++readIndex == size_) { // NOLINT(runtime/increment_decrement)
20 readIndex = 0;
21 }
22 }
23 }
24 // Different allocators require custom free functions
26 _mm_free(records_);
27 } else if (numa_aware_) {
28#ifdef WITH_LIBNUMA_SUPPORT
29 numa_free(records_, sizeof(T) * size_);
30#endif
31 } else {
32 std::free(records_);
33 }
34}

◆ front()

template<class T >
const T * dunedaq::datahandlinglibs::IterableQueueModel< T >::front ( )
overridevirtual

Get pointer to the front of the LB.

Get pointer to the front of the LB

Implements dunedaq::datahandlinglibs::LatencyBufferConcept< T >.

Definition at line 267 of file IterableQueueModel.hxx.

268{
269 auto const currentRead = readIndex_.load(std::memory_order_relaxed);
270 if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
271 return nullptr;
272 }
273 return &records_[currentRead];
274}

◆ generate_opmon_data()

template<class T >
void dunedaq::datahandlinglibs::IterableQueueModel< T >::generate_opmon_data ( )
override protected virtual

Hook for customisable publication. The function can throw; the exception will be caught by the monitoring thread.

Reimplemented from dunedaq::opmonlib::MonitorableObject.

Definition at line 372 of file IterableQueueModel.hxx.

372 {
373 opmon::LatencyBufferInfo info;
374 info.set_num_buffer_elements(this->occupancy());
375 this->publish(std::move(info));
376
377}
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept

◆ get_alignment_size()

template<class T >
std::size_t dunedaq::datahandlinglibs::IterableQueueModel< T >::get_alignment_size ( )
inline

Definition at line 240 of file IterableQueueModel.hpp.

240{ return alignment_size_; }

◆ isEmpty()

template<class T >
bool dunedaq::datahandlinglibs::IterableQueueModel< T >::isEmpty ( ) const

Definition at line 225 of file IterableQueueModel.hxx.

226{
227 return readIndex_.load(std::memory_order_acquire) == writeIndex_.load(std::memory_order_acquire);
228}

◆ isFull()

template<class T >
bool dunedaq::datahandlinglibs::IterableQueueModel< T >::isFull ( ) const

Definition at line 233 of file IterableQueueModel.hxx.

234{
235 auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1;
236 if (nextRecord == size_) {
237 nextRecord = 0;
238 }
239 if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
240 return false;
241 }
242 // queue is full
243 return true;
244}

◆ occupancy()

template<class T >
std::size_t dunedaq::datahandlinglibs::IterableQueueModel< T >::occupancy ( ) const
overridevirtual

Occupancy of LB.

Implements dunedaq::datahandlinglibs::LatencyBufferConcept< T >.

Definition at line 254 of file IterableQueueModel.hxx.

255{
256 int ret = static_cast<int>(writeIndex_.load(std::memory_order_acquire)) -
257 static_cast<int>(readIndex_.load(std::memory_order_acquire));
258 if (ret < 0) {
259 ret += static_cast<int>(size_);
260 }
261 return static_cast<std::size_t>(ret);
262}

◆ operator=()

template<class T >
IterableQueueModel & dunedaq::datahandlinglibs::IterableQueueModel< T >::operator= ( const IterableQueueModel< T > & )
delete

◆ pop()

template<class T >
void dunedaq::datahandlinglibs::IterableQueueModel< T >::pop ( std::size_t amount)
virtual

Pop specified amount of elements from LB.

Implements dunedaq::datahandlinglibs::LatencyBufferConcept< T >.

Definition at line 215 of file IterableQueueModel.hxx.

216{
217 for (std::size_t i = 0; i < x; i++) {
218 popFront();
219 }
220}

◆ popFront()

template<class T >
void dunedaq::datahandlinglibs::IterableQueueModel< T >::popFront ( )

Definition at line 198 of file IterableQueueModel.hxx.

199{
200 auto const currentRead = readIndex_.load(std::memory_order_relaxed);
201 assert(currentRead != writeIndex_.load(std::memory_order_acquire));
202
203 auto nextRecord = currentRead + 1;
204 if (nextRecord == size_) {
205 nextRecord = 0;
206 }
207
208 records_[currentRead].~T();
209 readIndex_.store(nextRecord, std::memory_order_release);
210}

◆ prefill_task()

template<class T >
void dunedaq::datahandlinglibs::IterableQueueModel< T >::prefill_task ( )

Definition at line 84 of file IterableQueueModel.hxx.

85{
86 // Wait until LB issues ready
87 std::unique_lock lk(prefill_mutex_);
88 prefill_cv_.wait(lk, [this]{ return prefill_ready_; });
89
90 // After wait, we are ready to force page-fault
91 for (size_t i = 0; i < size_ - 1; ++i) {
92 T element = T();
93 write_(std::move(element));
94 }
95 flush();
96
97 // Preallocation done
98 prefill_done_ = true;
99
100 // Manual unlock is done before notify: avoid waking up the waiting thread only to block again.
101 lk.unlock();
102 prefill_cv_.notify_one();
103}
void flush() override
Flush all elements from the latency buffer.

◆ read()

template<class T >
bool dunedaq::datahandlinglibs::IterableQueueModel< T >::read ( T & element)
overridevirtual

Move object from LB to referenced.

Implements dunedaq::datahandlinglibs::LatencyBufferConcept< T >.

Definition at line 177 of file IterableQueueModel.hxx.

178{
179 auto const currentRead = readIndex_.load(std::memory_order_relaxed);
180 if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
181 // queue is empty
182 return false;
183 }
184
185 auto nextRecord = currentRead + 1;
186 if (nextRecord == size_) {
187 nextRecord = 0;
188 }
189 record = std::move(records_[currentRead]);
190 records_[currentRead].~T();
191 readIndex_.store(nextRecord, std::memory_order_release);
192 return true;
193}

◆ scrap()

template<class T >
void dunedaq::datahandlinglibs::IterableQueueModel< T >::scrap ( const nlohmann::json & cfg)
overridevirtual

Unconfigure the LB.

Implements dunedaq::datahandlinglibs::LatencyBufferConcept< T >.

Definition at line 322 of file IterableQueueModel.hxx.

323{
324 free_memory();
325 numa_aware_ = false;
326 numa_node_ = 0;
327 intrinsic_allocator_ = false;
328 alignment_size_ = 0;
330 prefill_ready_ = false;
331 prefill_done_ = false;
332 size_ = 2;
333 records_ = static_cast<T*>(std::malloc(sizeof(T) * 2));
334 readIndex_ = 0;
335 writeIndex_ = 0;
336}

◆ size()

template<class T >
std::size_t dunedaq::datahandlinglibs::IterableQueueModel< T >::size ( ) const
inline

Definition at line 213 of file IterableQueueModel.hpp.

213{ return size_; }

◆ start_of_buffer()

template<class T >
T * dunedaq::datahandlinglibs::IterableQueueModel< T >::start_of_buffer ( )
inline

Definition at line 225 of file IterableQueueModel.hpp.

225{ return &records_[0]; }

◆ write()

template<class T >
bool dunedaq::datahandlinglibs::IterableQueueModel< T >::write ( T && element)
overridevirtual

Move referenced object into LB.

Implements dunedaq::datahandlinglibs::LatencyBufferConcept< T >.

Definition at line 155 of file IterableQueueModel.hxx.

156{
157 auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
158 auto nextRecord = currentWrite + 1;
159 if (nextRecord == size_) {
160 nextRecord = 0;
161 }
162
163 if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
164 new (&records_[currentWrite]) T(std::move(record));
165 writeIndex_.store(nextRecord, std::memory_order_release);
166 return true;
167 }
168
169 // queue is full
170 ++overflow_ctr;
171 return false;
172}

◆ write_()

template<class T >
template<class... Args>
bool dunedaq::datahandlinglibs::IterableQueueModel< T >::write_ ( Args &&... recordArgs)
protected

Definition at line 342 of file IterableQueueModel.hxx.

343{
344 // const std::lock_guard<std::mutex> lock(m_mutex);
345 auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
346 auto nextRecord = currentWrite + 1;
347 if (nextRecord == size_) {
348 nextRecord = 0;
349 }
350 // if (nextRecord == readIndex_.load(std::memory_order_acquire)) {
351 // std::cout << "SPSC WARNING -> Queue is full! WRITE PASSES READ!!! \n";
352 //}
353 // new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
354 // writeIndex_.store(nextRecord, std::memory_order_release);
355 // return true;
356
357 // ORIGINAL:
358 if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
359 new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
360 writeIndex_.store(nextRecord, std::memory_order_release);
361 return true;
362 }
363 // queue is full
364
365 ++overflow_ctr;
366
367 return false;
368}

Member Data Documentation

◆ alignment_size_

template<class T >
std::size_t dunedaq::datahandlinglibs::IterableQueueModel< T >::alignment_size_
protected

Definition at line 328 of file IterableQueueModel.hpp.

◆ intrinsic_allocator_

template<class T >
bool dunedaq::datahandlinglibs::IterableQueueModel< T >::intrinsic_allocator_
protected

Definition at line 327 of file IterableQueueModel.hpp.

◆ invalid_configuration_requested_

template<class T >
bool dunedaq::datahandlinglibs::IterableQueueModel< T >::invalid_configuration_requested_
protected

Definition at line 329 of file IterableQueueModel.hpp.

◆ numa_aware_

template<class T >
bool dunedaq::datahandlinglibs::IterableQueueModel< T >::numa_aware_
protected

Definition at line 325 of file IterableQueueModel.hpp.

◆ numa_node_

template<class T >
uint8_t dunedaq::datahandlinglibs::IterableQueueModel< T >::numa_node_
protected

Definition at line 326 of file IterableQueueModel.hpp.

◆ overflow_ctr

template<class T >
std::atomic<int> dunedaq::datahandlinglibs::IterableQueueModel< T >::overflow_ctr { 0 }
protected

Definition at line 322 of file IterableQueueModel.hpp.

322{ 0 };

◆ pad0_

template<class T >
char dunedaq::datahandlinglibs::IterableQueueModel< T >::pad0_[folly::hardware_destructive_interference_size]
protected

Definition at line 344 of file IterableQueueModel.hpp.

◆ pad1_

template<class T >
char dunedaq::datahandlinglibs::IterableQueueModel< T >::pad1_[folly::hardware_destructive_interference_size - sizeof(writeIndex_)]
protected

Definition at line 351 of file IterableQueueModel.hpp.

◆ prefill_cv_

template<class T >
std::condition_variable dunedaq::datahandlinglibs::IterableQueueModel< T >::prefill_cv_
protected

Definition at line 334 of file IterableQueueModel.hpp.

◆ prefill_done_

template<class T >
bool dunedaq::datahandlinglibs::IterableQueueModel< T >::prefill_done_
protected

Definition at line 336 of file IterableQueueModel.hpp.

◆ prefill_mutex_

template<class T >
std::mutex dunedaq::datahandlinglibs::IterableQueueModel< T >::prefill_mutex_
protected

Definition at line 333 of file IterableQueueModel.hpp.

◆ prefill_ready_

template<class T >
bool dunedaq::datahandlinglibs::IterableQueueModel< T >::prefill_ready_
protected

Definition at line 335 of file IterableQueueModel.hpp.

◆ prefiller_name_

template<class T >
std::string dunedaq::datahandlinglibs::IterableQueueModel< T >::prefiller_name_ {"lbpfn"}
protected

Definition at line 332 of file IterableQueueModel.hpp.

332{"lbpfn"};

◆ ptrlogger

template<class T >
std::thread dunedaq::datahandlinglibs::IterableQueueModel< T >::ptrlogger
protected

Definition at line 339 of file IterableQueueModel.hpp.

◆ readIndex_

template<class T >
std::atomic<unsigned int> dunedaq::datahandlinglibs::IterableQueueModel< T >::readIndex_
protected

Definition at line 348 of file IterableQueueModel.hpp.

◆ records_

template<class T >
T* dunedaq::datahandlinglibs::IterableQueueModel< T >::records_
protected

Definition at line 346 of file IterableQueueModel.hpp.

◆ size_

template<class T >
uint32_t dunedaq::datahandlinglibs::IterableQueueModel< T >::size_
protected

Definition at line 345 of file IterableQueueModel.hpp.

◆ writeIndex_

template<class T >
std::atomic<unsigned int> dunedaq::datahandlinglibs::IterableQueueModel< T >::writeIndex_
protected

Definition at line 350 of file IterableQueueModel.hpp.


The documentation for this struct was generated from the following files: