DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
tpglibs::ProcessorMetricCollector< T > Class Template Reference

#include <ProcessorMetricCollector.hpp>

Public Types

using signal_t = T
 

Public Member Functions

void attach_processor (AbstractProcessor< signal_t > &processor, size_t pipeline_id)
 
void configure (const std::vector< std::pair< std::string, nlohmann::json > > configs, const std::vector< std::pair< dunedaq::trgdataformats::channel_t, int16_t > > channel_plane_numbers, uint8_t num_pipelines)
 
std::vector< AbstractProcessor< signal_t > * > _get_attached_processors ()
 
std::vector< ProcessorMetricInformation_get_processor_metric_table ()
 
std::vector< std::vector< std::vector< int16_t > > > _get_processor_casted_data_table ()
 
void signal_collect ()
 
void run ()
 
std::vector< std::vector< signal_t > > _get_retrieved_processor_metrics () const
 
std::unordered_map< dunedaq::trgdataformats::channel_t, std::vector< std::pair< std::string, int16_t > > > get_metrics ()
 
void stop ()
 
 ProcessorMetricCollector ()
 
void lock_metric_modify ()
 
void unlock_metric_modify ()
 

Private Member Functions

void cast_metrics_from_raw_type ()
 
void collect_metrics_from_attached_processors ()
 
void convert_into_channel_metric_value ()
 

Private Attributes

std::vector< AbstractProcessor< signal_t > * > m_attached_processors
 
std::vector< ProcessorMetricInformationm_processor_metric_table
 
std::vector< std::vector< std::vector< int16_t > > > m_processor_casted_data_table
 
std::vector< std::vector< signal_t > > m_processor_metric_collection_table
 
std::vector< dunedaq::trgdataformats::channel_tm_channel_numbers
 
std::unordered_map< dunedaq::trgdataformats::channel_t, std::vector< std::pair< std::string, int16_t > > > m_metrics
 
std::mutex m_mutex
 
std::atomic< bool > m_signal_collect {false}
 
std::thread m_collector_thread
 
std::atomic< bool > m_stop_flag {false}
 
size_t m_attach_counter = 0
 

Detailed Description

template<typename T>
class tpglibs::ProcessorMetricCollector< T >

Definition at line 36 of file ProcessorMetricCollector.hpp.

Member Typedef Documentation

◆ signal_t

template<typename T >
using tpglibs::ProcessorMetricCollector< T >::signal_t = T

Definition at line 39 of file ProcessorMetricCollector.hpp.

Constructor & Destructor Documentation

◆ ProcessorMetricCollector()

template<typename T >
tpglibs::ProcessorMetricCollector< T >::ProcessorMetricCollector ( )
inline

Member Function Documentation

◆ _get_attached_processors()

template<typename signal_t >
std::vector< AbstractProcessor< signal_t > * > tpglibs::ProcessorMetricCollector< signal_t >::_get_attached_processors ( )

Definition at line 22 of file ProcessorMetricCollector.cpp.

22 {
24}
std::vector< AbstractProcessor< signal_t > * > m_attached_processors

◆ _get_processor_casted_data_table()

template<typename signal_t >
std::vector< std::vector< std::vector< int16_t > > > tpglibs::ProcessorMetricCollector< signal_t >::_get_processor_casted_data_table ( )

Definition at line 32 of file ProcessorMetricCollector.cpp.

32 {
34}
std::vector< std::vector< std::vector< int16_t > > > m_processor_casted_data_table

◆ _get_processor_metric_table()

template<typename signal_t >
std::vector< ProcessorMetricInformation > tpglibs::ProcessorMetricCollector< signal_t >::_get_processor_metric_table ( )

Definition at line 27 of file ProcessorMetricCollector.cpp.

27 {
29}
std::vector< ProcessorMetricInformation > m_processor_metric_table

◆ _get_retrieved_processor_metrics()

template<typename signal_t >
std::vector< std::vector< signal_t > > tpglibs::ProcessorMetricCollector< signal_t >::_get_retrieved_processor_metrics ( ) const

