28namespace fdreadoutlibs {
33 TLOG() <<
"Looking for TP sink...";
35 for (
auto output :
conf->get_outputs()) {
36 TLOG() <<
"On outputs... (" << output->UID() <<
"," << output->get_data_type() <<
")";
38 if (output->get_data_type() ==
"TriggerPrimitiveVector") {
39 TLOG() <<
"Found TP sink.";
41 TLOG() <<
" SINK INITIALIZED for TriggerPrimitives with UID : " << output->UID();
44 ers::error(datahandlinglibs::ResourceQueueError(
ERS_HERE,
"tp",
"DefaultRequestHandlerModel", excpt));
48 TLOG() <<
"Registering processing tasks...";
51 auto dp =
conf->get_module_configuration()->get_data_processor();
53 TLOG()<<
" PDS Data processor does not exist.";
56 if (proc_conf ==
nullptr) {
57 TLOG()<<
"PDS RawDataProcessor does not exist.";
61 auto geo_id =
conf->get_geo_id();
62 if (geo_id !=
nullptr) {
63 m_det_id = geo_id->get_detector_id();
69 m_channel_map = dunedaq::detchannelmaps::make_pds_map(proc_conf->get_channel_map());
70 const std::vector<unsigned int> channel_mask_vec = proc_conf->get_channel_mask();
72 for (
int chan = 0; chan < 48; chan++) {
74 if (std::find(channel_mask_vec.begin(), channel_mask_vec.end(), off_channel) != channel_mask_vec.end())
85 TLOG() <<
"Calling parent conf.";
98 m_t0 = std::chrono::high_resolution_clock::now();
135 if(df_ptr[i].get_timestamp() > 0xFFFFFFFFFFFF0000 || df_ptr[i].get_timestamp() < 0xFFFF){
136 ers::warning(PDSUnphysicalFrameTimestamp(
ERS_HERE, df_ptr[i].get_timestamp(), df_ptr[i].get_channel(), i));
138 df_ptr[i].daq_header.timestamp_1 = df_ptr[i].daq_header.timestamp_2 = 0;
144 uint64_t k_clock_frequency = 62500000;
145 TLOG_DEBUG(
TLVL_FRAME_RECEIVED) <<
"Received DAPHNE frame timestamp value of " <<
m_current_ts <<
" ticks (..." << std::fixed << std::setprecision(8) << (static_cast<double>(
m_current_ts % (k_clock_frequency*1000)) /
static_cast<double>(k_clock_frequency)) <<
" sec)";
150 std::cout <<
"*** Data Integrity ERROR *** Timestamp continuity is completely broken! "
151 <<
"Something is wrong with the FE source or with the configuration!\n";
173 if (!fp || fp==
nullptr){
178 auto nonconstframeptr =
const_cast<frameptr>(fp);
180 std::vector<trigger::TriggerPrimitiveTypeAdapter> ttpp;
186 if(df_ptr[i].peaks_data.is_found(j))
188 int ch =
m_channel_map->get_offline_channel_from_det_crate_slot_stream_chan(df_ptr[i].daq_header.det_id, df_ptr[i].daq_header.crate_id, df_ptr[i].daq_header.slot_id, df_ptr[i].daq_header.link_id, df_ptr[i].get_channel());
201 tpa.
tp.
detid = df_ptr->daq_header.det_id;
207 int num_new_tps = ttpp.size();
208 if (num_new_tps > 0) {
210 const auto s_ts_begin = ttpp.front().tp.time_start;
211 const auto channel_begin = ttpp.front().tp.channel;
212 const auto s_ts_end = ttpp.back().tp.time_start;
249 auto now = std::chrono::high_resolution_clock::now();
251 int new_tps_suppressed_too_long = 0;
253 double seconds = std::chrono::duration_cast<std::chrono::microseconds>(
now -
m_t0).count() / 1000000.;
#define DUNE_DAQ_TYPESTRING(Type, typestring)
Declare the datatype_to_string method for the given type.
void start(const appfwk::DAQModule::CommandData_t &) override
void add_preprocess_task(Task &&task)
void conf(const appmodel::DataHandlerModule *conf) override
bool m_post_processing_enabled
void stop(const appfwk::DAQModule::CommandData_t &) override
virtual void generate_opmon_data() override
std::atomic< uint64_t > m_last_processed_daq_ts
void set_num_tps_send_failed(::uint64_t value)
void set_num_tps_suppressed_too_long(::uint64_t value)
void set_num_tps_sent(::uint64_t value)
void set_rate_tp_hits(float value)
void timestamp_check(frameptr)
std::shared_ptr< detchannelmaps::PDSChannelMap > m_channel_map
void conf(const appmodel::DataHandlerModule *conf) override
Set the emulator mode, if active, timestamps of processed packets are overwritten with new ones.
void stop(const appfwk::DAQModule::CommandData_t &args) override
Stop operation.
std::atomic< int > m_ts_error_ctr
void frame_error_check(frameptr)
void extract_tps(constframeptr fp)
types::DAPHNESuperChunkTypeAdapter * frameptr
const types::DAPHNESuperChunkTypeAdapter * constframeptr
std::atomic< uint64_t > m_tps_send_failed
virtual void generate_opmon_data() override
void start(const appfwk::DAQModule::CommandData_t &args) override
Start operation.
timestamp_t m_previous_ts
bool m_first_ts_missmatch
std::shared_ptr< iomanager::SenderConcept< std::vector< trigger::TriggerPrimitiveTypeAdapter > > > m_tp_sink
std::chrono::time_point< std::chrono::high_resolution_clock > m_t0
dunedaq::trgdataformats::TriggerPrimitive peak_to_tp(dunedaq::fddetdataformats::DAPHNEFrame &frame, int i)
uint32_t m_def_adc_intg_thresh
std::atomic< uint64_t > m_num_new_tps
std::set< unsigned int > m_channel_mask_set
static constexpr timeout_t s_no_block
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
Base class for any user define issue.
#define TLOG_DEBUG(lvl,...)
const constexpr std::size_t kDAPHNENumFrames
For DAPHNE the numbers are different. 12[DAPHNE frames] x 454[32-bit words] x 4[Bytes per word] = 217...
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
Both frame_count_limit and tp_count_limit were set FailedToSendTPVector
void warning(const Issue &issue)
void error(const Issue &issue)
uint64_t get_timestamp() const
trgdataformats::TriggerPrimitive tp