32TCProcessor::TCProcessor(std::unique_ptr<datahandlinglibs::FrameErrorRegistry>& error_registry,
bool post_processing_enabled)
33 : datahandlinglibs::TaskRawDataProcessorModel<
TCWrapper>(error_registry, post_processing_enabled)
101 if (mtrg ==
nullptr) {
104 for (
auto output : mtrg->get_outputs()) {
106 if (output->get_data_type() ==
"TriggerDecision") {
110 ers::error(datahandlinglibs::ResourceQueueError(
ERS_HERE,
"td",
"DefaultRequestHandlerModel", excpt));
118 for(
auto const& link : mtrg->get_mandatory_source_ids()){
124 for(
auto const& link : mtrg->get_enabled_source_ids()){
147 std::vector<const appmodel::TriggerBitword*> bitwords = proc_conf->get_trigger_bitwords();
239 this->
publish(std::move(info));
247 this->
publish(std::move(lat_info));
263 TLOG_DEBUG(3) <<
"Got TC of type " <<
static_cast<int>(tc.type) <<
", timestamp " << tc.time_candidate
264 <<
", start/end " << tc.time_start <<
"/" << tc.time_end <<
", readout start/end "
268 TLOG_DEBUG(3) <<
"Got TC of type " <<
static_cast<int>(tc.type) <<
", timestamp " << tc.time_candidate
269 <<
", start/end " << tc.time_start <<
"/" << tc.time_end;
274 TLOG_DEBUG(3) <<
" Ignore TC type: " <<
static_cast<unsigned int>(tc.type);
313 TLOG_DEBUG(5) <<
"[MLT] TD has bitword: " << td_bitword <<
" "
325 <<
", request window end: " << pending_td.
readout_end;
327 std::vector<dfmessages::ComponentRequest> requests =
333 std::vector<dfmessages::ComponentRequest> group_requests =
355 m_cv.wait(lock, [
this] {
361 for (std::vector<PendingTD>::iterator it = ready_tds.begin(); it != ready_tds.end();) {
403 bool tc_dealt =
false;
404 int64_t tc_wallclock_arrived =
405 std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
420 TLOG_DEBUG(3) <<
"TC overlapping with a previous TD, dropping!";
426 it->contributing_tcs.push_back(tc);
430 <<
" overlaps with pending TD with start/end times " << it->readout_start <<
"/"
440 <<
" overlaps with pending TD with start/end times " << it->readout_start <<
"/"
443 it->readout_end = (tc.
time_end >= it->readout_end) ? tc.
time_end : it->readout_end;
478 <<
" overlaps with pending TD with start/end times " << it->readout_start <<
"/"
482 <<
" overlaps with pending TD with start/end times " << it->readout_start <<
"/"
485 it->contributing_tcs.push_back(tc);
504std::vector<TCProcessor::PendingTD>
507 std::vector<PendingTD> return_tds;
508 for (std::vector<PendingTD>::iterator it = pending_tds.begin(); it != pending_tds.end();) {
510 std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch())
512 if (timestamp_now >= it->walltime_expiration) {
513 return_tds.push_back(*it);
514 it = pending_tds.erase(it);
516 return_tds.push_back(*it);
517 it = pending_tds.erase(it);
528 int earliest_tc_index = -1;
530 for (
int i = 0; i < static_cast<int>(pending_td.
contributing_tcs.size()); i++) {
531 if (earliest_tc_index == -1) {
533 earliest_tc_index = i;
537 earliest_tc_index = i;
541 return earliest_tc_index;
546 bool td_too_long =
false;
550 <<
", sending immediate TD!";
561 size_t tds_cleared_tc_count = std::accumulate(
564 return sum + ptd.contributing_tcs.size();
575 if (tc_type == *it) {
596 bool trigger_check =
false;
598 TLOG_DEBUG(15) <<
"TD word: " << td_bitword <<
", bitword: " << bitword;
599 trigger_check = ((td_bitword & bitword) == bitword);
600 TLOG_DEBUG(15) <<
"&: " << (td_bitword & bitword);
601 TLOG_DEBUG(15) <<
"trigger?: " << trigger_check;
602 if (trigger_check ==
true)
605 return trigger_check;
614 for (
const std::string& tctype_str: bitword->get_bitword()) {
617 if (tc_type == TCType::kUnknown) {
621 temp_bitword.set(
static_cast<uint64_t
>(tc_type));
631 for (
auto readout_type : data) {
636 if (tc_type == TCType::kUnknown) {
641 readout_type->get_time_before(), readout_type->get_time_after()
648 std::pair<triggeralgs::timestamp_t, triggeralgs::timestamp_t>> map)
651 for (
auto const& [key, val] : map) {
652 TLOG_DEBUG(3) <<
"Type: " <<
static_cast<int>(key) <<
", before: " << val.first <<
", after: " << val.second;
660 for (
auto group : data) {
661 const nlohmann::json& temp_links_data = group[
"links"];
662 std::vector<dfmessages::SourceID> temp_links;
663 for (
auto link : temp_links_data) {
664 temp_links.push_back(
678 for (
auto const& link : val) {
692 request.window_begin =
start;
693 request.window_end = end;
695 TLOG_DEBUG(10) <<
"setting request start: " << request.window_begin;
696 TLOG_DEBUG(10) <<
"setting request end: " << request.window_end;
701std::vector<dfmessages::ComponentRequest>
706 std::vector<dfmessages::ComponentRequest> requests;
707 for (
auto link : links) {
715 std::vector<dfmessages::ComponentRequest> requests)
717 for (
auto request : requests) {
727 for (
auto group : data) {
729 temp_roi_group.
n_links = group->get_number_of_link_groups();
730 temp_roi_group.
prob = group->get_probability();
731 temp_roi_group.
time_window = group->get_time_window();
732 temp_roi_group.
mode = group->get_groups_selection_mode();
733 m_roi_conf.insert({ counter, temp_roi_group });
736 run_sum +=
static_cast<float>(group->get_probability());
747 for (
const auto& [key, value] : roi_conf) {
749 TLOG_DEBUG(3) <<
"n links: " << value.n_links;
751 TLOG_DEBUG(3) <<
"time: " << value.time_window;
761 float rnd = (double)rand() / RAND_MAX;
762 return rnd * (limit);
781 int rnd = rand() %
range;
789 if (group_pick != -1) {
791 std::vector<dfmessages::SourceID> links;
794 if (this_group.
mode ==
"kRandom") {
796 std::set<int> groups;
797 while (
static_cast<int>(groups.size()) < this_group.
n_links) {
800 for (
auto r_id : groups) {
807 while (r_id < this_group.
n_links) {
817 std::vector<dfmessages::ComponentRequest> requests =
830 std::vector<int> tc_types;
832 tc_types.push_back(
static_cast<int>(tc.type));
834 tc_types.erase(std::unique(tc_types.begin(), tc_types.end()), tc_types.end());
838 for (
auto tc_type : tc_types) {
839 td_bitword.set(tc_type);
847 TLOG() <<
"TCProcessor opmon counters summary:";
848 TLOG() <<
"------------------------------";
854 TLOG() <<
"------------------------------";
#define DUNE_DAQ_TYPESTRING(Type, typestring)
const dunedaq::appmodel::DataProcessor * get_data_processor() const
Get "data_processor" relationship value.
const dunedaq::appmodel::DataHandlerConf * get_module_configuration() const
Get "module_configuration" relationship value.
const TARGET * cast() const noexcept
Casts object to different class.
void scrap(const nlohmann::json &) override
void start(const nlohmann::json &) override
void add_postprocess_task(Task &&task)
void conf(const appmodel::DataHandlerModule *conf) override
void stop(const nlohmann::json &) override
std::atomic< uint64_t > m_last_processed_daq_ts
static constexpr timeout_t s_no_block
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
void update_latency_out(uint64_t latency)
latency get_latency_in() const
latency get_latency_out() const
void update_latency_in(uint64_t latency)
std::vector< const appmodel::TCReadoutMap * > m_readout_window_map_data
std::atomic< metric_counter_type > m_tc_ignored_count
std::atomic< metric_counter_type > m_tds_dropped_count
void stop(const nlohmann::json &args) override
Stop operation.
std::bitset< 64 > TDBitset
bool check_trigger_type_ignore(unsigned int tc_type)
void call_tc_decision(const PendingTD &pending_td)
dunedaq::trigger::Latency m_latency_instance
std::vector< unsigned int > m_ignored_tc_types
std::map< int, std::vector< daqdataformats::SourceID > > m_group_links
void print_roi_conf(std::map< int, roi_group > roi_conf)
TDBitset get_TD_bitword(const PendingTD &ready_td) const
void parse_roi_conf(const std::vector< const appmodel::ROIGroupConf * > &data)
std::thread m_send_trigger_decisions_thread
void scrap(const nlohmann::json &args) override
Unconfigure.
std::mutex m_td_vector_mutex
std::atomic< metric_counter_type > m_tds_failed_bitword_tc_count
void add_requests_to_decision(dfmessages::TriggerDecision &decision, std::vector< dfmessages::ComponentRequest > requests)
std::shared_ptr< iomanager::SenderConcept< dfmessages::TriggerDecision > > m_td_sink
dfmessages::TriggerDecision create_decision(const PendingTD &pending_td)
std::vector< dfmessages::ComponentRequest > create_all_decision_requests(std::vector< daqdataformats::SourceID > links, triggeralgs::timestamp_t start, triggeralgs::timestamp_t end)
std::atomic< metric_counter_type > m_tds_cleared_count
nlohmann::json m_group_links_data
void conf(const appmodel::DataHandlerModule *conf) override
Set the emulator mode, if active, timestamps of processed packets are overwritten with new ones.
std::atomic< bool > m_tc_merging
bool check_trigger_bitwords(const TDBitset &td_bitword) const
void roi_readout_make_requests(dfmessages::TriggerDecision &decision)
std::map< int, roi_group > m_roi_conf
std::atomic< metric_counter_type > m_tds_sent_tc_count
void print_readout_map(std::map< TCType, std::pair< triggeralgs::timestamp_t, triggeralgs::timestamp_t > > map)
void make_td(const TCWrapper *tc)
void generate_opmon_data() override
std::atomic< metric_counter_type > m_tds_cleared_tc_count
void set_trigger_bitwords(const std::vector< const appmodel::TriggerBitword * > &_bitwords)
bool check_td_readout_length(const PendingTD &)
std::atomic< metric_counter_type > m_tds_dropped_tc_count
std::atomic< metric_counter_type > m_tds_failed_bitword_count
std::vector< int > m_roi_conf_ids
std::vector< PendingTD > get_ready_tds(std::vector< PendingTD > &pending_tds)
std::vector< float > m_roi_conf_probs_c
std::vector< PendingTD > m_pending_tds
std::atomic< metric_counter_type > m_tc_received_count
std::atomic< metric_counter_type > m_tds_created_tc_count
void start(const nlohmann::json &args) override
Start operation.
void print_trigger_bitwords()
bool m_ignore_tc_pileup
Ignore TCs that overlap with already made TD.
dfmessages::ComponentRequest create_request_for_link(daqdataformats::SourceID link, triggeralgs::timestamp_t start, triggeralgs::timestamp_t end)
std::vector< const appmodel::ROIGroupConf * > m_roi_conf_data
std::atomic< bool > m_running_flag
void parse_readout_map(const std::vector< const appmodel::TCReadoutMap * > &data)
std::condition_variable m_cv
void add_tc(const triggeralgs::TriggerCandidate tc)
std::atomic< metric_counter_type > m_tds_sent_count
int64_t m_td_readout_limit
std::vector< float > m_roi_conf_probs
std::atomic< bool > m_latency_monitoring
bool check_overlap(const triggeralgs::TriggerCandidate &tc, const PendingTD &pending_td)
int pick_roi_group_conf()
void parse_group_links(const nlohmann::json &data)
void send_trigger_decisions()
std::atomic< metric_counter_type > m_tds_created_count
float get_random_num_float(float limit)
std::vector< TDBitset > m_trigger_bitwords
void add_tc_ignored(const triggeralgs::TriggerCandidate tc)
std::atomic< bool > m_send_timed_out_tds
std::vector< daqdataformats::SourceID > m_mandatory_links
int get_earliest_tc_index(const PendingTD &pending_td)
std::map< TCType, std::pair< triggeralgs::timestamp_t, triggeralgs::timestamp_t > > m_readout_window_map
void set_latency_out(::uint32_t value)
void set_latency_in(::uint32_t value)
Base class for any user define issue.
#define TLVL_ENTER_EXIT_METHODS
#define TLOG_DEBUG(lvl,...)
@ kLocalized
Local readout, send Fragments to dataflow.
daqdataformats::trigger_type_t trigger_type_t
Copy daqdataformats::trigger_type_t.
DAC value out of range
Message.
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
void warning(const Issue &issue)
void error(const Issue &issue)
dunedaq::trgdataformats::timestamp_t timestamp_t
A message containing information about a Trigger from Data Selection (or a TriggerDecisionEmulator)
std::vector< ComponentRequest > components
The DAQ components which should be read out to create the TriggerRecord.
ReadoutType readout_type
The type of readout to use (i.e. where to route data)
run_number_t run_number
The current run number.
trigger_number_t trigger_number
The trigger number assigned to this TriggerDecision.
timestamp_t trigger_timestamp
The DAQ timestamp.
trigger_type_t trigger_type
The type of the trigger.
std::vector< triggeralgs::TriggerCandidate > contributing_tcs
triggeralgs::timestamp_t readout_end
triggeralgs::timestamp_t readout_start
int64_t walltime_expiration
triggeralgs::timestamp_t time_window
triggeralgs::TriggerCandidate candidate
Factory couldn t std::string alg_name InvalidConfiguration