Definition at line 150 of file ProcessorMetricCollector.cpp.

150 {
151 // TODO: return collected metrics
153}
std::vector< std::vector< signal_t > > m_processor_metric_collection_table

◆ attach_processor()

template<typename T >
void tpglibs::ProcessorMetricCollector< T >::attach_processor ( AbstractProcessor< signal_t > & processor,
size_t pipeline_id )
inline

Definition at line 41 of file ProcessorMetricCollector.hpp.

41 {
42 // Attach a processor to be observed (collected) by this
44
45 // Look up and fill in information about this processor: its pipeline belonging and what is collected
46 auto metrics = processor.get_metric_items();
47 m_processor_metric_table[m_attach_counter].m_names_of_metrics = metrics;
48 m_processor_metric_table[m_attach_counter].m_pipeline_id = pipeline_id;
49
51 metrics.size(),
52 std::vector<int16_t>(16)
53 );
54
55 // Then instantiate the space for storing collected metrics
56 // Preallocated to preconfigured number of metrics
57 m_processor_metric_collection_table.push_back(std::vector<signal_t>(metrics.size()));
58
59 for (size_t i = pipeline_id*16; i < (pipeline_id+1)*16; i++) {
60 for (size_t j = 0; j < metrics.size(); j++) {
61 // Record metric name and initial value 0 for this channel
62 m_metrics[m_channel_numbers[i]].emplace_back(metrics[j], int16_t{0});
63 }
64 }
65
67 }
std::vector< dunedaq::trgdataformats::channel_t > m_channel_numbers
std::unordered_map< dunedaq::trgdataformats::channel_t, std::vector< std::pair< std::string, int16_t > > > m_metrics

◆ cast_metrics_from_raw_type()

template<typename signal_t >
void tpglibs::ProcessorMetricCollector< signal_t >::cast_metrics_from_raw_type ( )
private

Definition at line 90 of file ProcessorMetricCollector.cpp.

90 {
91 for (size_t i = 0; i < m_processor_metric_collection_table.size(); i++) {
93 for (size_t j = 0; j < raw.size(); j++) {
94 int16_t out[16];
95 _mm256_storeu_si256(reinterpret_cast<__m256i*>(&out), raw[j]);
96 for (size_t k = 0; k < 16; k++) {
98 }
99 }
100 }
101}
FELIX Initialization std::string initerror FELIX queue timed out

◆ collect_metrics_from_attached_processors()

template<typename signal_t >
void tpglibs::ProcessorMetricCollector< signal_t >::collect_metrics_from_attached_processors ( )
private

Definition at line 77 of file ProcessorMetricCollector.cpp.

77 {
78 size_t proc_id = 0;
79 for (auto processor_ptr : m_attached_processors) {
80 if (!processor_ptr) continue;
81 ProcessorMetricArray items = processor_ptr->read_from_metric_store_buffer();
82 for (size_t i = 0; i < items.m_size; i++) {
83 m_processor_metric_collection_table[proc_id][i] = items.m_data[i];
84 }
85 proc_id++;
86 }
87}

◆ configure()

template<typename signal_t >
void tpglibs::ProcessorMetricCollector< signal_t >::configure ( const std::vector< std::pair< std::string, nlohmann::json > > configs,
const std::vector< std::pair< dunedaq::trgdataformats::channel_t, int16_t > > channel_plane_numbers,
uint8_t num_pipelines )

Definition at line 37 of file ProcessorMetricCollector.cpp.

40{
41 // Stop any existing thread first - use proper synchronization
42 if (m_collector_thread.joinable()) {
43 m_stop_flag.store(true, std::memory_order_relaxed);
44 // Give the thread a chance to see the stop flag
45 std::this_thread::yield();
46 m_collector_thread.join();
47 }
48
49 // Reset all state for fresh start
50 m_signal_collect.store(false, std::memory_order_relaxed);
51 m_stop_flag.store(false, std::memory_order_relaxed);
53
54 // Clear all data structures
55 m_metrics.clear();
56
57 // create a fixed size array for reference to all the processors
58 // configs are pairs of (processor name, specific configs)
59 size_t n_processors = configs.size();
60 m_attached_processors = std::vector<AbstractProcessor<signal_t>*>(n_processors * num_pipelines);
61
62 m_processor_metric_table = std::vector<ProcessorMetricInformation>(n_processors * num_pipelines, ProcessorMetricInformation{0, {}});
63
65
67
68 m_channel_numbers = std::vector<dunedaq::trgdataformats::channel_t>(channel_plane_numbers.size(), 0);
69
70 for (size_t i = 0; i < channel_plane_numbers.size(); i++) {
71 m_channel_numbers[i] = channel_plane_numbers[i].first;
72 m_metrics[m_channel_numbers[i]] = std::vector<std::pair<std::string, int16_t>>();
73 }
74}

◆ convert_into_channel_metric_value()

template<typename signal_t >
void tpglibs::ProcessorMetricCollector< signal_t >::convert_into_channel_metric_value ( )
private

Definition at line 172 of file ProcessorMetricCollector.cpp.

172 {
173 std::scoped_lock lock(m_mutex);
174 std::vector<int16_t> mcount(m_channel_numbers.size(), 0);
175
176 for (size_t pid = 0; pid < m_processor_casted_data_table.size(); pid++) {
177 for (size_t mid = 0; mid < m_processor_casted_data_table[pid].size(); mid++) {
178 auto responsible_channels_start = m_processor_metric_table[pid].m_pipeline_id * 16;
179 size_t local_cid_in_table=0;
180 for (size_t cid = responsible_channels_start; cid < responsible_channels_start + 16; cid++) {
181 auto mnames = m_processor_metric_table[pid].m_names_of_metrics;
182 m_metrics[m_channel_numbers[cid]][mcount[cid]].second = m_processor_casted_data_table[pid][mid][local_cid_in_table];
183 m_metrics[m_channel_numbers[cid]][mcount[cid]].first = m_processor_metric_table[pid].m_names_of_metrics[mid];
184 local_cid_in_table++;
185 mcount[cid]++;
186 }
187 }
188 }
189}

◆ get_metrics()

template<typename signal_t >
std::unordered_map< dunedaq::trgdataformats::channel_t, std::vector< std::pair< std::string, int16_t > > > tpglibs::ProcessorMetricCollector< signal_t >::get_metrics ( )

Definition at line 192 of file ProcessorMetricCollector.cpp.

192 {
193 return m_metrics;
194}

◆ lock_metric_modify()

template<typename signal_t >
void tpglibs::ProcessorMetricCollector< signal_t >::lock_metric_modify ( )

Definition at line 197 of file ProcessorMetricCollector.cpp.

197 {
198 m_mutex.lock();
199}

◆ run()

template<typename signal_t >
void tpglibs::ProcessorMetricCollector< signal_t >::run ( )

Definition at line 113 of file ProcessorMetricCollector.cpp.

113 {
114 // TODO: main loop for metric collection thread
115 // Check if thread is already running
116 m_stop_flag.store(false, std::memory_order_release);
117 if (m_collector_thread.joinable()) {
118 return; // Thread already running
119 }
120
121 m_collector_thread = std::thread([this]() {
122 while (!this->m_stop_flag.load(std::memory_order_acquire)) {
123 // wait on collect signal
124 m_signal_collect.wait(false, std::memory_order_relaxed);
125
126 // when woken up, check if stop flag is set
127
128 if (this->m_stop_flag.load(std::memory_order_acquire)) {
129 break;
130 }
131
132 // If this is signaled to collect metrics
134
135 this->m_signal_collect.store(false, std::memory_order_relaxed);
136 // After collection, we reset the collect flag to false.
137 // Note that when signal_collect() is called at a far higher rate then possible, ultimately collection
138 // happens at the highest possible rate, not necessarily the set rate
139
140 // perform any post processings on the collected metrics as needed
143
144 }
145 });
146}

◆ signal_collect()

template<typename signal_t >
void tpglibs::ProcessorMetricCollector< signal_t >::signal_collect ( )

Definition at line 104 of file ProcessorMetricCollector.cpp.

104 {
105 // TODO: signal that a collection cycle should occur
106 m_signal_collect.store(true, std::memory_order_relaxed);
107 m_signal_collect.notify_one();
108 // this->collect_metrics_from_attached_processors();
109 // this->cast_metrics_from_raw_type();
110}

◆ stop()

template<typename signal_t >
void tpglibs::ProcessorMetricCollector< signal_t >::stop ( )

Definition at line 156 of file ProcessorMetricCollector.cpp.

156 {
157 // TODO: stop the collection thread
158 m_stop_flag.store(true, std::memory_order_release);
159 m_signal_collect.store(true, std::memory_order_release);
160 // make sure stop flag is set
161 while (!this->m_stop_flag.load(std::memory_order_acquire)) {
162 std::this_thread::yield();
163 }
164 // notify the thread to wake up
165 m_signal_collect.notify_one();
166 if (m_collector_thread.joinable()) {
167 m_collector_thread.join();
168 }
169}

◆ unlock_metric_modify()

template<typename signal_t >
void tpglibs::ProcessorMetricCollector< signal_t >::unlock_metric_modify ( )

Definition at line 202 of file ProcessorMetricCollector.cpp.

202 {
203 m_mutex.unlock();
204}

Member Data Documentation

◆ m_attach_counter

template<typename T >
size_t tpglibs::ProcessorMetricCollector< T >::m_attach_counter = 0
private

Definition at line 111 of file ProcessorMetricCollector.hpp.

◆ m_attached_processors

template<typename T >
std::vector<AbstractProcessor<signal_t>*> tpglibs::ProcessorMetricCollector< T >::m_attached_processors
private

Definition at line 98 of file ProcessorMetricCollector.hpp.

◆ m_channel_numbers

template<typename T >
std::vector<dunedaq::trgdataformats::channel_t> tpglibs::ProcessorMetricCollector< T >::m_channel_numbers
private

Definition at line 103 of file ProcessorMetricCollector.hpp.

◆ m_collector_thread

template<typename T >
std::thread tpglibs::ProcessorMetricCollector< T >::m_collector_thread
private

Definition at line 109 of file ProcessorMetricCollector.hpp.

◆ m_metrics

template<typename T >
std::unordered_map<dunedaq::trgdataformats::channel_t, std::vector<std::pair<std::string, int16_t> > > tpglibs::ProcessorMetricCollector< T >::m_metrics
private

Definition at line 105 of file ProcessorMetricCollector.hpp.

◆ m_mutex

template<typename T >
std::mutex tpglibs::ProcessorMetricCollector< T >::m_mutex
private

Definition at line 106 of file ProcessorMetricCollector.hpp.

◆ m_processor_casted_data_table

template<typename T >
std::vector<std::vector<std::vector<int16_t> > > tpglibs::ProcessorMetricCollector< T >::m_processor_casted_data_table
private

Definition at line 100 of file ProcessorMetricCollector.hpp.

◆ m_processor_metric_collection_table

template<typename T >
std::vector<std::vector<signal_t> > tpglibs::ProcessorMetricCollector< T >::m_processor_metric_collection_table
private

Definition at line 101 of file ProcessorMetricCollector.hpp.

◆ m_processor_metric_table

template<typename T >
std::vector<ProcessorMetricInformation> tpglibs::ProcessorMetricCollector< T >::m_processor_metric_table
private

Definition at line 99 of file ProcessorMetricCollector.hpp.

◆ m_signal_collect

template<typename T >
std::atomic<bool> tpglibs::ProcessorMetricCollector< T >::m_signal_collect {false}
private

Definition at line 108 of file ProcessorMetricCollector.hpp.

108{false};

◆ m_stop_flag

template<typename T >
std::atomic<bool> tpglibs::ProcessorMetricCollector< T >::m_stop_flag {false}
private

Definition at line 110 of file ProcessorMetricCollector.hpp.

110{false};

The documentation for this class was generated from the following files: