DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::fdreadoutlibs::WIBEthFrameProcessor Class Reference

#include <WIBEthFrameProcessor.hpp>

Inheritance diagram for dunedaq::fdreadoutlibs::WIBEthFrameProcessor:
[legend]
Collaboration diagram for dunedaq::fdreadoutlibs::WIBEthFrameProcessor:
[legend]

Public Types

using inherited = datahandlinglibs::TaskRawDataProcessorModel<types::DUNEWIBEthTypeAdapter>
 
using frameptr = types::DUNEWIBEthTypeAdapter*
 
using constframeptr = const types::DUNEWIBEthTypeAdapter*
 
using wibframeptr = dunedaq::fddetdataformats::WIBEthFrame*
 
- Public Types inherited from dunedaq::opmonlib::MonitorableObject
using NodePtr = std::weak_ptr<MonitorableObject>
 
using NewNodePtr = std::shared_ptr<MonitorableObject>
 
using ElementId = std::string
 

Public Member Functions

 WIBEthFrameProcessor (std::unique_ptr< datahandlinglibs::FrameErrorRegistry > &error_registry, bool processing_enabled)
 
void start (const nlohmann::json &args) override
 Start operation.
 
void stop (const nlohmann::json &args) override
 Stop operation.
 
void conf (const appmodel::DataHandlerModule *conf) override
 Set the emulator mode, if active, timestamps of processed packets are overwritten with new ones.
 
- Public Member Functions inherited from dunedaq::datahandlinglibs::TaskRawDataProcessorModel< types::DUNEWIBEthTypeAdapter >
 TaskRawDataProcessorModel (std::unique_ptr< FrameErrorRegistry > &error_registry, bool post_processing_enabled)
 
 ~TaskRawDataProcessorModel ()
 
void scrap (const nlohmann::json &) override
 Unconfigure.
 
void reset_last_daq_time ()
 
std::uint64_t get_last_daq_time () override
 Get newest timestamp of last seen packet.
 
void preprocess_item (types::DUNEWIBEthTypeAdapter *item) override
 Preprocess one element.
 
void postprocess_item (const types::DUNEWIBEthTypeAdapter *item) override
 Postprocess one element.
 
void add_preprocess_task (Task &&task)
 
void add_postprocess_task (Task &&task)
 
void invoke_all_preprocess_functions (types::DUNEWIBEthTypeAdapter *item)
 
void launch_all_preprocess_functions (types::DUNEWIBEthTypeAdapter *item)
 
- Public Member Functions inherited from dunedaq::datahandlinglibs::RawDataProcessorConcept< types::DUNEWIBEthTypeAdapter >
 RawDataProcessorConcept ()
 
 RawDataProcessorConcept (const RawDataProcessorConcept &)=delete
 RawDataProcessorConcept is not copy-constructible.
 
 RawDataProcessorConcept (RawDataProcessorConcept &&)=delete
 RawDataProcessorConcept is not move-constructible.
 
virtual ~RawDataProcessorConcept ()
 
RawDataProcessorConceptoperator= (const RawDataProcessorConcept &)=delete
 RawDataProcessorConcept is not copy-assginable.
 
RawDataProcessorConceptoperator= (RawDataProcessorConcept &&)=delete
 RawDataProcessorConcept is not move-assignable.
 
- Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject (const MonitorableObject &)=delete
 
MonitorableObjectoperator= (const MonitorableObject &)=delete
 
 MonitorableObject (MonitorableObject &&)=delete
 
MonitorableObjectoperator= (MonitorableObject &&)=delete
 
virtual ~MonitorableObject ()=default
 
auto get_opmon_id () const noexcept
 
auto get_opmon_level () const noexcept
 

Protected Member Functions

virtual void generate_opmon_data () override
 
void publish_processor_metric_to_opmon ()
 
void publish_processor_metric_to_opmon_with_aggregation ()
 
std::map< int16_t, std::map< std::string, std::tuple< float, int16_t, int16_t, float, dunedaq::trgdataformats::channel_t, dunedaq::trgdataformats::channel_t > > > calculate_all_metric_summaries_across_planes (const std::unordered_map< dunedaq::trgdataformats::channel_t, std::vector< std::pair< std::string, int16_t > > > &metrics)
 
void use_pattern_generator (frameptr fp)
 
void sequence_check (frameptr fp)
 
void timestamp_check (frameptr fp)
 
void find_hits (constframeptr fp)
 
- Protected Member Functions inherited from dunedaq::datahandlinglibs::TaskRawDataProcessorModel< types::DUNEWIBEthTypeAdapter >
void run_post_processing_thread (std::function< void(const types::DUNEWIBEthTypeAdapter *)> &function, folly::ProducerConsumerQueue< const types::DUNEWIBEthTypeAdapter * > &queue)
 
- Protected Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject ()=default
 
void register_node (ElementId name, NewNodePtr)
 
void publish (google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
 

Protected Attributes

dunedaq::daqdataformats::timestamp_t m_previous_ts = 0
 
dunedaq::daqdataformats::timestamp_t m_current_ts = 0
 
uint16_t m_previous_seq_id = 0
 
uint16_t m_current_seq_id = 0
 
dunedaq::daqdataformats::timestamp_t m_pattern_generator_previous_ts = 0
 
dunedaq::daqdataformats::timestamp_t m_pattern_generator_current_ts = 0
 
bool m_first_ts_missmatch = true
 
bool m_ts_problem_reported = false
 
bool m_ts_error_state = false
 
std::atomic< uint64_t > m_ts_error_ctr { 0 }
 
bool m_first_seq_id_mismatch = true
 
bool m_seq_id_problem_reported = false
 
bool m_seq_id_error_state = false
 
std::atomic< uint64_t > m_seq_id_error_ctr { 0 }
 
std::atomic< int16_t > m_seq_id_min_jump { 0 }
 
std::atomic< int16_t > m_seq_id_max_jump { 0 }
 
- Protected Attributes inherited from dunedaq::datahandlinglibs::TaskRawDataProcessorModel< types::DUNEWIBEthTypeAdapter >
std::vector< std::function< void(types::DUNEWIBEthTypeAdapter *)> > m_preprocess_functions
 
std::unique_ptr< FrameErrorRegistry > & m_error_registry
 
bool m_post_processing_enabled
 
std::atomic< boolm_run_marker
 
std::vector< std::function< void(const types::DUNEWIBEthTypeAdapter *)> > m_post_process_functions
 
std::vector< std::unique_ptr< folly::ProducerConsumerQueue< const types::DUNEWIBEthTypeAdapter * > > > m_items_to_postprocess_queues
 
std::vector< std::unique_ptr< utilities::ReusableThread > > m_post_process_threads
 
size_t m_postprocess_queue_sizes
 
daqdataformats::SourceID m_sourceid
 
std::atomic< uint64_t > m_last_processed_daq_ts
 

Private Attributes

bool m_first_hit = true
 
bool m_tpg_metric_collect_enabled {false}
 
uint32_t m_metric_collect_opmon_period { 128 }
 
std::unique_ptr< tpglibs::TPGeneratorm_tp_generator
 
std::vector< std::pair< std::string, nlohmann::json > > m_tpg_configs
 
uint32_t m_tp_max_width
 
std::set< unsigned int > m_channel_mask_set
 
uint16_t m_tpg_threshold_selected
 
std::map< uint, std::atomic< int > > m_tp_channel_rate_map
 
size_t m_num_msg = 0
 
size_t m_num_push_fail = 0
 
std::atomic< int > m_tpg_hits_count { 0 }
 
uint32_t m_det_id
 
uint32_t m_crate_id
 
uint32_t m_slot_id
 
uint32_t m_stream_id
 
bool m_emulator_mode = false
 
std::shared_ptr< detchannelmaps::TPCChannelMap > m_channel_map
 
std::vector< std::pair< trgdataformats::channel_t, int16_t > > m_channel_plane_numbers
 
std::vector< trigger::TriggerPrimitiveTypeAdapterm_tpa_vectors [3]
 
std::shared_ptr< iomanager::SenderConcept< std::vector< trigger::TriggerPrimitiveTypeAdapter > > > m_tp_sink [3]
 
std::shared_ptr< iomanager::SenderConcept< fddetdataformats::WIBEthFrame > > m_err_frame_sink
 
daqdataformats::SourceID m_sourceid
 
std::atomic< uint64_t > m_new_hits { 0 }
 
std::atomic< uint64_t > m_new_tps { 0 }
 
std::atomic< uint64_t > m_tps_suppressed_too_long { 0 }
 
std::atomic< uint64_t > m_tps_send_failed { 0 }
 
std::atomic< uint64_t > m_frame_counter { 0 }
 
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
 

Additional Inherited Members

- Static Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
static bool publishable_metric (OpMonLevel entry, OpMonLevel system) noexcept
 

Detailed Description

Definition at line 52 of file WIBEthFrameProcessor.hpp.

Member Typedef Documentation

◆ constframeptr

◆ frameptr

◆ inherited

◆ wibframeptr

Constructor & Destructor Documentation

◆ WIBEthFrameProcessor()

dunedaq::fdreadoutlibs::WIBEthFrameProcessor::WIBEthFrameProcessor ( std::unique_ptr< datahandlinglibs::FrameErrorRegistry > & error_registry,
bool processing_enabled )
explicit

Definition at line 32 of file WIBEthFrameProcessor.cpp.

33 : TaskRawDataProcessorModel<types::DUNEWIBEthTypeAdapter>(error_registry, processing_enabled)
34{
35}

Member Function Documentation

◆ calculate_all_metric_summaries_across_planes()

std::map< int16_t, std::map< std::string, std::tuple< float, int16_t, int16_t, float, dunedaq::trgdataformats::channel_t, dunedaq::trgdataformats::channel_t > > > dunedaq::fdreadoutlibs::WIBEthFrameProcessor::calculate_all_metric_summaries_across_planes ( const std::unordered_map< dunedaq::trgdataformats::channel_t, std::vector< std::pair< std::string, int16_t > > > & metrics)
protected

Optimized version that calculates all metric summaries across all planes in a single pass Returns a map of plane_number -> map of metric_name -> summary statistics

Definition at line 249 of file WIBEthFrameProcessor.cpp.

249 {
250 // Structure to hold all statistics: plane -> metric -> (mean, min, max, stddev, min_channel_id, max_channel_id)
251 std::map<int16_t, std::map<std::string, std::tuple<float, int16_t, int16_t, float, dunedaq::trgdataformats::channel_t, dunedaq::trgdataformats::channel_t>>> all_stats;
252
253 // Structure to accumulate statistics: plane -> metric -> (count, mean, M2, min, max, min_channel_id, max_channel_id)
254 std::map<int16_t, std::map<std::string, std::tuple<size_t, double, double, int16_t, int16_t, dunedaq::trgdataformats::channel_t, dunedaq::trgdataformats::channel_t>>> accumulators;
255
256 // Single pass through all metrics to collect data using Welford's online algorithm for variance
257 for (const auto& [channel, vec] : metrics) {
258 if (!m_channel_map) continue;
259
260 int16_t plane = m_channel_map->get_plane_from_offline_channel(channel);
261
262 for (const auto& [name, val] : vec) {
263 auto& [count, mean, M2, min, max, min_channel_id, max_channel_id] = accumulators[plane][name];
264
265 count++;
266
267 if (count == 1 || val < min) {
268 min = val;
269 min_channel_id = channel;
270 }
271 if (count == 1 || val > max) {
272 max = val;
273 max_channel_id = channel;
274 }
275
276 // Welford's online algorithm for variance calculation
277 if (count == 1) {
278 // First value: initialize mean and M2
279 mean = val;
280 M2 = 0.0;
281 } else {
282 // Update mean and M2 using Welford's algorithm
283 double delta = val - mean;
284 mean += delta / count;
285 double delta2 = val - mean;
286 M2 += delta * delta2;
287 }
288 }
289 }
290
291 // Calculate final statistics from accumulated data
292 for (const auto& [plane, metric_map] : accumulators) {
293 for (const auto& [metric_name, acc_data] : metric_map) {
294 const auto& [count, mean, M2, min, max, min_channel_id, max_channel_id] = acc_data;
295
296 if (count == 0) continue;
297
298 float stddev = 0.0f;
299
300 // Calculate standard deviation using accumulated M2
301 if (count > 1) {
302 stddev = std::sqrt(M2 / (count - 1));
303 }
304
305 all_stats[plane][metric_name] = std::make_tuple(static_cast<float>(mean), min, max, stddev, min_channel_id, max_channel_id);
306 }
307 }
308
309 return all_stats;
310}
std::shared_ptr< detchannelmaps::TPCChannelMap > m_channel_map

◆ conf()

void dunedaq::fdreadoutlibs::WIBEthFrameProcessor::conf ( const appmodel::DataHandlerModule * conf)
overridevirtual

Set the emulator mode, if active, timestamps of processed packets are overwritten with new ones.

Reimplemented from dunedaq::datahandlinglibs::TaskRawDataProcessorModel< types::DUNEWIBEthTypeAdapter >.

Definition at line 83 of file WIBEthFrameProcessor.cpp.

84{
85 size_t idx = 0;
86 for (auto output : conf->get_outputs()) {
87 try {
88 if (output->get_data_type() == "TriggerPrimitiveVector") {
90 }
91 } catch (const ers::Issue& excpt) {
92 ers::error(datahandlinglibs::ResourceQueueError(ERS_HERE, "tp", "DefaultRequestHandlerModel", excpt));
93 }
94 }
95
96 m_sourceid.id = conf->get_source_id();
98 auto geo_id = conf->get_geo_id();
99 if (geo_id != nullptr) {
100 m_det_id = geo_id->get_detector_id();
101 m_crate_id = geo_id->get_crate_id();
102 m_slot_id = geo_id->get_slot_id();
103 m_stream_id = geo_id->get_stream_id();
104 }
105 m_emulator_mode = conf->get_emulation_mode();
106
107 // Setup pre-processing pipeline
108 if (!m_emulator_mode)
109 inherited::add_preprocess_task(std::bind(&WIBEthFrameProcessor::sequence_check, this, std::placeholders::_1));
110
111 inherited::add_preprocess_task(std::bind(&WIBEthFrameProcessor::timestamp_check, this, std::placeholders::_1));
112
113 // Check it post-processing is active
114 auto dp = conf->get_module_configuration()->get_data_processor();
115 if (dp != nullptr) {
116 auto proc_conf = dp->cast<appmodel::TPCRawDataProcessor>();
117 if (proc_conf != nullptr && m_post_processing_enabled) {
118 m_tp_generator = std::make_unique<tpglibs::TPGenerator>();
119
120 // Set the minimum TP samples over threshold.
121 auto conf_sot_minima = proc_conf->get_sot_minima();
122 std::vector<uint16_t> sot_minima{conf_sot_minima->get_sot_minimum_plane0(),
123 conf_sot_minima->get_sot_minimum_plane1(),
124 conf_sot_minima->get_sot_minimum_plane2()};
125 m_tp_generator->set_sot_minima(sot_minima);
126
127 const std::vector<unsigned int> channel_mask_vec = proc_conf->get_channel_mask();
128
129 std::vector<const appmodel::ProcessingStep*> processing_steps = proc_conf->get_processing_steps();
130 for (auto step : processing_steps) {
131 m_tpg_configs.push_back(std::make_pair(step->class_name(), step->to_json(false).back()));
132 }
133
134 // Setup post-processing pipeline
135 m_channel_map = dunedaq::detchannelmaps::make_tpc_map(proc_conf->get_channel_map());
136 for (int chan = 0; chan < 64; chan++) {
137 trgdataformats::channel_t off_channel = m_channel_map->get_offline_channel_from_det_crate_slot_stream_chan(m_det_id, m_crate_id, m_slot_id, m_stream_id, chan);
138 int16_t plane = m_channel_map->get_plane_from_offline_channel(off_channel);
139 m_channel_plane_numbers.push_back(std::make_pair(off_channel, plane));
140
141 // This processor only needs to handle some (maybe 0) of the masked channels.
142 // Only get those relevant channels for the later check.
143 if (std::find(channel_mask_vec.begin(), channel_mask_vec.end(), off_channel) != channel_mask_vec.end())
144 m_channel_mask_set.insert(off_channel);
145 }
146
147 m_metric_collect_opmon_period = proc_conf->get_metric_collect_opmon_rate();
148
149 // Let the TPG generator configure
150
152
153 // After it sees the configs, it will set the metric collector enable state
154
155 m_tpg_metric_collect_enabled = m_tp_generator->get_metric_collector_enable_state();
156
157 inherited::add_postprocess_task(std::bind(&WIBEthFrameProcessor::find_hits, this, std::placeholders::_1));
158 }
159 }
161}
#define ERS_HERE
void conf(const appmodel::DataHandlerModule *conf) override
Set the emulator mode, if active, timestamps of processed packets are overwritten with new ones.
std::unique_ptr< tpglibs::TPGenerator > m_tp_generator
std::shared_ptr< iomanager::SenderConcept< std::vector< trigger::TriggerPrimitiveTypeAdapter > > > m_tp_sink[3]
std::vector< std::pair< trgdataformats::channel_t, int16_t > > m_channel_plane_numbers
std::vector< std::pair< std::string, nlohmann::json > > m_tpg_configs
Base class for any user define issue.
Definition Issue.hpp:69
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
void error(const Issue &issue)
Definition ers.hpp:81
Subsystem subsystem
The general subsystem of the source of the data.
Definition SourceID.hpp:69
ID_t id
Unique identifier of the source of the data.
Definition SourceID.hpp:74
static const constexpr daqdataformats::SourceID::Subsystem subsystem

◆ find_hits()

void dunedaq::fdreadoutlibs::WIBEthFrameProcessor::find_hits ( constframeptr fp)
protected

Pipeline Stage 2.: Do software TPG

Definition at line 466 of file WIBEthFrameProcessor.cpp.

467{
468 size_t nhits = 0;
469 if (!fp)
470 return;
471 auto wfptr = reinterpret_cast<dunedaq::fddetdataformats::WIBEthFrame*>((uint8_t*)fp); // NOLINT
472
473 // Check that the system is properly configured from the first hit.
474 if (m_first_hit) {
475 if (wfptr->daq_header.crate_id != m_crate_id || wfptr->daq_header.slot_id != m_slot_id || wfptr->daq_header.stream_id != m_stream_id) {
476 ers::error(LinkMisconfiguration(ERS_HERE, wfptr->daq_header.crate_id, wfptr->daq_header.slot_id, wfptr->daq_header.stream_id, m_crate_id, m_slot_id, m_stream_id));
477 }
478
479 m_first_hit = false;
480 }
481
482 std::vector<trgdataformats::TriggerPrimitive> tps = (*m_tp_generator)(wfptr);
483 m_frame_counter.fetch_add(1, std::memory_order_relaxed);
484 if (m_tpg_metric_collect_enabled && m_frame_counter.load(std::memory_order_relaxed) % m_metric_collect_opmon_period == 0) {
485 m_tp_generator->signal_metric_collection();
486 }
487
488 for (const auto& tp : tps) {
489 // If this TP is on a masked channel, skip it.
490 if (std::binary_search(m_channel_mask_set.begin(), m_channel_mask_set.end(), tp.channel))
491 continue;
492 // Need to move into a type adapter.
493 trigger::TriggerPrimitiveTypeAdapter tpa;
494 tpa.tp = tp;
495
496 tpa.tp.detid = m_det_id; // Last missing piece.
497 m_tpa_vectors[m_channel_map->get_plane_from_offline_channel(tp.channel)].push_back(tpa);
498 m_tp_channel_rate_map[tp.channel]++;
499 }
500
501 if (m_frame_counter.load(std::memory_order_relaxed) % 100 == 0) { // FIXME: Hard-coding 100 for now. This should be defined elsewhere or configurable.
502 for (int i = 0; i < 3; i++) {
503 int new_tps = m_tpa_vectors[i].size();
504 if (new_tps == 0) {
505 continue;
506 }
507 const auto s_ts_begin = m_tpa_vectors[i].front().tp.time_start;
508 const auto channel_begin = m_tpa_vectors[i].front().tp.channel;
509 const auto s_ts_end = m_tpa_vectors[i].back().tp.time_start;
510 const auto channel_end = m_tpa_vectors[i].back().tp.channel;
511 if (!m_tp_sink[i]->try_send(std::move(m_tpa_vectors[i]), iomanager::Sender::s_no_block)) {
512 ers::warning(FailedToSendTPVector(ERS_HERE, s_ts_begin, channel_begin, s_ts_end, channel_end));
514 } else {
515 m_new_tps += new_tps;
516 nhits += new_tps;
517 }
518 }
519 }
520
521 m_tpg_hits_count += nhits;
522 return;
523}
Class for accessing raw WIB eth frames, as used in ProtoDUNE-II.
std::vector< trigger::TriggerPrimitiveTypeAdapter > m_tpa_vectors[3]
std::map< uint, std::atomic< int > > m_tp_channel_rate_map
static constexpr timeout_t s_no_block
Definition Sender.hpp:26
void warning(const Issue &issue)
Definition ers.hpp:115

◆ generate_opmon_data()

void dunedaq::fdreadoutlibs::WIBEthFrameProcessor::generate_opmon_data ( )
overrideprotectedvirtual

Hook for customisable pubblication. The function can throw, exception will be caught by the monitoring thread

Reimplemented from dunedaq::datahandlinglibs::TaskRawDataProcessorModel< types::DUNEWIBEthTypeAdapter >.

Definition at line 164 of file WIBEthFrameProcessor.cpp.

165{
166 datahandlinglibs::opmon::FixedRateDataProcessorInfo info;
167
168 info.set_num_seq_id_errors(m_seq_id_error_ctr.load());
169 info.set_min_seq_id_jump(m_seq_id_min_jump.exchange(0));
170 info.set_max_seq_id_jump(m_seq_id_max_jump.exchange(0));
171
172 info.set_num_ts_errors(m_ts_error_ctr.load());
173
174 publish(std::move(info));
175
176 m_error_registry->log_registered_errors();
177
179 auto now = std::chrono::high_resolution_clock::now();
180 int new_hits = m_tpg_hits_count.exchange(0);
181 int new_tps = m_new_tps.exchange(0);
182 int new_tps_suppressed_too_long = m_tps_suppressed_too_long.exchange(0);
183 int new_tps_send_failed = m_tps_send_failed.exchange(0);
184 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
185 TLOG_DEBUG(TLVL_BOOKKEEPING) << "Hit rate: " << std::to_string(new_hits / seconds / 1000.) << " [kHz]";
186 TLOG_DEBUG(TLVL_BOOKKEEPING) << "Total new hits: " << new_hits << " new TPs: " << new_tps;
187
188 datahandlinglibs::opmon::HitFindingInfo tp_info;
189 tp_info.set_rate_tp_hits(new_hits / seconds / 1000.);
190
191 tp_info.set_num_tps_sent(new_tps);
192 tp_info.set_num_tps_suppressed_too_long(new_tps_suppressed_too_long);
193 tp_info.set_num_tps_send_failed(new_tps_send_failed);
194
195 publish(std::move(tp_info));
196 // Find the channels with the top TP rates
197 // Create a vector of pairs to store the map elements
198 std::vector<std::pair<uint, int>> channel_tp_rate_vec(m_tp_channel_rate_map.begin(), m_tp_channel_rate_map.end());
199 // Sort the vector in descending order of the value of the pairs
200 sort(channel_tp_rate_vec.begin(), channel_tp_rate_vec.end(), [](std::pair<uint, int>& a, std::pair<uint, int>& b) { return a.second > b.second; });
201 // Add the metrics to opmon
202 // For convenience we are selecting only the top 10 elements
203 if (channel_tp_rate_vec.size() != 0) {
204 int top_highest_values = 10;
205 if (channel_tp_rate_vec.size() < 10) {
206 top_highest_values = channel_tp_rate_vec.size();
207 }
208 //datahandlinglibs::opmon::TPChannelsInfo channels_info;
209 for (int i = 0; i < top_highest_values; i++) {
210 datahandlinglibs::opmon::TPChannelInfo tpc_info;
211 tpc_info.set_number_of_tps(channel_tp_rate_vec[i].second);
212 tpc_info.set_channel_id(channel_tp_rate_vec[i].first);
213 publish(std::move(tpc_info), {{"channel", std::to_string(channel_tp_rate_vec[i].first)}});
214 }
215 }
216
217 // Reset the counter in the channel rate map
218 for (auto& el : m_tp_channel_rate_map) {
219 el.second = 0;
220 }
221 m_t0 = now;
222
226 }
227 }
228
230 }
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
static int64_t now()
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112

◆ publish_processor_metric_to_opmon()

void dunedaq::fdreadoutlibs::WIBEthFrameProcessor::publish_processor_metric_to_opmon ( )
protected

Publishes collected processor metrics to opmon, currently called in generate_opmon_data()

Definition at line 233 of file WIBEthFrameProcessor.cpp.

233 {
234 auto metrics = m_tp_generator->get_processor_metrics();
235 for (const auto& [channel, vec] : metrics) {
236 datahandlinglibs::opmon::TPGProcessorInfo tpg_proc_info;
237 for (const auto& [name, val] : vec) {
238 if (name == "m_pedestal") {
239 tpg_proc_info.set_pedestal(val);
240 } else if (name == "m_accum") {
241 tpg_proc_info.set_accum(val);
242 }
243 }
244 publish(std::move(tpg_proc_info), {{"channel", std::to_string(channel)}});
245 }
246}

◆ publish_processor_metric_to_opmon_with_aggregation()

void dunedaq::fdreadoutlibs::WIBEthFrameProcessor::publish_processor_metric_to_opmon_with_aggregation ( )
protected

Publishes collected processor metrics to opmon, with aggregation of metrics to summary statistics across physical planes

Definition at line 313 of file WIBEthFrameProcessor.cpp.

313 {
314 auto metrics = m_tp_generator->get_processor_metrics();
315
316 // Use optimized single-pass calculation for all metrics across all planes
317 auto all_stats = calculate_all_metric_summaries_across_planes(metrics);
318
319 // Publish all calculated statistics
320 for (const auto& [plane, metric_map] : all_stats) {
321 for (const auto& [metric_name, stats] : metric_map) {
322 const auto& [mean, min, max, stddev, min_channel_id, max_channel_id] = stats;
323
324 datahandlinglibs::opmon::TPGProcessorReducedInfo info;
325 info.set_average(mean);
326 info.set_max(max);
327 info.set_min(min);
328 info.set_standard_dev(stddev);
329 info.set_max_channel_id(max_channel_id);
330 info.set_min_channel_id(min_channel_id);
331 publish(std::move(info), {{"plane", std::to_string(plane)}, {"metric", metric_name}});
332 }
333 }
334}
std::map< int16_t, std::map< std::string, std::tuple< float, int16_t, int16_t, float, dunedaq::trgdataformats::channel_t, dunedaq::trgdataformats::channel_t > > > calculate_all_metric_summaries_across_planes(const std::unordered_map< dunedaq::trgdataformats::channel_t, std::vector< std::pair< std::string, int16_t > > > &metrics)

