namespace datahandlinglibs {

// Destructor: run the element destructors over the live range, then release
// the storage with the deallocator that matches how it was obtained.
if (!std::is_trivially_destructible<T>::value) {
  std::size_t readIndex = readIndex_;
  std::size_t endIndex = writeIndex_;
  while (readIndex != endIndex) {
    records_[readIndex].~T();
    if (++readIndex == size_) {
      readIndex = 0;
    }
  }
}

if (intrinsic_allocator_) {
  _mm_free(records_);
} else if (numa_aware_) {
#ifdef WITH_LIBNUMA_SUPPORT
  numa_free(records_, sizeof(T) * size_);
#endif
}
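The branching above matters because each allocation routine used by this class has exactly one valid release routine. A tiny self-contained illustration of those pairs (byte counts and alignments are arbitrary placeholders, and the libNUMA pair is omitted since it needs a WITH_LIBNUMA_SUPPORT build):

#include <cstddef>
#include <cstdlib>
#include <xmmintrin.h>

int main()
{
  constexpr std::size_t bytes = 1024;

  void* a = std::malloc(bytes);             // release with std::free
  std::free(a);

  void* b = std::aligned_alloc(64, bytes);  // release with std::free
  std::free(b);

  void* c = _mm_malloc(bytes, 64);          // release with _mm_free, never std::free
  _mm_free(c);
  return 0;
}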
// allocate_memory(): choose the storage allocation strategy requested by the
// configuration (NUMA-local, intrinsic _mm_malloc, std::aligned_alloc or plain malloc).
void allocate_memory(std::size_t size,
                     bool numa_aware,
                     uint8_t numa_node,
                     bool intrinsic_allocator,
                     std::size_t alignment_size)
{
  if (numa_aware && numa_node < 8) {
#ifdef WITH_LIBNUMA_SUPPORT
    numa_set_preferred((unsigned)numa_node);
#ifdef WITH_LIBNUMA_BIND_POLICY
    numa_set_bind_policy(WITH_LIBNUMA_BIND_POLICY);
#endif
#ifdef WITH_LIBNUMA_STRICT_POLICY
    numa_set_strict(WITH_LIBNUMA_STRICT_POLICY);
#endif
    records_ = static_cast<T*>(numa_alloc_onnode(sizeof(T) * size, numa_node));
#else
    // Error path: "NUMA allocation was requested but program was built without USE_LIBNUMA"
#endif
  } else if (intrinsic_allocator && alignment_size > 0) {
    records_ = static_cast<T*>(_mm_malloc(sizeof(T) * size, alignment_size));
  } else if (!intrinsic_allocator && alignment_size > 0) {
    records_ = static_cast<T*>(std::aligned_alloc(alignment_size, sizeof(T) * size));
  } else if (!numa_aware && !intrinsic_allocator && alignment_size == 0) {
    records_ = static_cast<T*>(std::malloc(sizeof(T) * size));
  }

  numa_aware_ = numa_aware;
  numa_node_ = numa_node;
  intrinsic_allocator_ = intrinsic_allocator;
  alignment_size_ = alignment_size;
}
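A hedged usage sketch for the branches above; LB stands for the latency-buffer class documented on this page (its name is not shown in this listing) and the element count is a placeholder, while the parameter order follows the allocate_memory signature in the member list below:

template<typename LB>
void setup_buffer(LB& buffer)
{
  // Default path: plain std::malloc, no NUMA awareness, no alignment request.
  buffer.allocate_memory(100000, false);

  // Alternative paths (call exactly one of these instead):
  // 64-byte aligned storage via std::aligned_alloc
  //   buffer.allocate_memory(100000, false, 0, false, 64);
  // 64-byte aligned storage via _mm_malloc (intrinsic allocator)
  //   buffer.allocate_memory(100000, false, 0, true, 64);
  // NUMA-local storage on node 1 (requires a WITH_LIBNUMA_SUPPORT build)
  //   buffer.allocate_memory(100000, true, 1);
}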
// Prefiller task: wait until the owner signals prefill_ready_, then push
// size_ - 1 default-constructed elements (the buffer's capacity) through write_().
std::unique_lock lk(prefill_mutex_);
prefill_cv_.wait(lk, [this] { return prefill_ready_; });

for (size_t i = 0; i < size_ - 1; ++i) {
  T element;
  write_(std::move(element));
}
// ...
prefill_cv_.notify_one();
// Name the prefiller thread and, when built with libNUMA, pin it to the CPUs
// of the configured NUMA node.
char tname[16];
snprintf(tname, 16, "%s-%d", prefiller_name_.c_str(), numa_node_);
auto handle = prefill_thread.native_handle();
pthread_setname_np(handle, tname);

#ifdef WITH_LIBNUMA_SUPPORT
cpu_set_t affinitymask;
CPU_ZERO(&affinitymask);
struct bitmask* nodecpumask = numa_allocate_cpumask();
int ret = 0;
ret = numa_node_to_cpus(numa_node_, nodecpumask);
for (int i = 0; i < numa_num_configured_cpus(); ++i) {
  if (numa_bitmask_isbitset(nodecpumask, i)) {
    CPU_SET(i, &affinitymask);
  }
}
ret = pthread_setaffinity_np(handle, sizeof(cpu_set_t), &affinitymask);
numa_free_cpumask(nodecpumask);
#endif
// Signal the prefiller to start, wait for it to finish, then join it.
{
  std::lock_guard lk(prefill_mutex_);
  prefill_ready_ = true;
}
prefill_cv_.notify_one();

{
  std::unique_lock lk(prefill_mutex_);
  prefill_cv_.wait(lk, [this] { return prefill_done_; });
}
prefill_thread.join();
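For reference, the start/finish handshake above follows a standard condition-variable pattern. A self-contained sketch that mirrors it with local names (this is an illustration, not the library code):

#include <condition_variable>
#include <mutex>
#include <thread>

int main()
{
  std::mutex prefill_mutex;
  std::condition_variable prefill_cv;
  bool prefill_ready = false;
  bool prefill_done = false;

  std::thread prefill_thread([&] {
    std::unique_lock lk(prefill_mutex);
    prefill_cv.wait(lk, [&] { return prefill_ready; });  // wait for the go signal
    // ... prefill work would happen here ...
    prefill_done = true;
    lk.unlock();
    prefill_cv.notify_one();                             // report completion
  });

  {
    std::lock_guard lk(prefill_mutex);
    prefill_ready = true;                                // let the worker start
  }
  prefill_cv.notify_one();

  {
    std::unique_lock lk(prefill_mutex);
    prefill_cv.wait(lk, [&] { return prefill_done; });   // wait until it finishes
  }
  prefill_thread.join();
  return 0;
}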
// write(T&& record): single-producer push; returns false when the buffer is full.
auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
auto nextRecord = currentWrite + 1;
if (nextRecord == size_) {
  nextRecord = 0;
}
if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
  new (&records_[currentWrite]) T(std::move(record));
  writeIndex_.store(nextRecord, std::memory_order_release);
  return true;
}
return false;
// read(T& record): single-consumer pop; returns false when the buffer is empty.
auto const currentRead = readIndex_.load(std::memory_order_relaxed);
if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
  return false;
}
auto nextRecord = currentRead + 1;
if (nextRecord == size_) {
  nextRecord = 0;
}
record = std::move(records_[currentRead]);
records_[currentRead].~T();
readIndex_.store(nextRecord, std::memory_order_release);
return true;
// popFront(): discard the front element; the caller must ensure the buffer is not empty.
auto const currentRead = readIndex_.load(std::memory_order_relaxed);
assert(currentRead != writeIndex_.load(std::memory_order_acquire));
auto nextRecord = currentRead + 1;
if (nextRecord == size_) {
  nextRecord = 0;
}
records_[currentRead].~T();
readIndex_.store(nextRecord, std::memory_order_release);
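The write_/read/popFront fragments above follow the classic single-producer/single-consumer ring-buffer protocol: acquire/release on the two indices, with one slot left free to distinguish full from empty. A compact standalone sketch of that protocol (simplified types and fixed capacity, not the library class):

#include <atomic>
#include <cstddef>
#include <new>
#include <utility>

template<typename T, std::size_t N>
class SpscRing {
public:
  // Producer side: returns false instead of blocking when the ring is full.
  bool write(T&& v)
  {
    auto const w = write_idx_.load(std::memory_order_relaxed);
    auto next = (w + 1 == N) ? 0 : w + 1;
    if (next == read_idx_.load(std::memory_order_acquire)) {
      return false;                                   // full: one slot stays free
    }
    new (&slots_[w]) T(std::move(v));                 // construct in place
    write_idx_.store(next, std::memory_order_release);
    return true;
  }

  // Consumer side: returns false when the ring is empty.
  bool read(T& out)
  {
    auto const r = read_idx_.load(std::memory_order_relaxed);
    if (r == write_idx_.load(std::memory_order_acquire)) {
      return false;                                   // empty
    }
    T* p = std::launder(reinterpret_cast<T*>(&slots_[r]));
    out = std::move(*p);
    p->~T();
    read_idx_.store((r + 1 == N) ? 0 : r + 1, std::memory_order_release);
    return true;
  }

private:
  alignas(T) unsigned char slots_[N][sizeof(T)];
  std::atomic<std::size_t> read_idx_{0};
  std::atomic<std::size_t> write_idx_{0};
};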
for (std::size_t i = 0; i < x; i++) {
  popFront();
}
return readIndex_.load(std::memory_order_acquire) == writeIndex_.load(std::memory_order_acquire);
auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1;
if (nextRecord == size_) {
  nextRecord = 0;
}
if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
  return false;
}
return true;
int ret = static_cast<int>(writeIndex_.load(std::memory_order_acquire)) -
          static_cast<int>(readIndex_.load(std::memory_order_acquire));
if (ret < 0) {
  ret += static_cast<int>(size_);
}
return static_cast<std::size_t>(ret);
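A quick worked check of the occupancy arithmetic above, using illustrative index values:

#include <cassert>
#include <cstddef>

int main()
{
  std::size_t size = 10, write_index = 2, read_index = 7;                  // writer has wrapped around
  int occ = static_cast<int>(write_index) - static_cast<int>(read_index);  // -5
  if (occ < 0) {
    occ += static_cast<int>(size);                                         // -5 + 10 = 5
  }
  assert(occ == 5);   // elements live in slots 7, 8, 9, 0, 1
  return 0;
}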
auto const currentRead = readIndex_.load(std::memory_order_relaxed);
if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
  return nullptr;
}
return &records_[currentRead];
auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
if (currentWrite == readIndex_.load(std::memory_order_acquire)) {
  return nullptr;
}
int currentLast = currentWrite;
if (currentLast == 0) {
  currentLast = size_ - 1;
} else {
  currentLast--;
}
return &records_[currentLast];
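A similar check for the wrap-around in back(): with illustrative values, the slot holding the most recently written element is always (writeIndex_ + size_ - 1) % size_:

#include <cassert>
#include <initializer_list>

int main()
{
  const int size = 10;
  for (int w : {0, 1, 7}) {
    int last = (w == 0) ? size - 1 : w - 1;   // same branch structure as back()
    assert(last == (w + size - 1) % size);
  }
  return 0;
}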
throw std::bad_alloc();
intrinsic_allocator_ = false;
invalid_configuration_requested_ = false;
prefill_ready_ = false;
prefill_done_ = false;
records_ = static_cast<T*>(std::malloc(sizeof(T) * 2));
template<class... Args>
bool write_(Args&&... recordArgs)
{
  auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
  auto nextRecord = currentWrite + 1;
  if (nextRecord == size_) {
    nextRecord = 0;
  }
  // ...
  if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
    new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
    writeIndex_.store(nextRecord, std::memory_order_release);
    return true;
  }
  return false;
}
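Both insertion flavours appear in the fragments above: write(T&&) moves a pre-built record, while the variadic write_ constructs the record in place from forwarded arguments. A hedged usage sketch (LB, Record and the constructor arguments are placeholders):

#include <utility>

template<typename LB, typename Record, typename... Args>
bool insert_both_ways(LB& buffer, Record ready_made, Args&&... ctor_args)
{
  // Move an already-constructed record in.
  bool moved = buffer.write(std::move(ready_made));
  // Build a record directly inside the buffer from constructor arguments.
  bool emplaced = buffer.write_(std::forward<Args>(ctor_args)...);
  return moved && emplaced;
}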
// generate_opmon_data(): publish the buffer's current occupancy to operational monitoring.
info.set_num_buffer_elements(this->occupancy());
this->publish(std::move(info));
uint32_t get_alignment_size() const
Get "alignment_size" attribute value.
bool get_preallocation() const
Get "preallocation" attribute value.
int16_t get_numa_node() const
Get "numa_node" attribute value.
uint32_t get_size() const
Get "size" attribute value.
bool get_intrinsic_allocator() const
Get "intrinsic_allocator" attribute value.
bool get_numa_aware() const
Get "numa_aware" attribute value.
std::size_t occupancy() const override
Occupancy of LB.
virtual void generate_opmon_data() override
void allocate_memory(std::size_t size, bool numa_aware, uint8_t numa_node=0, bool intrinsic_allocator=false, std::size_t alignment_size=0)
bool write_(Args &&... recordArgs)
const T * back() override
Get pointer to the back of the LB.
const T * front() override
Get pointer to the front of the LB.
bool write(T &&record) override
Move referenced object into LB.
void conf(const appmodel::LatencyBuffer *cfg) override
Configure the LB.
void scrap(const nlohmann::json &) override
Unconfigure the LB.
void pop(std::size_t x)
Pop the specified number of elements from the LB.
bool read(T &record) override
Move object from the LB into the referenced record.
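Finally, a hedged end-to-end sketch that exercises only the members listed above; LB, Cfg and Record are placeholders (Record is assumed default-constructible), the Cfg getters are those listed earlier on this page, and the cast merely bridges the int16_t numa_node getter to the uint8_t parameter of allocate_memory:

#include <cstdint>
#include <utility>

template<typename LB, typename Cfg, typename Record>
bool exercise_buffer(LB& buffer, const Cfg* cfg, Record sample)
{
  // Configure storage from the attributes exposed by the configuration object.
  buffer.allocate_memory(cfg->get_size(),
                         cfg->get_numa_aware(),
                         static_cast<uint8_t>(cfg->get_numa_node()),
                         cfg->get_intrinsic_allocator(),
                         cfg->get_alignment_size());

  // Producer side: move a record in.
  if (!buffer.write(std::move(sample))) {
    return false;                       // buffer full
  }

  // Inspect without consuming.
  const Record* oldest = buffer.front();
  const Record* newest = buffer.back();
  (void)oldest;
  (void)newest;

  // Consumer side: move a record out, or drop elements wholesale.
  Record out;
  if (!buffer.read(out)) {
    return false;                       // buffer empty
  }
  buffer.pop(buffer.occupancy());       // discard whatever is left
  return true;
}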