21#ifndef TPGLIBS_PROCESSORINTERNALSTATEBUFFERMANAGER_HPP_
22#define TPGLIBS_PROCESSORINTERNALSTATEBUFFERMANAGER_HPP_
116 template <
typename T>
120 template <
typename T>
125 template <
typename T>
129 allocate_buffers(num_items);
130 allocate_cast_buffers(num_items);
132 m_write_buffer.store(&m_store_buffers[0], std::memory_order_release);
133 m_read_buffer.store(&m_store_buffers[1], std::memory_order_release);
134 m_cast_active_buffer.store(&m_cast_store_buffers[0], std::memory_order_release);
136 m_write_seq.store(0, std::memory_order_release);
137 m_last_read_seq.store(0, std::memory_order_release);
142 template <
typename T>
144 for (
auto& buf : m_store_buffers) {
145 buf.m_size = buffer_size;
146 buf.m_data =
static_cast<T*
>(
147 _mm_malloc(buf.m_size *
sizeof(T),
alignof(T))
152 template <
typename T>
155 m_write_seq.fetch_add(1, std::memory_order_release);
157 auto write_ptr = m_write_buffer.load(std::memory_order_acquire);
160 for (
size_t i = 0; i < m_internal_state_item_ptrs.size(); i++) {
162 if (m_internal_state_item_ptrs[i] !=
nullptr) {
163 write_ptr->m_data[i] = *m_internal_state_item_ptrs[i];
166 write_ptr->m_data[i] = T{};
171 m_write_seq.fetch_add(1, std::memory_order_release);
174 template <
typename T>
177 uint16_t current_write_seq;
179 current_write_seq = m_write_seq.load(std::memory_order_acquire);
180 }
while (current_write_seq & 1);
183 uint16_t last_read = m_last_read_seq.load(std::memory_order_acquire);
185 if (current_write_seq != last_read && current_write_seq > 0) {
187 auto current_read = m_read_buffer.load(std::memory_order_acquire);
188 auto current_write = m_write_buffer.load(std::memory_order_acquire);
191 m_read_buffer.store(current_write, std::memory_order_release);
192 m_write_buffer.store(current_read, std::memory_order_release);
195 m_last_read_seq.store(current_write_seq, std::memory_order_release);
199 auto read_ptr = m_read_buffer.load(std::memory_order_acquire);
208 for (
auto& buf : m_store_buffers) { _mm_free(buf.m_data); }
210 for (
auto& buf : m_cast_store_buffers) { _mm_free(buf.m_data); }
217 auto raw_data = switch_buffer_and_read();
219 auto* cast_free = m_cast_active_buffer.load(std::memory_order_acquire);
222 for (
size_t i = 0; i < raw_data.m_size; ++i) {
225 reinterpret_cast<__m256i*
>(cast_free->m_data[i].data()),
232 auto* next = (cast_free == &m_cast_store_buffers[0]) ? &m_cast_store_buffers[1] : &m_cast_store_buffers[0];
233 m_cast_active_buffer.store(next, std::memory_order_release);
241 for (
auto& buf : m_cast_store_buffers) {
243 buf.m_data =
static_cast<std::array<int16_t,16>*
>(
244 _mm_malloc(n *
sizeof(std::array<int16_t,16>), 32)
247 m_cast_active_buffer.store(&m_cast_store_buffers[0], std::memory_order_release);
254 auto raw_data = switch_buffer_and_read();
Manages the internal state storage buffers for a processor.
std::atomic< ProcessorMetricArray< signal_t > * > m_write_buffer
The write buffer pointer (buffer writer currently uses).
void clear()
clear all buffers and deallocate memory.
ProcessorMetricArray< std::array< int16_t, 16 > > m_cast_store_buffers[2]
The double buffers for storing the internal state data casted to std::array<int16_t,...
std::atomic< uint16_t > m_last_read_seq
The last sequence number that was read.
void configure_from_registry(ProcessorInternalStateNameRegistry< signal_t > *registry)
Configure and allocate correct buffer storage given the configuration string.
void write_to_active_buffer()
Write to the active buffer.
std::atomic< uint16_t > m_write_seq
The sequence number for writes (odd=writing, even=complete).
std::vector< std::shared_ptr< signal_t > > m_internal_state_item_ptrs
The vector of pointers to the internal state items.
std::atomic< ProcessorMetricArray< std::array< int16_t, 16 > > * > m_cast_active_buffer
The active buffer for the casted data.
T signal_t
Signal type to use. Generally __m256i or std::array<int16_t, 16>;.
~ProcessorInternalStateBufferManager()
Destructor.
void allocate_cast_buffers(size_t)
Allocate the correct size for the double buffer read and write buffers for the casted data.
std::atomic< ProcessorMetricArray< signal_t > * > m_read_buffer
The read buffer pointer (buffer reader currently uses).
size_t m_buffer_size
size of each buffer.
void switch_active_buffer()
Switch the active buffer.
void allocate_buffers(size_t buffer_size)
Allocate the correct size for the double buffer read and write buffers.
ProcessorMetricArray< std::array< int16_t, 16 > > switch_buffer_and_read_casted()
Read from the inactive buffer and cast to std::array<int16_t, 16>.
ProcessorMetricArray< signal_t > switch_buffer_and_read()
Read from the inactive buffer.
ProcessorMetricArray< signal_t > m_store_buffers[2]
The double buffers for storing the internal state data.
ProcessorInternalStateBufferManager()
Constructor.
Registry of internal state names.
size_t get_number_of_requested_internal_states()
Get the number of requested internal states.
std::vector< std::shared_ptr< signal_t > > get_all_requested_internal_state_item_ptrs()
Get a vector of pointers to all internal state items.
Dynamic array of processor metrics, templated on signal type.