◆ sequence_check()

void dunedaq::fdreadoutlibs::WIBEthFrameProcessor::sequence_check ( frameptr fp)
protected

Pipeline Stage 1.: Check proper sequence id increments in DAQ Eth header

Pipeline Stage 1.: Check proper timestamp increments in WIB frame

Definition at line 341 of file WIBEthFrameProcessor.cpp.

342{
343 // FIXME: Make source emulator deal with this! Hard to do since source emu is templated...
344 /* If EMU data, emulate perfectly incrementing timestamp
345 if (m_emulator_mode) {
346 // uint64_t ts_next = m_previous_seq_id + 1; // NOLINT(build/unsigned)
347 auto wf = reinterpret_cast<wibframeptr>(((uint8_t*)fp)); // NOLINT
348 for (unsigned int i = 0; i < fp->get_num_frames(); ++i) { // NOLINT(build/unsigned)
349 //auto wfh = const_cast<dunedaq::fddetdataformats::WIBEthFrame*>(wf->header());
350 wf->daq_header.crate_id = m_crate_id;
351 wf->daq_header.slot_id = m_slot_id;
352 wf->daq_header.stream_id = m_stream_id;
353 wf->daq_header.seq_id = (m_previous_seq_id+i) & 0xfff;
354 wf++;
355 }
356 }
357 */
358
359 // Acquire timestamp
360 auto wfptr = reinterpret_cast<dunedaq::fddetdataformats::WIBEthFrame*>(fp); // NOLINT
362
363 // Check sequence id
364 // Calculate the next sequence id (12 bits)
365 uint16_t expected_seq_id = (m_previous_seq_id + fp->get_num_frames()) & 0xfff;
366 int16_t delta_seq_id = m_current_seq_id-expected_seq_id;
367 if ( delta_seq_id > 0x800) {
368 delta_seq_id -= 0x1000;
369 } else if ( delta_seq_id < -0x7ff) {
370 delta_seq_id += 0x1000;
371 }
372
373 if (delta_seq_id == 0) {
374 m_seq_id_error_state = false;
375 } else {
376 // uint16_t delta_seq_id = (m_current_seq_id-expected_seq_id);
378 m_seq_id_max_jump = std::max(delta_seq_id, m_seq_id_max_jump.load());
379 m_seq_id_min_jump = std::min(delta_seq_id, m_seq_id_min_jump.load());
380
381 if (m_first_seq_id_mismatch) { // log once
382 TLOG_DEBUG(TLVL_BOOKKEEPING) << "First sequence id MISMATCH! -> | previous: " << std::to_string(m_previous_seq_id) << " current: " + std::to_string(m_current_seq_id);
384 } else {
386 m_error_registry->add_error("Sequence ID jump", datahandlinglibs::FrameErrorRegistry::ErrorInterval(expected_seq_id, m_current_seq_id));
388 }
389 }
390 }
391
392 if (m_seq_id_error_ctr > 1000) {
394 TLOG() << "*** Data Integrity ERROR *** Sequence ID continuity is completely broken! "
395 << "Something is wrong with the FE source or with the configuration!";
397 }
398 }
399
401
402}
detdataformats::DAQEthHeader daq_header
#define TLOG(...)
Definition macro.hpp:22

