5namespace fdreadoutlibs {
7using datahandlinglibs::logging::TLVL_BOOKKEEPING;
8using datahandlinglibs::logging::TLVL_TAKE_NOTE;
10template <
class ReadoutTypeAdapter>
16template <
class ReadoutTypeAdapter>
41 m_t0 = std::chrono::high_resolution_clock::now();
52template <
class ReadoutTypeAdapter>
67template <
class ReadoutTypeAdapter>
72 m_sourceid.subsystem = ReadoutTypeAdapter::subsystem;
73 auto geo_id =
conf->get_geo_id();
74 if (geo_id !=
nullptr) {
82template <
class ReadoutTypeAdapter>
94template <
class ReadoutTypeAdapter>
98 const std::shared_ptr<detchannelmaps::TPCChannelMap> channel_map = dunedaq::detchannelmaps::make_tpc_map(proc_conf->
get_channel_map());
99 const std::vector<unsigned int> channel_mask_vec = proc_conf->
get_channel_mask();
101 for (
int chan = 0; chan < 64; chan++) {
103 int16_t plane = channel_map->get_plane_from_offline_channel(off_channel);
109 if (std::find(channel_mask_vec.begin(), channel_mask_vec.end(), off_channel) != channel_mask_vec.end()) {
112 m_channel_plane_map[off_channel] = plane;
113 m_plane_numbers_set.insert(plane);
118template <
class ReadoutTypeAdapter>
124 int plane_number = 0;
125 for (
auto output :
conf->get_outputs()) {
127 if (output->get_data_type() ==
"TriggerPrimitiveVector") {
134 ers::error(datahandlinglibs::ResourceQueueError(
ERS_HERE,
"tp",
"DefaultRequestHandlerModel", excpt));
139 if (m_plane_numbers_set.size() > m_plane_to_tp_sink_map.size()) {
140 ers::error(DetectorPlaneToTPSinkMismatch(
ERS_HERE, m_plane_numbers_set.size(), m_plane_to_tp_sink_map.size()));
143 m_tp_generator = std::make_unique<tpglibs::TPGenerator>();
147 std::vector<uint16_t> sot_minima{conf_sot_minima->get_sot_minimum_plane0(),
148 conf_sot_minima->get_sot_minimum_plane1(),
149 conf_sot_minima->get_sot_minimum_plane2()};
150 m_tp_generator->set_sot_minima(sot_minima);
152 std::vector<const appmodel::ProcessingStep*> processing_steps = proc_conf->
get_processing_steps();
153 for (
auto step : processing_steps) {
154 m_tpg_configs.push_back(std::make_pair(step->class_name(), step->to_json(
false).back()));
158 m_tp_generator->configure(m_tpg_configs, m_channel_plane_numbers, ReadoutTypeAdapter::samples_tick_difference);
163 m_frame_limit_enabled = m_frame_count_limit > 0;
164 m_tp_limit_enabled = m_tp_count_limit > 0;
166 if (!m_frame_limit_enabled && !m_tp_limit_enabled) {
173 m_tpg_metric_collect_enabled =
false;
174 for (
const auto& name_config : m_tpg_configs) {
175 if (name_config.second.contains(
"metric_collect_toggle_state") &&
176 name_config.second[
"metric_collect_toggle_state"] ==
true) {
177 m_tpg_metric_collect_enabled =
true;
183 if (m_tpg_metric_collect_enabled) {
184 auto processsor_references = m_tp_generator->get_all_processor_references_with_pipeline_index();
186 m_state_harvester = std::make_unique<fdreadoutlibs::TPGInternalStateHarvester>();
188 const uint8_t channels_per_pipeline = 16;
189 const uint8_t pipelines =
static_cast<uint8_t
>(m_channel_plane_numbers.size() / channels_per_pipeline);
192 <<
" channels per pipeline, " <<
static_cast<int>(pipelines) <<
" pipelines, "
193 << processsor_references.size() <<
" processor references";
195 m_state_harvester->update_channel_plane_numbers(m_channel_plane_numbers,
196 channels_per_pipeline, pipelines);
197 m_state_harvester->set_processor_references(processsor_references);
200 m_state_harvester->start_collection_thread();
208template <
class ReadoutTypeAdapter>
218 if (proc_conf ==
nullptr) {
227template <
class ReadoutTypeAdapter>
242template <
class ReadoutTypeAdapter>
254template <
class ReadoutTypeAdapter>
287template <
class ReadoutTypeAdapter>
317 m_t0 = std::chrono::high_resolution_clock::now();
320template <
class ReadoutTypeAdapter>
335template <
class ReadoutTypeAdapter>
347 this->
publish(std::move(info));
352 auto now = std::chrono::high_resolution_clock::now();
356 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(
now -
m_t0).count() / 1000000.;
367 this->
publish(std::move(tp_info));
372 sort(channel_tp_rate_vec.begin(), channel_tp_rate_vec.end(), [](std::pair<uint, int>& a, std::pair<uint, int>& b) { return a.second > b.second; });
375 if (channel_tp_rate_vec.size() != 0) {
376 int top_highest_values = 10;
377 if (channel_tp_rate_vec.size() < 10) {
378 top_highest_values = channel_tp_rate_vec.size();
381 for (
int i = 0; i < top_highest_values; i++) {
385 this->
publish(std::move(tpc_info), {{
"channel", std::to_string(channel_tp_rate_vec[i].first)}});
404template <
class ReadoutTypeAdapter>
416 int metrics_published = 0;
419 for (
const auto& [channel, vec] : metrics) {
421 bool has_valid_metrics =
false;
423 for (
const auto& [name, val] : vec) {
424 if (name ==
"pedestal") {
426 has_valid_metrics =
true;
427 }
else if (name ==
"accum") {
429 has_valid_metrics =
true;
433 if (has_valid_metrics) {
434 this->
publish(std::move(tpg_proc_info), {{
"channel", std::to_string(channel)}});
442template <
class ReadoutTypeAdapter>
443std::map<int16_t, std::map<std::string, std::tuple<float, int16_t, int16_t, float, dunedaq::trgdataformats::channel_t, dunedaq::trgdataformats::channel_t>>>
446 std::map<int16_t, std::map<std::string, std::tuple<float, int16_t, int16_t, float, dunedaq::trgdataformats::channel_t, dunedaq::trgdataformats::channel_t>>> all_stats;
449 std::map<int16_t, std::map<std::string, std::tuple<size_t, double, double, int16_t, int16_t, dunedaq::trgdataformats::channel_t, dunedaq::trgdataformats::channel_t>>> accumulators;
452 for (
const auto& [channel, vec] : metrics) {
457 for (
const auto& [name, val] : vec) {
458 auto& [count, mean, M2, min, max, min_channel_id, max_channel_id] = accumulators[plane][name];
462 if (count == 1 || val < min) {
464 min_channel_id = channel;
466 if (count == 1 || val > max) {
468 max_channel_id = channel;
478 double delta = val - mean;
479 mean += delta / count;
480 double delta2 = val - mean;
481 M2 += delta * delta2;
487 for (
const auto& [plane, metric_map] : accumulators) {
488 for (
const auto& [metric_name, acc_data] : metric_map) {
489 const auto& [count, mean, M2, min, max, min_channel_id, max_channel_id] = acc_data;
491 if (count == 0)
continue;
497 stddev = std::sqrt(M2 / (count - 1));
500 all_stats[plane][metric_name] = std::make_tuple(
static_cast<float>(mean), min, max, stddev, min_channel_id, max_channel_id);
507template <
class ReadoutTypeAdapter>
523 for (
const auto& [plane, metric_map] : all_stats) {
524 for (
const auto& [metric_name, stats] : metric_map) {
525 const auto& [mean, min, max, stddev, min_channel_id, max_channel_id] = stats;
528 info.set_average(mean);
531 info.set_standard_dev(stddev);
532 info.set_max_channel_id(max_channel_id);
533 info.set_min_channel_id(min_channel_id);
534 this->
publish(std::move(info), {{
"plane", std::to_string(plane)}, {
"metric", metric_name}});
543template <
class ReadoutTypeAdapter>
564 if ( delta_seq_id > 0x800) {
565 delta_seq_id -= 0x1000;
566 }
else if ( delta_seq_id < -0x7ff) {
567 delta_seq_id += 0x1000;
570 if (delta_seq_id == 0) {
591 TLOG() <<
"*** Data Integrity ERROR *** Sequence ID continuity is completely broken! "
592 <<
"Something is wrong with the FE source or with the configuration!";
604template <
class ReadoutTypeAdapter>
609 uint16_t tpceth_tick_difference = ReadoutTypeAdapter::expected_tick_difference;
610 uint16_t tpceth_frame_tick_difference = tpceth_tick_difference * fp->get_num_frames();
634 TLOG() <<
"*** Data Integrity ERROR *** Timestamp continuity is completely broken! "
635 <<
"Something is wrong with the FE source or with the configuration!";
647template <
class ReadoutTypeAdapter>
653 auto wfptr =
reinterpret_cast<tpcframeptr>((uint8_t*)fp);
655 std::vector<trgdataformats::TriggerPrimitive> tps = (*m_tp_generator)(wfptr);
657 uint64_t current_frame_count =
m_frame_counter.fetch_add(1, std::memory_order_relaxed) + 1;
665 for (
const auto& tp : tps) {
682 if (frame_limit_reached || tp_limit_reached) [[unlikely]] {
686 int num_new_tps = tpa_vector.size();
687 if (num_new_tps == 0) {
690 const auto ts_begin = tpa_vector.front().tp.time_start;
691 const auto channel_begin = tpa_vector.front().tp.channel;
692 const auto ts_end = tpa_vector.back().tp.time_start;
693 const auto channel_end = tpa_vector.back().tp.channel;
#define DUNE_DAQ_TYPESTRING(Type, typestring)
Declare the datatype_to_string method for the given type.
const std::vector< uint32_t > & get_channel_mask() const
Get "channel_mask" attribute value. List of channels to be masked from TP generation.
const std::string & get_channel_map() const
Get "channel_map" attribute value.
uint32_t get_frame_count_limit() const
Get "frame_count_limit" attribute value. When this number of frames is reached the TPs are sent to th...
const dunedaq::appmodel::SamplesOverThresholdMinima * get_sot_minima() const
Get "sot_minima" relationship value. TP samples over threshold minimum requirement by plane.
const std::vector< const dunedaq::appmodel::ProcessingStep * > & get_processing_steps() const
Get "processing_steps" relationship value.
uint32_t get_metric_collect_opmon_period() const
Get "metric_collect_opmon_period" attribute value. The rate at which processor metric is polled from ...
uint32_t get_tp_count_limit() const
Get "tp_count_limit" attribute value. When this number of TPs is reached, the TPs are sent to the sin...
const TARGET * cast() const noexcept
Casts object to different class.
void start(const appfwk::DAQModule::CommandData_t &) override
void add_preprocess_task(Task &&task)
void conf(const appmodel::DataHandlerModule *conf) override
bool m_post_processing_enabled
void stop(const appfwk::DAQModule::CommandData_t &) override
virtual void generate_opmon_data() override
TaskRawDataProcessorModel(std::unique_ptr< FrameErrorRegistry > &error_registry, bool post_processing_enabled)
std::atomic< uint64_t > m_last_processed_daq_ts
void scrap(const appfwk::DAQModule::CommandData_t &) override
std::unique_ptr< FrameErrorRegistry > & m_error_registry
void set_num_tps_send_failed(::uint64_t value)
void set_num_tps_suppressed_too_long(::uint64_t value)
void set_num_tps_sent(::uint64_t value)
void set_rate_tp_hits(float value)
void set_channel_id(::uint64_t value)
void set_number_of_tps(::uint64_t value)
void set_accum(::int64_t value)
void set_pedestal(::int64_t value)
daqdataformats::SourceID m_sourceid
const ReadoutTypeAdapter * constframeptr
std::set< unsigned int > m_plane_numbers_set
uint16_t m_current_seq_id
void scrap_postprocessing()
uint32_t m_current_tp_count
void configure_postprocessing(const appmodel::DataHandlerModule *conf)
void scrap_preprocessing()
ReadoutTypeAdapter::FrameType * tpcframeptr
bool m_frame_limit_enabled
uint32_t m_metric_collect_opmon_period
std::unique_ptr< fdreadoutlibs::TPGInternalStateHarvester > m_state_harvester
void configure_channel_plane_numbers(const appmodel::TPCRawDataProcessor *proc_conf)
void stop(const appfwk::DAQModule::CommandData_t &args) override
Stop operation.
std::atomic< uint64_t > m_tps_suppressed_too_long
void publish_processor_metric_to_opmon()
std::vector< std::pair< std::string, nlohmann::json > > m_tpg_configs
bool m_tpg_metric_collect_enabled
std::unordered_map< unsigned int, std::vector< trigger::TriggerPrimitiveTypeAdapter > > m_plane_to_tpa_vector_map
void configure_find_tps(const appmodel::DataHandlerModule *conf, const appmodel::TPCRawDataProcessor *proc_conf)
uint32_t m_frame_count_limit
void scrap(const appfwk::DAQModule::CommandData_t &cfg) override
Unconfigure.
std::unique_ptr< tpglibs::TPGenerator > m_tp_generator
bool m_first_ts_missmatch
dunedaq::daqdataformats::timestamp_t m_previous_ts
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
bool m_ts_problem_reported
std::atomic< uint64_t > m_num_new_tps
std::vector< std::pair< trgdataformats::channel_t, int16_t > > m_channel_plane_numbers
std::unordered_map< unsigned int, std::shared_ptr< iomanager::SenderConcept< std::vector< trigger::TriggerPrimitiveTypeAdapter > > > > m_plane_to_tp_sink_map
bool m_first_seq_id_mismatch
void scrap_source_and_geo_ids()
std::map< uint, std::atomic< int > > m_tp_channel_rate_map
std::atomic< uint64_t > m_frame_counter
uint32_t m_tp_count_limit
void publish_processor_metric_to_opmon_with_aggregation()
void configure_source_and_geo_ids(const appmodel::DataHandlerModule *conf)
bool m_seq_id_problem_reported
void sequence_check(frameptr fp)
std::atomic< uint64_t > m_tps_send_failed
std::map< int16_t, std::map< std::string, std::tuple< float, int16_t, int16_t, float, dunedaq::trgdataformats::channel_t, dunedaq::trgdataformats::channel_t > > > calculate_all_metric_summaries_across_planes(const std::unordered_map< dunedaq::trgdataformats::channel_t, std::vector< std::pair< std::string, int16_t > > > &metrics)
ReadoutTypeAdapter * frameptr
std::unordered_map< trgdataformats::channel_t, unsigned int > m_channel_plane_map
void generate_opmon_data() override
std::atomic< uint64_t > m_seq_id_error_ctr
void start(const appfwk::DAQModule::CommandData_t &args) override
Start operation.
std::atomic< uint64_t > m_ts_error_ctr
void configure_preprocessing(const appmodel::DataHandlerModule *conf)
bool m_seq_id_error_state
void conf(const appmodel::DataHandlerModule *conf) override
Set the emulator mode, if active, timestamps of processed packets are overwritten with new ones.
std::atomic< int16_t > m_seq_id_max_jump
TPCEthFrameProcessor(std::unique_ptr< datahandlinglibs::FrameErrorRegistry > &error_registry, bool processing_enabled)
dunedaq::daqdataformats::timestamp_t m_pattern_generator_current_ts
void timestamp_check(frameptr fp)
uint32_t m_frame_count_at_last_send
dunedaq::daqdataformats::timestamp_t m_pattern_generator_previous_ts
std::set< unsigned int > m_channel_mask_set
std::atomic< int16_t > m_seq_id_min_jump
dunedaq::daqdataformats::timestamp_t m_current_ts
void find_tps(constframeptr fp)
uint16_t m_previous_seq_id
static constexpr timeout_t s_no_block
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
Base class for any user define issue.
#define TLOG_DEBUG(lvl,...)
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
Both frame_count_limit and tp_count_limit were set FailedToSendTPVector
FrameAndTPCountersDisabled
void warning(const Issue &issue)
void error(const Issue &issue)
trgdataformats::TriggerPrimitive tp