#ifndef DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_ITERABLEQUEUEMODEL_HPP_
#define DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_ITERABLEQUEUEMODEL_HPP_

// ...

#include <folly/lang/Align.h>

// ...

#ifdef WITH_LIBNUMA_SUPPORT
// ... (libnuma headers, only pulled in when NUMA support is enabled)
#endif

namespace datahandlinglibs {
// ...

    , records_(static_cast<T*>(std::malloc(sizeof(T) * 2)))  // plain malloc of the backing storage

// ...

      throw std::bad_alloc();

// ...

    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
    // ... (elided log statement) << " R:" << currentRead << " - W:" << currentWrite
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
// ...

  IterableQueueModel(std::size_t size,
                     bool numa_aware = false,
                     uint8_t numa_node = 0,
                     bool intrinsic_allocator = false,
                     std::size_t alignment_size = 0)

// ...

      throw std::bad_alloc();

// ...

    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
    // ... (elided log statement) << " R:" << currentRead << " - W:" << currentWrite
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
// ...

  void allocate_memory(std::size_t size,
                       bool numa_aware,
                       uint8_t numa_node = 0,
                       bool intrinsic_allocator = false,
                       std::size_t alignment_size = 0);
// ...

  // Move referenced object into LB
  bool write(T&& record) override;

  // Move object from LB to referenced
  bool read(T& record) override;

  // Pop specified amount of elements from LB
  void pop(std::size_t x);

// ...

  // Get pointer to the front of the LB
  const T* front() override;

  // Get pointer to the back of the LB
  const T* back() override;

// ...

  // Unconfigure the LB
  void scrap(const nlohmann::json&) override;
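// Usage sketch (not part of this header): a minimal example of the intended
// single-producer / single-consumer pattern, assuming a movable payload type
// and a buffer constructed with the capacity-taking constructor.
//
//   datahandlinglibs::IterableQueueModel<int> buffer(1024);
//
//   // producer thread
//   int sample = 42;
//   if (!buffer.write(std::move(sample))) {
//     // buffer full: drop the sample or retry later
//   }
//
//   // consumer thread
//   int value = 0;
//   if (buffer.read(value)) {
//     // 'value' now holds the popped element; the read index has advanced
//   }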
// ...

      // an index of uint32_t max marks the end() iterator
      m_index = std::numeric_limits<uint32_t>::max();

// ...

      // advance the iterator by 'amount' positions
      for (int i = 0; i < amount; ++i) {

// ...

      auto const currentRead = m_queue.readIndex_.load(std::memory_order_relaxed);
      auto const currentWrite = m_queue.writeIndex_.load(std::memory_order_relaxed);
      // the iterator is valid while it points into the occupied region between
      // the read index and the write index, in circular order
      return (*this != m_queue.end()) &&
             ((m_index >= currentRead && m_index < currentWrite) ||                              // no wrap-around
              (m_index >= currentRead && currentWrite < currentRead) ||                          // wrapped: upper segment
              (currentWrite < currentRead && m_index < currentRead && m_index < currentWrite));  // wrapped: lower segment
// ...

    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
      // queue is empty: nothing to iterate over
// ...
    }
    return Iterator(*this, currentRead);

// ...

    return Iterator(*this, std::numeric_limits<uint32_t>::max());  // sentinel index used by end()
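// Iteration sketch (not part of this header): walking the occupied region of
// the buffer from the consumer side. A standard prefix increment is assumed
// here, consistent with the iterator's std::forward_iterator_tag category;
// only the amount-based post-increment appears explicitly in this listing.
//
//   for (auto it = buffer.begin(); it != buffer.end(); ++it) {
//     const auto& element = *it;
//     // inspect 'element' in place, without popping it
//   }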
// ...

  template<class... Args>
  bool write_(Args&&... recordArgs);
// ...

  // keep the read and write indices on separate cache lines so the producer
  // and consumer do not false-share
  char pad0_[folly::hardware_destructive_interference_size];

// ...

  alignas(folly::hardware_destructive_interference_size) std::atomic<unsigned int> readIndex_;
  alignas(folly::hardware_destructive_interference_size) std::atomic<unsigned int> writeIndex_;

// ...
Member summary (datahandlinglibs::IterableQueueModel<T>):

Construction and configuration:
  IterableQueueModel(std::size_t size)
  IterableQueueModel(std::size_t size, bool numa_aware = false, uint8_t numa_node = 0, bool intrinsic_allocator = false, std::size_t alignment_size = 0)
  IterableQueueModel(const IterableQueueModel&) = delete
  IterableQueueModel& operator=(const IterableQueueModel&) = delete
  void conf(const appmodel::LatencyBuffer* cfg) override            // Configure the LB.
  void scrap(const nlohmann::json&) override                        // Unconfigure the LB.
  void allocate_memory(std::size_t size) override
  void allocate_memory(std::size_t size, bool numa_aware, uint8_t numa_node = 0, bool intrinsic_allocator = false, std::size_t alignment_size = 0)

Element access:
  bool write(T&& record) override                                   // Move referenced object into LB.
  bool write_(Args&&... recordArgs)
  bool read(T& record) override                                     // Move object from LB to referenced.
  void pop(std::size_t x)                                           // Pop specified amount of elements from LB.
  const T* front() override                                         // Get pointer to the front of the LB.
  const T* back() override                                          // Get pointer to the back of the LB.
  void flush() override                                             // Flush all elements from the latency buffer.
  std::size_t occupancy() const override                            // Occupancy of LB.
  std::size_t capacity() const
  std::size_t get_alignment_size()
  virtual void generate_opmon_data() override

Iterator:
  Iterator(IterableQueueModel<T>& queue, uint32_t index)
  std::forward_iterator_tag iterator_category
  std::ptrdiff_t difference_type
  reference operator*() const
  Iterator operator++(int amount)
  friend bool operator==(const Iterator& a, const Iterator& b)
  friend bool operator!=(const Iterator& a, const Iterator& b)
  IterableQueueModel<T>& m_queue

Data members:
  std::atomic<unsigned int> readIndex_
  std::atomic<unsigned int> writeIndex_
  char pad0_[folly::hardware_destructive_interference_size]
  char pad1_[folly::hardware_destructive_interference_size - sizeof(writeIndex_)]
  std::size_t alignment_size_
  bool intrinsic_allocator_
  bool invalid_configuration_requested_
  std::atomic<int> overflow_ctr
  std::mutex prefill_mutex_
  std::condition_variable prefill_cv_
  std::string prefiller_name_
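The sketch below is not taken from the repository; it only exercises the constructors and accessors listed above. The include path is inferred from the header guard, the payload type and the NUMA node and alignment values are placeholders, and whether the NUMA and intrinsic-allocator options may be combined is not shown in this listing, so they are used separately here.

#include "datahandlinglibs/models/IterableQueueModel.hpp"  // path inferred from the include guard

#include <utility>

struct Frame
{
  char data[512];
};

int main()
{
  using datahandlinglibs::IterableQueueModel;

  // plain malloc-backed buffer with room for one million frames
  IterableQueueModel<Frame> plain(1'000'000);

  // same capacity, allocated on NUMA node 0 (placeholder node id)
  IterableQueueModel<Frame> numa_pinned(1'000'000, /*numa_aware=*/true, /*numa_node=*/0);

  // same capacity, using the intrinsic allocator with 4 KiB alignment (placeholder value)
  IterableQueueModel<Frame> aligned(1'000'000,
                                    /*numa_aware=*/false,
                                    /*numa_node=*/0,
                                    /*intrinsic_allocator=*/true,
                                    /*alignment_size=*/4096);

  Frame f{};
  plain.write(std::move(f));   // producer side
  Frame out{};
  if (plain.read(out)) {       // consumer side
    // 'out' holds the element that was at the front
  }
  return 0;
}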