DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
ProcessorMetricCollector.cpp
Go to the documentation of this file.
1
12#include <unordered_map>
13#include <vector>
14#include <string>
15#include <immintrin.h>
16
17namespace tpglibs {
18
// Explicit instantiation of the collector for the __m256i (AVX2 register) signal type.
// NOTE(review): per [temp.explicit], an explicit instantiation definition only instantiates
// those members that have already been defined at the point of instantiation, and every
// out-of-line member of ProcessorMetricCollector in this file is defined *after* this line.
// Consider moving this statement to just before the closing `} // namespace tpglibs`;
// confirm the current build links all members as expected.
19template class ProcessorMetricCollector<__m256i>;
20
21template<typename signal_t>
22std::vector<AbstractProcessor<signal_t>*> ProcessorMetricCollector<signal_t>::_get_attached_processors() {
23 return m_attached_processors;
24}
25
26template<typename signal_t>
27std::vector<ProcessorMetricInformation> ProcessorMetricCollector<signal_t>::_get_processor_metric_table() {
28 return m_processor_metric_table;
29}
30
31template<typename signal_t>
32std::vector<std::vector<std::vector<int16_t>>> ProcessorMetricCollector<signal_t>::_get_processor_casted_data_table() {
33 return m_processor_casted_data_table;
34}
35
36template<typename signal_t>
37void ProcessorMetricCollector<signal_t>::configure(const std::vector<std::pair<std::string, nlohmann::json>> configs,
38 const std::vector<std::pair<dunedaq::trgdataformats::channel_t, int16_t>> channel_plane_numbers,
39 uint8_t num_pipelines)
40{
41 // Stop any existing thread first - use proper synchronization
42 if (m_collector_thread.joinable()) {
43 m_stop_flag.store(true, std::memory_order_relaxed);
44 // Give the thread a chance to see the stop flag
45 std::this_thread::yield();
46 m_collector_thread.join();
47 }
48
49 // Reset all state for fresh start
50 m_signal_collect.store(false, std::memory_order_relaxed);
51 m_stop_flag.store(false, std::memory_order_relaxed);
52 m_attach_counter = 0;
53
54 // Clear all data structures
55 m_metrics.clear();
56
57 // create a fixed size array for reference to all the processors
58 // configs are pairs of (processor name, specific configs)
59 size_t n_processors = configs.size();
60 m_attached_processors = std::vector<AbstractProcessor<signal_t>*>(n_processors * num_pipelines);
61
62 m_processor_metric_table = std::vector<ProcessorMetricInformation>(n_processors * num_pipelines, ProcessorMetricInformation{0, {}});
63
64 m_processor_metric_collection_table = {};
65
66 m_processor_casted_data_table = {};
67
68 m_channel_numbers = std::vector<dunedaq::trgdataformats::channel_t>(channel_plane_numbers.size(), 0);
69
70 for (size_t i = 0; i < channel_plane_numbers.size(); i++) {
71 m_channel_numbers[i] = channel_plane_numbers[i].first;
72 m_metrics[m_channel_numbers[i]] = std::vector<std::pair<std::string, int16_t>>();
73 }
74}
75
76template<typename signal_t>
78 size_t proc_id = 0;
79 for (auto processor_ptr : m_attached_processors) {
80 if (!processor_ptr) continue;
81 ProcessorMetricArray items = processor_ptr->read_from_metric_store_buffer();
82 for (size_t i = 0; i < items.m_size; i++) {
83 m_processor_metric_collection_table[proc_id][i] = items.m_data[i];
84 }
85 proc_id++;
86 }
87}
88
89template<typename signal_t>
91 for (size_t i = 0; i < m_processor_metric_collection_table.size(); i++) {
92 auto & raw = m_processor_metric_collection_table[i];
93 for (size_t j = 0; j < raw.size(); j++) {
94 int16_t out[16];
95 _mm256_storeu_si256(reinterpret_cast<__m256i*>(&out), raw[j]);
96 for (size_t k = 0; k < 16; k++) {
97 m_processor_casted_data_table[i][j][k] = out[k];
98 }
99 }
100 }
101}
102
103template<typename signal_t>
105 // TODO: signal that a collection cycle should occur
106 m_signal_collect.store(true, std::memory_order_relaxed);
107 m_signal_collect.notify_one();
108 // this->collect_metrics_from_attached_processors();
109 // this->cast_metrics_from_raw_type();
110}
111
112template<typename signal_t>
114 // TODO: main loop for metric collection thread
115 // Check if thread is already running
116 m_stop_flag.store(false, std::memory_order_release);
117 if (m_collector_thread.joinable()) {
118 return; // Thread already running
119 }
120
121 m_collector_thread = std::thread([this]() {
122 while (!this->m_stop_flag.load(std::memory_order_acquire)) {
123 // wait on collect signal
124 m_signal_collect.wait(false, std::memory_order_relaxed);
125
126 // when woken up, check if stop flag is set
127
128 if (this->m_stop_flag.load(std::memory_order_acquire)) {
129 break;
130 }
131
132 // If this is signaled to collect metrics
133 this->collect_metrics_from_attached_processors();
134
135 this->m_signal_collect.store(false, std::memory_order_relaxed);
136 // After collection, we reset the collect flag to false.
137 // Note that when signal_collect() is called at a far higher rate then possible, ultimately collection
138 // happens at the highest possible rate, not necessarily the set rate
139
140 // perform any post processings on the collected metrics as needed
141 this->cast_metrics_from_raw_type();
142 this->convert_into_channel_metric_value();
143
144 }
145 });
146}
147
148template<typename signal_t>
149std::vector<std::vector<signal_t>>
151 // TODO: return collected metrics
152 return m_processor_metric_collection_table;
153}
154
155template<typename signal_t>
157 // TODO: stop the collection thread
158 m_stop_flag.store(true, std::memory_order_release);
159 m_signal_collect.store(true, std::memory_order_release);
160 // make sure stop flag is set
161 while (!this->m_stop_flag.load(std::memory_order_acquire)) {
162 std::this_thread::yield();
163 }
164 // notify the thread to wake up
165 m_signal_collect.notify_one();
166 if (m_collector_thread.joinable()) {
167 m_collector_thread.join();
168 }
169}
170
171template<typename signal_t>
173 std::scoped_lock lock(m_mutex);
174 std::vector<int16_t> mcount(m_channel_numbers.size(), 0);
175
176 for (size_t pid = 0; pid < m_processor_casted_data_table.size(); pid++) {
177 for (size_t mid = 0; mid < m_processor_casted_data_table[pid].size(); mid++) {
178 auto responsible_channels_start = m_processor_metric_table[pid].m_pipeline_id * 16;
179 size_t local_cid_in_table=0;
180 for (size_t cid = responsible_channels_start; cid < responsible_channels_start + 16; cid++) {
181 auto mnames = m_processor_metric_table[pid].m_names_of_metrics;
182 m_metrics[m_channel_numbers[cid]][mcount[cid]].second = m_processor_casted_data_table[pid][mid][local_cid_in_table];
183 m_metrics[m_channel_numbers[cid]][mcount[cid]].first = m_processor_metric_table[pid].m_names_of_metrics[mid];
184 local_cid_in_table++;
185 mcount[cid]++;
186 }
187 }
188 }
189}
190
191template<typename signal_t>
192std::unordered_map<dunedaq::trgdataformats::channel_t, std::vector<std::pair<std::string, int16_t>>> ProcessorMetricCollector<signal_t>::get_metrics() {
193 return m_metrics;
194}
195
196template<typename signal_t>
200
201template<typename signal_t>
205
206} // namespace tpglibs
std::vector< std::vector< signal_t > > _get_retrieved_processor_metrics() const
std::unordered_map< dunedaq::trgdataformats::channel_t, std::vector< std::pair< std::string, int16_t > > > get_metrics()
std::vector< std::vector< std::vector< int16_t > > > _get_processor_casted_data_table()
std::vector< ProcessorMetricInformation > _get_processor_metric_table()
void configure(const std::vector< std::pair< std::string, nlohmann::json > > configs, const std::vector< std::pair< dunedaq::trgdataformats::channel_t, int16_t > > channel_plane_numbers, uint8_t num_pipelines)
std::vector< AbstractProcessor< signal_t > * > _get_attached_processors()
Dynamic array of processor metrics, templated on signal type.
signal_type_t * m_data
Pointer to contiguous metric data.
std::size_t m_size
Number of metrics in the array.