14 if (!std::is_trivially_destructible<T>::value) {
17 while (readIndex != endIndex) {
19 if (++readIndex ==
size_) {
28#ifdef WITH_LIBNUMA_SUPPORT
42 bool intrinsic_allocator,
43 std::size_t alignment_size)
48 if (numa_aware && numa_node < 8) {
49#ifdef WITH_LIBNUMA_SUPPORT
50 numa_set_preferred((
unsigned)numa_node);
51 #ifdef WITH_LIBNUMA_BIND_POLICY
52 numa_set_bind_policy(WITH_LIBNUMA_BIND_POLICY);
54 #ifdef WITH_LIBNUMA_STRICT_POLICY
55 numa_set_strict(WITH_LIBNUMA_STRICT_POLICY);
57 records_ =
static_cast<T*
>(numa_alloc_onnode(
sizeof(T) *
size, numa_node));
60 "NUMA allocation was requested but program was built without USE_LIBNUMA");
62 }
else if (intrinsic_allocator && alignment_size > 0) {
63 records_ =
static_cast<T*
>(_mm_malloc(
sizeof(T) *
size, alignment_size));
64 }
else if (!intrinsic_allocator && alignment_size > 0) {
65 records_ =
static_cast<T*
>(std::aligned_alloc(alignment_size,
sizeof(T) *
size));
66 }
else if (!numa_aware && !intrinsic_allocator && alignment_size == 0) {
68 records_ =
static_cast<T*
>(std::malloc(
sizeof(T) *
size));
91 for (
size_t i = 0; i <
size_ - 1; ++i) {
93 write_(std::move(element));
115 auto handle = prefill_thread.native_handle();
116 pthread_setname_np(handle, tname);
118#ifdef WITH_LIBNUMA_SUPPORT
119 cpu_set_t affinitymask;
120 CPU_ZERO(&affinitymask);
121 struct bitmask *nodecpumask = numa_allocate_cpumask();
124 ret = numa_node_to_cpus(
numa_node_, nodecpumask);
127 for (
int i=0; i< numa_num_configured_cpus(); ++i) {
128 if (numa_bitmask_isbitset(nodecpumask, i)) {
129 CPU_SET(i, &affinitymask);
132 ret = pthread_setaffinity_np(handle,
sizeof(cpu_set_t), &affinitymask);
134 numa_free_cpumask(nodecpumask);
149 prefill_thread.join();
157 auto const currentWrite =
writeIndex_.load(std::memory_order_relaxed);
158 auto nextRecord = currentWrite + 1;
159 if (nextRecord ==
size_) {
163 if (nextRecord !=
readIndex_.load(std::memory_order_acquire)) {
164 new (&
records_[currentWrite]) T(std::move(record));
165 writeIndex_.store(nextRecord, std::memory_order_release);
179 auto const currentRead =
readIndex_.load(std::memory_order_relaxed);
180 if (currentRead ==
writeIndex_.load(std::memory_order_acquire)) {
185 auto nextRecord = currentRead + 1;
186 if (nextRecord ==
size_) {
189 record = std::move(
records_[currentRead]);
191 readIndex_.store(nextRecord, std::memory_order_release);
200 auto const currentRead =
readIndex_.load(std::memory_order_relaxed);
201 assert(currentRead !=
writeIndex_.load(std::memory_order_acquire));
203 auto nextRecord = currentRead + 1;
204 if (nextRecord ==
size_) {
209 readIndex_.store(nextRecord, std::memory_order_release);
217 for (std::size_t i = 0; i < x; i++) {
235 auto nextRecord =
writeIndex_.load(std::memory_order_acquire) + 1;
236 if (nextRecord ==
size_) {
239 if (nextRecord !=
readIndex_.load(std::memory_order_acquire)) {
256 int ret =
static_cast<int>(
writeIndex_.load(std::memory_order_acquire)) -
257 static_cast<int>(
readIndex_.load(std::memory_order_acquire));
259 ret +=
static_cast<int>(
size_);
261 return static_cast<std::size_t
>(ret);
269 auto const currentRead =
readIndex_.load(std::memory_order_relaxed);
270 if (currentRead ==
writeIndex_.load(std::memory_order_acquire)) {
281 auto const currentWrite =
writeIndex_.load(std::memory_order_relaxed);
282 if (currentWrite ==
readIndex_.load(std::memory_order_acquire)) {
285 int currentLast = currentWrite;
286 if (currentLast == 0) {
287 currentLast =
size_ - 1;
311 throw std::bad_alloc();
333 records_ =
static_cast<T*
>(std::malloc(
sizeof(T) * 2));
340template<
class... Args>
345 auto const currentWrite =
writeIndex_.load(std::memory_order_relaxed);
346 auto nextRecord = currentWrite + 1;
347 if (nextRecord ==
size_) {
358 if (nextRecord !=
readIndex_.load(std::memory_order_acquire)) {
359 new (&
records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
360 writeIndex_.store(nextRecord, std::memory_order_release);
374 info.set_num_buffer_elements(this->
occupancy());
375 this->
publish(std::move(info));
uint32_t get_alignment_size() const
Get "alignment_size" attribute value.
bool get_preallocation() const
Get "preallocation" attribute value.
int16_t get_numa_node() const
Get "numa_node" attribute value.
uint32_t get_size() const
Get "size" attribute value.
bool get_intrinsic_allocator() const
Get "intrinsic_allocator" attribute value.
bool get_numa_aware() const
Get "numa_aware" attribute value.
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
SourceID[" << sourceid << "] Command daqdataformats::SourceID Readout Initialization std::string initerror Configuration std::string conferror GenericConfigurationError
void flush() override
Flush all elements from the latency buffer.
std::atomic< int > overflow_ctr
std::size_t occupancy() const override
Occupancy of LB.
virtual void generate_opmon_data() override
std::mutex prefill_mutex_
std::atomic< unsigned int > readIndex_
void allocate_memory(std::size_t size, bool numa_aware, uint8_t numa_node=0, bool intrinsic_allocator=false, std::size_t alignment_size=0)
std::condition_variable prefill_cv_
bool write_(Args &&... recordArgs)
const T * back() override
Get pointer to the back of the LB.
const T * front() override
Write referenced object into LB without moving it.
std::size_t alignment_size_
bool invalid_configuration_requested_
std::string prefiller_name_
bool write(T &&record) override
Move referenced object into LB.
void conf(const appmodel::LatencyBuffer *cfg) override
Configure the LB.
bool intrinsic_allocator_
void pop(std::size_t x)
Pop specified amount of elements from LB.
std::atomic< unsigned int > writeIndex_
void scrap(const appfwk::DAQModule::CommandData_t &) override
Unconfigure the LB.
bool read(T &record) override
Move object from LB to referenced.