◆ start()

void dunedaq::fdreadoutlibs::WIBEthFrameProcessor::start ( const nlohmann::json & )
overridevirtual

Start operation.

Reimplemented from dunedaq::datahandlinglibs::TaskRawDataProcessorModel< types::DUNEWIBEthTypeAdapter >.

Definition at line 38 of file WIBEthFrameProcessor.cpp.

39{
40 // Reset software TPG resources
44 }
45
46 // Reset timestamp check
47 m_previous_ts = 0;
48 m_current_ts = 0;
51 m_ts_error_state = false;
53
58
59
60 // Reset stats
61 m_t0 = std::chrono::high_resolution_clock::now();
62 m_new_hits = 0;
63 m_new_tps = 0;
64 m_tpg_hits_count.exchange(0);
65 inherited::start(args);
66}
dunedaq::daqdataformats::timestamp_t m_current_ts
dunedaq::daqdataformats::timestamp_t m_previous_ts

◆ stop()

void dunedaq::fdreadoutlibs::WIBEthFrameProcessor::stop ( const nlohmann::json & )
overridevirtual

Stop operation.

Reimplemented from dunedaq::datahandlinglibs::TaskRawDataProcessorModel< types::DUNEWIBEthTypeAdapter >.

Definition at line 69 of file WIBEthFrameProcessor.cpp.

70{
71 inherited::stop(args);
74 m_tp_generator->free_metric_collector();
75 }
76 // Clears the pipelines and resets with the given configs.
77 m_tp_generator->set_metric_collector_enable_state(m_tpg_metric_collect_enabled);
79 }
80}

◆ timestamp_check()

void dunedaq::fdreadoutlibs::WIBEthFrameProcessor::timestamp_check ( frameptr fp)
protected

Pipeline Stage 1.: Check proper timestamp increments in DAQ Eth header

Pipeline Stage 1.: Check proper timestamp increments in WIB frame

Definition at line 408 of file WIBEthFrameProcessor.cpp.

409{
410
411 uint16_t wibeth_tick_difference = types::DUNEWIBEthTypeAdapter::expected_tick_difference;
412 uint16_t wibeth_frame_tick_difference = wibeth_tick_difference * fp->get_num_frames();
413
414 // FIXME: let source emulator deal with this!
415 /* If EMU data, emulate perfectly incrementing timestamp
416 if (m_emulator_mode) { // emulate perfectly incrementing timestamp
417 uint64_t ts_next = m_previous_ts + wibeth_frame_tick_difference; // NOLINT(build/unsigned)
418 auto wf = reinterpret_cast<wibframeptr>(((uint8_t*)fp)); // NOLINT
419 for (unsigned int i = 0; i < fp->get_num_frames(); ++i) { // NOLINT(build/unsigned)
420 //auto wfh = const_cast<dunedaq::fddetdataformats::WIBEthFrame*>(wf->header());
421 wf->daq_header.crate_id = m_crate_id;
422 wf->daq_header.slot_id = m_slot_id;
423 wf->daq_header.stream_id = m_stream_id;
424 wf->set_timestamp(ts_next);
425 ts_next += wibeth_tick_difference;
426 wf++;
427 }
428 }*/
429
430 auto wfptr = reinterpret_cast<dunedaq::fddetdataformats::WIBEthFrame*>(fp); // NOLINT
431 m_current_ts = wfptr->get_timestamp();
432
433 // Check timestamp
434 if (m_previous_ts > 0 &&
435 m_current_ts - m_previous_ts != wibeth_frame_tick_difference) [[unlikely]] {
437 if (m_first_ts_missmatch) { // log once
438 TLOG_DEBUG(TLVL_BOOKKEEPING) << "First timestamp MISMATCH! -> | previous: " << std::to_string(m_previous_ts) << " current: " + std::to_string(m_current_ts);
439 m_first_ts_missmatch = false;
440 } else {
441 if (!m_ts_error_state) {
442 m_error_registry->add_error("Timestamp jump", datahandlinglibs::FrameErrorRegistry::ErrorInterval(m_previous_ts + wibeth_frame_tick_difference, m_current_ts));
443 m_ts_error_state = true;
444 }
445 }
446 } else {
447 m_ts_error_state = false;
448 }
449
450 if (m_ts_error_ctr > 1000) {
452 TLOG() << "*** Data Integrity ERROR *** Timestamp continuity is completely broken! "
453 << "Something is wrong with the FE source or with the configuration!";
455 }
456 }
457
460}
uint64_t get_timestamp() const
Get the starting 64-bit timestamp of the frame.

◆ use_pattern_generator()

void dunedaq::fdreadoutlibs::WIBEthFrameProcessor::use_pattern_generator ( frameptr fp)
protected

Pipeline Stage 0: Pattern generator for hit finding in emulated mode

Member Data Documentation

◆ m_channel_map

std::shared_ptr<detchannelmaps::TPCChannelMap> dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_channel_map
private

Definition at line 161 of file WIBEthFrameProcessor.hpp.

◆ m_channel_mask_set

std::set<unsigned int> dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_channel_mask_set
private

Definition at line 145 of file WIBEthFrameProcessor.hpp.

◆ m_channel_plane_numbers

std::vector<std::pair<trgdataformats::channel_t, int16_t> > dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_channel_plane_numbers
private

Definition at line 165 of file WIBEthFrameProcessor.hpp.

◆ m_crate_id

uint32_t dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_crate_id
private

Definition at line 156 of file WIBEthFrameProcessor.hpp.

◆ m_current_seq_id

uint16_t dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_current_seq_id = 0
protected

Definition at line 96 of file WIBEthFrameProcessor.hpp.

◆ m_current_ts

dunedaq::daqdataformats::timestamp_t dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_current_ts = 0
protected

Definition at line 93 of file WIBEthFrameProcessor.hpp.

◆ m_det_id

uint32_t dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_det_id
private

Definition at line 155 of file WIBEthFrameProcessor.hpp.

◆ m_emulator_mode

bool dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_emulator_mode = false
private

Definition at line 159 of file WIBEthFrameProcessor.hpp.

◆ m_err_frame_sink

std::shared_ptr<iomanager::SenderConcept<fddetdataformats::WIBEthFrame> > dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_err_frame_sink
private

Definition at line 169 of file WIBEthFrameProcessor.hpp.

◆ m_first_hit

bool dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_first_hit = true
private

Definition at line 139 of file WIBEthFrameProcessor.hpp.

◆ m_first_seq_id_mismatch

bool dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_first_seq_id_mismatch = true
protected

Definition at line 106 of file WIBEthFrameProcessor.hpp.

◆ m_first_ts_missmatch

bool dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_first_ts_missmatch = true
protected

Definition at line 101 of file WIBEthFrameProcessor.hpp.

◆ m_frame_counter

std::atomic<uint64_t> dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_frame_counter { 0 }
private

Definition at line 179 of file WIBEthFrameProcessor.hpp.

179{ 0 };

◆ m_metric_collect_opmon_period

uint32_t dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_metric_collect_opmon_period { 128 }
private

Definition at line 141 of file WIBEthFrameProcessor.hpp.

141{ 128 };

◆ m_new_hits

std::atomic<uint64_t> dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_new_hits { 0 }
private

Definition at line 175 of file WIBEthFrameProcessor.hpp.

175{ 0 }; // NOLINT(build/unsigned)

◆ m_new_tps

