DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
ProcessorInternalStateBufferManager.hpp
Go to the documentation of this file.
1
11
12#include <string>
13#include <vector>
14#include <memory>
15#include <atomic>
16#include <immintrin.h>
17#include <array>
18#include <cstdint>
19#include <mm_malloc.h>
20
21#ifndef TPGLIBS_PROCESSORINTERNALSTATEBUFFERMANAGER_HPP_
22#define TPGLIBS_PROCESSORINTERNALSTATEBUFFERMANAGER_HPP_
23
24namespace tpglibs {
25
31 template <typename T>
33 public:
35 using signal_t = T;
36
39
42
48
54
57
63
// Release the storage held by every entry of m_store_buffers via _mm_free.
// NOTE(review): m_data keeps the freed address and m_size is not reset, so a
// second call would hand the same pointers to _mm_free again — confirm that
// callers never clear twice or use the buffers after clearing.
65 void clear() {
66 for (auto& buf : m_store_buffers) { _mm_free(buf.m_data); }
67 }
68
69 protected:
70
75 void allocate_buffers(size_t buffer_size);
76
// Generic version: the size argument is accepted and ignored, nothing is allocated.
81 void allocate_cast_buffers(size_t) {}; // Do nothing for the generic template
82
85
86 private:
88 std::vector<std::shared_ptr<signal_t>> m_internal_state_item_ptrs;
89
92
95
97 std::atomic<ProcessorMetricArray<std::array<int16_t,16>>*> m_cast_active_buffer{ &m_cast_store_buffers[0] };
98
100 std::atomic<ProcessorMetricArray<signal_t>*> m_write_buffer = &m_store_buffers[0];
101
103 std::atomic<ProcessorMetricArray<signal_t>*> m_read_buffer = &m_store_buffers[0];
104
106 std::atomic<uint16_t> m_write_seq{0};
107
109 std::atomic<uint16_t> m_last_read_seq{0};
110
113 };
114
115 // Template function implementations
116 template <typename T>
119
120 template <typename T>
124
// Size the buffers from the registry and reset the double-buffer bookkeeping.
// `registry` is asked for the number of requested internal states (used as the
// element count of every buffer) and for the pointers to those states.
// NOTE(review): buffers are allocated here without releasing any earlier
// allocation — presumably this runs once per instance; confirm.
// NOTE(review): the in-class initialisers point both m_write_buffer and
// m_read_buffer at m_store_buffers[0]; only after this function runs do the
// reader and writer use different buffers.
125 template <typename T>
127 // obtain the number of internal state items
128 auto num_items = registry->get_number_of_requested_internal_states();
129 allocate_buffers(num_items);
130 allocate_cast_buffers(num_items);
131 // Writer starts with buffer 0, reader starts with buffer 1 (they must be different!)
132 m_write_buffer.store(&m_store_buffers[0], std::memory_order_release);
133 m_read_buffer.store(&m_store_buffers[1], std::memory_order_release);
134 m_cast_active_buffer.store(&m_cast_store_buffers[0], std::memory_order_release);
135 // reset sequence counters
136 m_write_seq.store(0, std::memory_order_release);
137 m_last_read_seq.store(0, std::memory_order_release);
138 // obtain the pointers to the internal state items
139 m_internal_state_item_ptrs = registry->get_all_requested_internal_state_item_ptrs();
140 }
141
// Give each entry of m_store_buffers room for `buffer_size` elements of T.
// The storage comes from _mm_malloc with alignment alignof(T) and therefore
// has to be released with _mm_free.
// NOTE(review): the _mm_malloc result is stored without a null check, and
// nothing is value-initialised — the elements are raw storage until written.
142 template <typename T>
144 for (auto& buf : m_store_buffers) {
145 buf.m_size = buffer_size;
146 buf.m_data = static_cast<T*>(
147 _mm_malloc(buf.m_size * sizeof(T), alignof(T))
148 );
149 }
150 }
151
// Writer side: copy the current value of every registered internal state item
// into the buffer m_write_buffer points at. m_write_seq is incremented once
// before and once after the copy, so it is odd while the copy is in progress
// and even once it has finished.
// NOTE(review): the loop runs over m_internal_state_item_ptrs.size() and does
// not consult write_ptr->m_size; in the visible configuration code both come
// from the same registry — confirm they always agree.
152 template <typename T>
154 // Increment seq to indicate write start (becomes odd)
155 m_write_seq.fetch_add(1, std::memory_order_release);
156
157 auto write_ptr = m_write_buffer.load(std::memory_order_acquire);
158
159 // Write to write buffer
160 for (size_t i = 0; i < m_internal_state_item_ptrs.size(); i++) {
161 // Check for nullptr before dereferencing (handles invalid state names)
162 if (m_internal_state_item_ptrs[i] != nullptr) {
163 write_ptr->m_data[i] = *m_internal_state_item_ptrs[i];
164 } else {
165 // If pointer is null, write a zeroed value
166 write_ptr->m_data[i] = T{};
167 }
168 }
169
170 // Increment seq to indicate write complete (becomes even)
171 m_write_seq.fetch_add(1, std::memory_order_release);
172 }
173
// Reader side: spin until m_write_seq is even, and if it differs from the
// last sequence number consumed, exchange the read and write buffer pointers
// and record the new sequence number. Returns a copy of the
// ProcessorMetricArray that m_read_buffer points at afterwards; when no new
// write has completed, that is the same buffer as on the previous call.
// NOTE(review): m_write_seq is not re-checked after the spin loop, so a write
// that begins between the loop and the pointer swap is not detected here —
// confirm the intended reader/writer threading contract.
// NOTE(review): the sequence counters are uint16_t and will wrap; a wrapped
// value of 0 fails the `> 0` test below even though new data was written.
174 template <typename T>
176 // Wait until no write is in progress
177 uint16_t current_write_seq;
178 do {
179 current_write_seq = m_write_seq.load(std::memory_order_acquire);
180 } while (current_write_seq & 1); // spin if writer is mid-write (odd seq number)
181
182 // Check if there's new data since last read
183 uint16_t last_read = m_last_read_seq.load(std::memory_order_acquire);
184
185 if (current_write_seq != last_read && current_write_seq > 0) {
186 // New data available - swap the buffers between reader and writer
187 auto current_read = m_read_buffer.load(std::memory_order_acquire);
188 auto current_write = m_write_buffer.load(std::memory_order_acquire);
189
190 // Swap: reader gets what writer just finished, writer gets what reader was using
191 m_read_buffer.store(current_write, std::memory_order_release);
192 m_write_buffer.store(current_read, std::memory_order_release);
193
194 // Update last read sequence
195 m_last_read_seq.store(current_write_seq, std::memory_order_release);
196 }
197
198 // Return data from read buffer
199 auto read_ptr = m_read_buffer.load(std::memory_order_acquire);
200 return *read_ptr;
201 }
202
203 // Template specializations
204 // Specialization for __m256i since it has additional cast buffers.
// Explicit specialization: frees the storage of m_store_buffers and also of
// m_cast_store_buffers, which the generic version does not touch.
// NOTE(review): as in the generic version, the m_data pointers are not reset
// after being freed — confirm this is never called twice on one instance.
205 template<>
207 // free double buffer for temp values
208 for (auto& buf : m_store_buffers) { _mm_free(buf.m_data); }
209 // free double buffer for casted values
210 for (auto& buf : m_cast_store_buffers) { _mm_free(buf.m_data); }
211 }
212
213 // Specialization for __m256i -> std::array<int16_t, 16> cast
// Fetch the latest raw data through switch_buffer_and_read(), store each
// __m256i element into the currently active cast buffer as a
// std::array<int16_t, 16>, then make the other cast buffer active for the
// next call. Returns a copy of the ProcessorMetricArray describing the cast
// buffer that was just filled.
// The loop is bounded by raw_data.m_size; the cast buffers are given the same
// element count in the visible configuration code.
214 template <>
216 // First, get the raw data using switch_buffer_and_read
217 auto raw_data = switch_buffer_and_read();
218
219 auto* cast_free = m_cast_active_buffer.load(std::memory_order_acquire);
220
221 // Cast each __m256i to std::array<int16_t, 16>
222 for (size_t i = 0; i < raw_data.m_size; ++i) {
223 // Cast and save to cast buffer
224 _mm256_store_si256(
225 reinterpret_cast<__m256i*>(cast_free->m_data[i].data()),
226 raw_data.m_data[i]
227 );
228 }
229
230 // Flip the active cast buffer
231
232 auto* next = (cast_free == &m_cast_store_buffers[0]) ? &m_cast_store_buffers[1] : &m_cast_store_buffers[0];
233 m_cast_active_buffer.store(next, std::memory_order_release);
234
235 return *cast_free;
236 }
237
238 // Specialization for __m256i
// Explicit specialization: give each entry of m_cast_store_buffers room for
// `n` elements of std::array<int16_t,16>, allocated with _mm_malloc at
// 32-byte alignment, and make entry 0 the active cast buffer.
// NOTE(review): the _mm_malloc result is stored without a null check.
239 template<>
241 for (auto& buf : m_cast_store_buffers) {
242 buf.m_size = n;
243 buf.m_data = static_cast<std::array<int16_t,16>*>(
244 _mm_malloc(n * sizeof(std::array<int16_t,16>), 32)
245 );
246 }
247 m_cast_active_buffer.store(&m_cast_store_buffers[0], std::memory_order_release);
248 }
249
250 // Specialization for std::array<int16_t, 16> -> std::array<int16_t, 16> cast (trivial)
// Explicit specialization for a signal type that is already
// std::array<int16_t, 16>: forwards to switch_buffer_and_read() and returns
// its result unchanged; the cast buffers are not used.
251 template <>
253 // First, get the raw data using switch_buffer_and_read
254 auto raw_data = switch_buffer_and_read();
255
256 // For std::array<int16_t, 16>, the cast is trivial - just return the data as-is
257 return raw_data;
258 }
259
260} // namespace tpglibs
261
262#endif // TPGLIBS_PROCESSORINTERNALSTATEBUFFERMANAGER_HPP_
Manages the internal state storage buffers for a processor.
std::atomic< ProcessorMetricArray< signal_t > * > m_write_buffer
The write buffer pointer (buffer writer currently uses).
void clear()
clear all buffers and deallocate memory.
ProcessorMetricArray< std::array< int16_t, 16 > > m_cast_store_buffers[2]
The double buffers for storing the internal state data cast to std::array<int16_t, 16>.
std::atomic< uint16_t > m_last_read_seq
The last sequence number that was read.
void configure_from_registry(ProcessorInternalStateNameRegistry< signal_t > *registry)
Configure and allocate correct buffer storage given the configuration string.
std::atomic< uint16_t > m_write_seq
The sequence number for writes (odd=writing, even=complete).
std::vector< std::shared_ptr< signal_t > > m_internal_state_item_ptrs
The vector of pointers to the internal state items.
std::atomic< ProcessorMetricArray< std::array< int16_t, 16 > > * > m_cast_active_buffer
The active buffer for the casted data.
T signal_t
Signal type to use. Generally __m256i or std::array<int16_t, 16>.
void allocate_cast_buffers(size_t)
Allocate the correct size for the double buffer read and write buffers for the casted data.
std::atomic< ProcessorMetricArray< signal_t > * > m_read_buffer
The read buffer pointer (buffer reader currently uses).
void switch_active_buffer()
Switch the active buffer.
void allocate_buffers(size_t buffer_size)
Allocate the correct size for the double buffer read and write buffers.
ProcessorMetricArray< std::array< int16_t, 16 > > switch_buffer_and_read_casted()
Read from the inactive buffer and cast to std::array<int16_t, 16>.
ProcessorMetricArray< signal_t > switch_buffer_and_read()
Read from the inactive buffer.
ProcessorMetricArray< signal_t > m_store_buffers[2]
The double buffers for storing the internal state data.
size_t get_number_of_requested_internal_states()
Get the number of requested internal states.
std::vector< std::shared_ptr< signal_t > > get_all_requested_internal_state_item_ptrs()
Get a vector of pointers to all internal state items.
Dynamic array of processor metrics, templated on signal type.