30namespace fdreadoutlibs {
32WIBEthFrameProcessor::WIBEthFrameProcessor(std::unique_ptr<datahandlinglibs::FrameErrorRegistry>& error_registry,
bool processing_enabled)
33 : TaskRawDataProcessorModel<types::DUNEWIBEthTypeAdapter>(error_registry, processing_enabled)
61 m_t0 = std::chrono::high_resolution_clock::now();
86 for (
auto output :
conf->get_outputs()) {
88 if (output->get_data_type() ==
"TriggerPrimitiveVector") {
92 ers::error(datahandlinglibs::ResourceQueueError(
ERS_HERE,
"tp",
"DefaultRequestHandlerModel", excpt));
98 auto geo_id =
conf->get_geo_id();
99 if (geo_id !=
nullptr) {
100 m_det_id = geo_id->get_detector_id();
114 auto dp =
conf->get_module_configuration()->get_data_processor();
121 auto conf_sot_minima = proc_conf->get_sot_minima();
122 std::vector<uint16_t> sot_minima{conf_sot_minima->get_sot_minimum_plane0(),
123 conf_sot_minima->get_sot_minimum_plane1(),
124 conf_sot_minima->get_sot_minimum_plane2()};
127 const std::vector<unsigned int> channel_mask_vec = proc_conf->get_channel_mask();
129 std::vector<const appmodel::ProcessingStep*> processing_steps = proc_conf->get_processing_steps();
130 for (
auto step : processing_steps) {
131 m_tpg_configs.push_back(std::make_pair(step->class_name(), step->to_json(
false).back()));
135 m_channel_map = dunedaq::detchannelmaps::make_tpc_map(proc_conf->get_channel_map());
136 for (
int chan = 0; chan < 64; chan++) {
138 int16_t plane =
m_channel_map->get_plane_from_offline_channel(off_channel);
143 if (std::find(channel_mask_vec.begin(), channel_mask_vec.end(), off_channel) != channel_mask_vec.end())
179 auto now = std::chrono::high_resolution_clock::now();
184 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(
now -
m_t0).count() / 1000000.;
185 TLOG_DEBUG(TLVL_BOOKKEEPING) <<
"Hit rate: " << std::to_string(new_hits / seconds / 1000.) <<
" [kHz]";
186 TLOG_DEBUG(TLVL_BOOKKEEPING) <<
"Total new hits: " << new_hits <<
" new TPs: " << new_tps;
200 sort(channel_tp_rate_vec.begin(), channel_tp_rate_vec.end(), [](std::pair<uint, int>& a, std::pair<uint, int>& b) { return a.second > b.second; });
203 if (channel_tp_rate_vec.size() != 0) {
204 int top_highest_values = 10;
205 if (channel_tp_rate_vec.size() < 10) {
206 top_highest_values = channel_tp_rate_vec.size();
209 for (
int i = 0; i < top_highest_values; i++) {
213 publish(std::move(tpc_info), {{
"channel", std::to_string(channel_tp_rate_vec[i].first)}});
235 for (
const auto& [channel, vec] : metrics) {
237 for (
const auto& [name, val] : vec) {
238 if (name ==
"m_pedestal") {
240 }
else if (name ==
"m_accum") {
244 publish(std::move(tpg_proc_info), {{
"channel", std::to_string(channel)}});
248std::map<int16_t, std::map<std::string, std::tuple<float, int16_t, int16_t, float, dunedaq::trgdataformats::channel_t, dunedaq::trgdataformats::channel_t>>>
251 std::map<int16_t, std::map<std::string, std::tuple<float, int16_t, int16_t, float, dunedaq::trgdataformats::channel_t, dunedaq::trgdataformats::channel_t>>> all_stats;
254 std::map<int16_t, std::map<std::string, std::tuple<size_t, double, double, int16_t, int16_t, dunedaq::trgdataformats::channel_t, dunedaq::trgdataformats::channel_t>>> accumulators;
257 for (
const auto& [channel, vec] : metrics) {
260 int16_t plane =
m_channel_map->get_plane_from_offline_channel(channel);
262 for (
const auto& [name, val] : vec) {
263 auto& [count, mean, M2, min, max, min_channel_id, max_channel_id] = accumulators[plane][name];
267 if (count == 1 || val < min) {
269 min_channel_id = channel;
271 if (count == 1 || val > max) {
273 max_channel_id = channel;
283 double delta = val - mean;
284 mean += delta / count;
285 double delta2 = val - mean;
286 M2 += delta * delta2;
292 for (
const auto& [plane, metric_map] : accumulators) {
293 for (
const auto& [metric_name, acc_data] : metric_map) {
294 const auto& [count, mean, M2, min, max, min_channel_id, max_channel_id] = acc_data;
296 if (count == 0)
continue;
302 stddev = std::sqrt(M2 / (count - 1));
305 all_stats[plane][metric_name] = std::make_tuple(
static_cast<float>(mean), min, max, stddev, min_channel_id, max_channel_id);
320 for (
const auto& [plane, metric_map] : all_stats) {
321 for (
const auto& [metric_name, stats] : metric_map) {
322 const auto& [mean, min, max, stddev, min_channel_id, max_channel_id] = stats;
325 info.set_average(mean);
328 info.set_standard_dev(stddev);
329 info.set_max_channel_id(max_channel_id);
330 info.set_min_channel_id(min_channel_id);
331 publish(std::move(info), {{
"plane", std::to_string(plane)}, {
"metric", metric_name}});
367 if ( delta_seq_id > 0x800) {
368 delta_seq_id -= 0x1000;
369 }
else if ( delta_seq_id < -0x7ff) {
370 delta_seq_id += 0x1000;
373 if (delta_seq_id == 0) {
394 TLOG() <<
"*** Data Integrity ERROR *** Sequence ID continuity is completely broken! "
395 <<
"Something is wrong with the FE source or with the configuration!";
412 uint16_t wibeth_frame_tick_difference = wibeth_tick_difference * fp->
get_num_frames();
452 TLOG() <<
"*** Data Integrity ERROR *** Timestamp continuity is completely broken! "
453 <<
"Something is wrong with the FE source or with the configuration!";
482 std::vector<trgdataformats::TriggerPrimitive> tps = (*m_tp_generator)(wfptr);
488 for (
const auto& tp : tps) {
502 for (
int i = 0; i < 3; i++) {
507 const auto s_ts_begin =
m_tpa_vectors[i].front().tp.time_start;
508 const auto channel_begin =
m_tpa_vectors[i].front().tp.channel;
#define DUNE_DAQ_TYPESTRING(Type, typestring)
Declare the datatype_to_string method for the given type.
void start(const nlohmann::json &) override
void add_preprocess_task(Task &&task)
void add_postprocess_task(Task &&task)
void conf(const appmodel::DataHandlerModule *conf) override
bool m_post_processing_enabled
void stop(const nlohmann::json &) override
virtual void generate_opmon_data() override
std::atomic< uint64_t > m_last_processed_daq_ts
std::unique_ptr< FrameErrorRegistry > & m_error_registry
void set_num_tps_send_failed(::uint64_t value)
void set_num_tps_suppressed_too_long(::uint64_t value)
void set_num_tps_sent(::uint64_t value)
void set_rate_tp_hits(float value)
void set_channel_id(::uint64_t value)
void set_number_of_tps(::uint64_t value)
void set_accum(::int64_t value)
void set_pedestal(::int64_t value)
void publish_processor_metric_to_opmon_with_aggregation()
std::atomic< int16_t > m_seq_id_min_jump
std::atomic< uint64_t > m_tps_suppressed_too_long
bool m_ts_problem_reported
bool m_seq_id_error_state
void conf(const appmodel::DataHandlerModule *conf) override
Set the emulator mode; if active, timestamps of processed packets are overwritten with new ones.
void timestamp_check(frameptr fp)
std::unique_ptr< tpglibs::TPGenerator > m_tp_generator
void find_hits(constframeptr fp)
uint16_t m_current_seq_id
std::atomic< uint64_t > m_new_tps
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
std::shared_ptr< iomanager::SenderConcept< std::vector< trigger::TriggerPrimitiveTypeAdapter > > > m_tp_sink[3]
std::atomic< int16_t > m_seq_id_max_jump
std::vector< std::pair< trgdataformats::channel_t, int16_t > > m_channel_plane_numbers
daqdataformats::SourceID m_sourceid
void publish_processor_metric_to_opmon()
std::vector< trigger::TriggerPrimitiveTypeAdapter > m_tpa_vectors[3]
virtual void generate_opmon_data() override
bool m_tpg_metric_collect_enabled
std::map< uint, std::atomic< int > > m_tp_channel_rate_map
std::vector< std::pair< std::string, nlohmann::json > > m_tpg_configs
dunedaq::daqdataformats::timestamp_t m_current_ts
std::atomic< uint64_t > m_tps_send_failed
dunedaq::daqdataformats::timestamp_t m_previous_ts
bool m_seq_id_problem_reported
void stop(const nlohmann::json &args) override
Stop operation.
uint16_t m_previous_seq_id
std::atomic< int > m_tpg_hits_count
std::set< unsigned int > m_channel_mask_set
bool m_first_ts_missmatch
uint32_t m_metric_collect_opmon_period
std::map< int16_t, std::map< std::string, std::tuple< float, int16_t, int16_t, float, dunedaq::trgdataformats::channel_t, dunedaq::trgdataformats::channel_t > > > calculate_all_metric_summaries_across_planes(const std::unordered_map< dunedaq::trgdataformats::channel_t, std::vector< std::pair< std::string, int16_t > > > &metrics)
std::atomic< uint64_t > m_frame_counter
bool m_first_seq_id_mismatch
std::atomic< uint64_t > m_ts_error_ctr
void sequence_check(frameptr fp)
std::atomic< uint64_t > m_new_hits
std::shared_ptr< detchannelmaps::TPCChannelMap > m_channel_map
void start(const nlohmann::json &args) override
Start operation.
std::atomic< uint64_t > m_seq_id_error_ctr
static constexpr timeout_t s_no_block
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
Base class for any user-defined issue.
#define TLOG_DEBUG(lvl,...)
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
void warning(const Issue &issue)
void error(const Issue &issue)
static const constexpr uint64_t samples_tick_difference
static const constexpr daqdataformats::SourceID::Subsystem subsystem
static const constexpr uint64_t expected_tick_difference
trgdataformats::TriggerPrimitive tp