std::atomic<uint64_t> dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_new_tps { 0 }
private

Definition at line 176 of file WIBEthFrameProcessor.hpp.

176{ 0 }; // NOLINT(build/unsigned)

◆ m_num_msg

size_t dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_num_msg = 0
private

Definition at line 150 of file WIBEthFrameProcessor.hpp.

◆ m_num_push_fail

size_t dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_num_push_fail = 0
private

Definition at line 151 of file WIBEthFrameProcessor.hpp.

◆ m_pattern_generator_current_ts

dunedaq::daqdataformats::timestamp_t dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_pattern_generator_current_ts = 0
protected

Definition at line 99 of file WIBEthFrameProcessor.hpp.

◆ m_pattern_generator_previous_ts

dunedaq::daqdataformats::timestamp_t dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_pattern_generator_previous_ts = 0
protected

Definition at line 98 of file WIBEthFrameProcessor.hpp.

◆ m_previous_seq_id

uint16_t dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_previous_seq_id = 0
protected

Definition at line 95 of file WIBEthFrameProcessor.hpp.

◆ m_previous_ts

dunedaq::daqdataformats::timestamp_t dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_previous_ts = 0
protected

Definition at line 92 of file WIBEthFrameProcessor.hpp.

◆ m_seq_id_error_ctr

std::atomic<uint64_t> dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_seq_id_error_ctr { 0 }
protected

Definition at line 109 of file WIBEthFrameProcessor.hpp.

109{ 0 };

◆ m_seq_id_error_state

bool dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_seq_id_error_state = false
protected

Definition at line 108 of file WIBEthFrameProcessor.hpp.

◆ m_seq_id_max_jump

std::atomic<int16_t> dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_seq_id_max_jump { 0 }
protected

Definition at line 111 of file WIBEthFrameProcessor.hpp.

111{ 0 };

◆ m_seq_id_min_jump

std::atomic<int16_t> dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_seq_id_min_jump { 0 }
protected

Definition at line 110 of file WIBEthFrameProcessor.hpp.

110{ 0 };

◆ m_seq_id_problem_reported

bool dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_seq_id_problem_reported = false
protected

Definition at line 107 of file WIBEthFrameProcessor.hpp.

◆ m_slot_id

uint32_t dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_slot_id
private

Definition at line 157 of file WIBEthFrameProcessor.hpp.

◆ m_sourceid

daqdataformats::SourceID dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_sourceid
private

Definition at line 173 of file WIBEthFrameProcessor.hpp.

◆ m_stream_id

uint32_t dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_stream_id
private

Definition at line 158 of file WIBEthFrameProcessor.hpp.

◆ m_t0

std::chrono::time_point<std::chrono::high_resolution_clock> dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_t0
private

Definition at line 181 of file WIBEthFrameProcessor.hpp.

◆ m_tp_channel_rate_map

std::map<uint, std::atomic<int> > dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_tp_channel_rate_map
private

Definition at line 148 of file WIBEthFrameProcessor.hpp.

◆ m_tp_generator

std::unique_ptr<tpglibs::TPGenerator> dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_tp_generator
private

Definition at line 142 of file WIBEthFrameProcessor.hpp.

◆ m_tp_max_width

uint32_t dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_tp_max_width
private

Definition at line 144 of file WIBEthFrameProcessor.hpp.

◆ m_tp_sink

std::shared_ptr<iomanager::SenderConcept<std::vector<trigger::TriggerPrimitiveTypeAdapter> > > dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_tp_sink[3]
private

Definition at line 168 of file WIBEthFrameProcessor.hpp.

◆ m_tpa_vectors

std::vector<trigger::TriggerPrimitiveTypeAdapter> dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_tpa_vectors[3]
private

Definition at line 166 of file WIBEthFrameProcessor.hpp.

◆ m_tpg_configs

std::vector<std::pair<std::string, nlohmann::json> > dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_tpg_configs
private

Definition at line 143 of file WIBEthFrameProcessor.hpp.

◆ m_tpg_hits_count

std::atomic<int> dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_tpg_hits_count { 0 }
private

Definition at line 153 of file WIBEthFrameProcessor.hpp.

153{ 0 };

◆ m_tpg_metric_collect_enabled

bool dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_tpg_metric_collect_enabled {false}
private

Definition at line 140 of file WIBEthFrameProcessor.hpp.

140{false};

◆ m_tpg_threshold_selected

uint16_t dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_tpg_threshold_selected
private

Definition at line 146 of file WIBEthFrameProcessor.hpp.

◆ m_tps_send_failed

std::atomic<uint64_t> dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_tps_send_failed { 0 }
private

Definition at line 178 of file WIBEthFrameProcessor.hpp.

178{ 0 };

◆ m_tps_suppressed_too_long

std::atomic<uint64_t> dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_tps_suppressed_too_long { 0 }
private

Definition at line 177 of file WIBEthFrameProcessor.hpp.

177{ 0 };

◆ m_ts_error_ctr

std::atomic<uint64_t> dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_ts_error_ctr { 0 }
protected

Definition at line 104 of file WIBEthFrameProcessor.hpp.

104{ 0 };

◆ m_ts_error_state

bool dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_ts_error_state = false
protected

Definition at line 103 of file WIBEthFrameProcessor.hpp.

◆ m_ts_problem_reported

bool dunedaq::fdreadoutlibs::WIBEthFrameProcessor::m_ts_problem_reported = false
protected

Definition at line 102 of file WIBEthFrameProcessor.hpp.


The documentation for this class was generated from the following files: