38 const std::vector<std::pair<dunedaq::trgdataformats::channel_t, int16_t>> channel_plane_numbers,
39 uint8_t num_pipelines)
42 if (m_collector_thread.joinable()) {
43 m_stop_flag.store(
true, std::memory_order_relaxed);
45 std::this_thread::yield();
46 m_collector_thread.join();
50 m_signal_collect.store(
false, std::memory_order_relaxed);
51 m_stop_flag.store(
false, std::memory_order_relaxed);
59 size_t n_processors = configs.size();
60 m_attached_processors = std::vector<AbstractProcessor<signal_t>*>(n_processors * num_pipelines);
62 m_processor_metric_table = std::vector<ProcessorMetricInformation>(n_processors * num_pipelines,
ProcessorMetricInformation{0, {}});
64 m_processor_metric_collection_table = {};
66 m_processor_casted_data_table = {};
68 m_channel_numbers = std::vector<dunedaq::trgdataformats::channel_t>(channel_plane_numbers.size(), 0);
70 for (
size_t i = 0; i < channel_plane_numbers.size(); i++) {
71 m_channel_numbers[i] = channel_plane_numbers[i].first;
72 m_metrics[m_channel_numbers[i]] = std::vector<std::pair<std::string, int16_t>>();
116 m_stop_flag.store(
false, std::memory_order_release);
117 if (m_collector_thread.joinable()) {
121 m_collector_thread = std::thread([
this]() {
122 while (!this->m_stop_flag.load(std::memory_order_acquire)) {
124 m_signal_collect.wait(
false, std::memory_order_relaxed);
128 if (this->m_stop_flag.load(std::memory_order_acquire)) {
133 this->collect_metrics_from_attached_processors();
135 this->m_signal_collect.store(
false, std::memory_order_relaxed);
141 this->cast_metrics_from_raw_type();
142 this->convert_into_channel_metric_value();
158 m_stop_flag.store(
true, std::memory_order_release);
159 m_signal_collect.store(
true, std::memory_order_release);
161 while (!this->m_stop_flag.load(std::memory_order_acquire)) {
162 std::this_thread::yield();
165 m_signal_collect.notify_one();
166 if (m_collector_thread.joinable()) {
167 m_collector_thread.join();
173 std::scoped_lock lock(m_mutex);
174 std::vector<int16_t> mcount(m_channel_numbers.size(), 0);
176 for (
size_t pid = 0; pid < m_processor_casted_data_table.size(); pid++) {
177 for (
size_t mid = 0; mid < m_processor_casted_data_table[pid].size(); mid++) {
178 auto responsible_channels_start = m_processor_metric_table[pid].m_pipeline_id * 16;
179 size_t local_cid_in_table=0;
180 for (
size_t cid = responsible_channels_start; cid < responsible_channels_start + 16; cid++) {
181 auto mnames = m_processor_metric_table[pid].m_names_of_metrics;
182 m_metrics[m_channel_numbers[cid]][mcount[cid]].second = m_processor_casted_data_table[pid][mid][local_cid_in_table];
183 m_metrics[m_channel_numbers[cid]][mcount[cid]].first = m_processor_metric_table[pid].m_names_of_metrics[mid];
184 local_cid_in_table++;