LCOV - code coverage report
Current view: top level - trigger/src - TCProcessor.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 511 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 108 0

            Line data    Source code
       1              : /**
       2              :  * @file TCProcessor.cpp Trigger Candidate (TC) specific task-based raw processor
       3              :  *
       4              :  * This is part of the DUNE DAQ, copyright 2023.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : #include "trigger/TCProcessor.hpp" // NOLINT(build/include)
       9              : 
      10              : #include "iomanager/Sender.hpp"
      11              : #include "logging/Logging.hpp"
      12              : 
      13              : #include "datahandlinglibs/FrameErrorRegistry.hpp"
      14              : #include "datahandlinglibs/DataHandlingIssues.hpp"
      15              : #include "datahandlinglibs/ReadoutLogging.hpp"
      16              : #include "datahandlinglibs/models/IterableQueueModel.hpp"
      17              : #include "trigger/TCWrapper.hpp"
      18              : #include "triggeralgs/TriggerCandidate.hpp"
      19              : 
      20              : #include "appmodel/TCDataProcessor.hpp"
      21              : #include "appmodel/TriggerDataHandlerModule.hpp"
      22              : 
      23              : using dunedaq::datahandlinglibs::logging::TLVL_BOOKKEEPING;
      24              : using dunedaq::datahandlinglibs::logging::TLVL_TAKE_NOTE;
      25              : 
      26              : // THIS SHOULDN'T BE HERE!!!!! But it is necessary.....
      27              : DUNE_DAQ_TYPESTRING(dunedaq::trigger::TCWrapper, "TriggerCandidate")
      28              : 
      29              : namespace dunedaq {
      30              : namespace trigger {
      31              : 
      32            0 : TCProcessor::TCProcessor(std::unique_ptr<datahandlinglibs::FrameErrorRegistry>& error_registry, bool post_processing_enabled)
      33            0 :   : datahandlinglibs::TaskRawDataProcessorModel<TCWrapper>(error_registry, post_processing_enabled)
      34              : {
      35            0 : }
      36              : 
      37            0 : TCProcessor::~TCProcessor()
      38            0 : {}
      39              : 
      40              : void
      41            0 : TCProcessor::start(const appfwk::DAQModule::CommandData_t& args)
      42              : {
      43            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Entering start() method";
      44              : 
      45            0 :   m_running_flag.store(true);
      46            0 :   m_send_trigger_decisions_thread = std::thread(&TCProcessor::send_trigger_decisions, this);
      47            0 :   pthread_setname_np(m_send_trigger_decisions_thread.native_handle(), "mlt-dec"); // TODO: originally mlt-trig-dec
      48              : 
      49              :   // Reset stats
      50            0 :   m_tds_created_count.store(0);
      51            0 :   m_tds_sent_count.store(0);
      52            0 :   m_tds_dropped_count.store(0);
      53            0 :   m_tds_failed_bitword_count.store(0);
      54            0 :   m_tds_cleared_count.store(0);
      55              :   // per TC
      56            0 :   m_tc_received_count.store(0);
      57            0 :   m_tds_created_tc_count.store(0);
      58            0 :   m_tds_sent_tc_count.store(0);
      59            0 :   m_tds_dropped_tc_count.store(0);
      60            0 :   m_tds_failed_bitword_tc_count.store(0);
      61            0 :   m_tds_cleared_tc_count.store(0);
      62            0 :   m_tc_ignored_count.store(0);
      63            0 :   inherited::start(args);
      64              : 
      65            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Exiting start() method";
      66            0 : }
      67              : 
      68              : void
      69            0 : TCProcessor::stop(const appfwk::DAQModule::CommandData_t& args)
      70              : {
      71            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Entering stop() method";
      72              : 
      73            0 :   inherited::stop(args);
      74            0 :   m_running_flag.store(false);
      75              : 
      76              :   // Make sure condition_variable knows we flipped running flag
      77            0 :   {
      78            0 :     std::lock_guard<std::mutex> lock(m_td_vector_mutex);
      79            0 :     m_cv.notify_all();
      80            0 :   }
      81              : 
      82              :   // Wait for the TD-sending thread to stop
      83            0 :   m_send_trigger_decisions_thread.join();
      84              : 
      85              :   // Drop all TDs in vectors at run stage change. Have to do this
      86              :   // after joining m_send_trigger_decisions_thread so we don't
      87              :   // concurrently access the vectors
      88            0 :   clear_td_vectors();
      89              : 
      90            0 :   print_opmon_stats();
      91              : 
      92            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Exiting stop() method";
      93            0 : }
      94              : 
      95              : void
      96            0 : TCProcessor::conf(const appmodel::DataHandlerModule* cfg)
      97              : {
      98            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Entering conf() method";
      99              : 
     100            0 :   auto mtrg = cfg->cast<appmodel::TriggerDataHandlerModule>(); 
     101            0 :   if (mtrg == nullptr) {
     102            0 :     throw(InvalidConfiguration(ERS_HERE, "Provided null TriggerDataHandlerModule configuration!"));
     103              :   }
     104            0 :   for (auto output : mtrg->get_outputs()) {
     105            0 :    try {
     106            0 :       if (output->get_data_type() == "TriggerDecision") {
     107            0 :          m_td_sink = get_iom_sender<dfmessages::TriggerDecision>(output->UID());
     108              :       }
     109            0 :     } catch (const ers::Issue& excpt) {
     110            0 :       ers::error(datahandlinglibs::ResourceQueueError(ERS_HERE, "td", "DefaultRequestHandlerModel", excpt));
     111            0 :     }
     112              :   }
     113              : 
     114            0 :   auto dp = mtrg->get_module_configuration()->get_data_processor();
     115            0 :   auto proc_conf = dp->cast<appmodel::TCDataProcessor>();
     116              : 
     117              :   // Add all Source IDs to mandatory links for now...
     118            0 :   for(auto const& link : mtrg->get_mandatory_source_ids()){
     119            0 :     m_mandatory_links.push_back(
     120            0 :         dfmessages::SourceID{
     121            0 :         daqdataformats::SourceID::string_to_subsystem(link->get_subsystem()),
     122            0 :         link->get_sid()});
     123              :   }  
     124            0 :   for(auto const& link : mtrg->get_enabled_source_ids()){
     125            0 :     m_mandatory_links.push_back(
     126            0 :         dfmessages::SourceID{
     127            0 :         daqdataformats::SourceID::string_to_subsystem(link->get_subsystem()),
     128            0 :         link->get_sid()});
     129              :   }
     130              : 
     131              :   // TODO: Group links!
     132              :   //m_group_links_data = conf->get_groups_links();
     133            0 :   parse_group_links(m_group_links_data);
     134            0 :   print_group_links();
     135            0 :   m_total_group_links = m_group_links.size();
     136            0 :   TLOG_DEBUG(3) << "Total group links: " << m_total_group_links;
     137              : 
     138            0 :   m_tc_merging        = proc_conf->get_merge_overlapping_tcs();
     139            0 :   m_ignore_tc_pileup = proc_conf->get_ignore_overlapping_tcs();
     140            0 :   m_buffer_timeout    = proc_conf->get_buffer_timeout();
     141            0 :   m_send_timed_out_tds = (m_ignore_tc_pileup) ? false : proc_conf->get_td_out_of_timeout();
     142            0 :   m_td_readout_limit  = proc_conf->get_td_readout_limit();
     143            0 :   m_ignored_tc_types = proc_conf->get_ignore_tc();
     144            0 :   m_ignoring_tc_types = !m_ignored_tc_types.empty();
     145              : 
     146              :   // Trigger bitwords
     147            0 :   std::vector<const appmodel::TriggerBitword*> bitwords = proc_conf->get_trigger_bitwords();
     148            0 :   m_use_bitwords = !bitwords.empty();
     149            0 :   if(m_use_bitwords){
     150            0 :     set_trigger_bitwords(bitwords);
     151            0 :     print_trigger_bitwords();
     152              :   }
     153            0 :   TLOG_DEBUG(3) << "Use bitwords: " << m_use_bitwords;
     154            0 :   TLOG_DEBUG(3) << "Allow merging: " << m_tc_merging;
     155            0 :   TLOG_DEBUG(3) << "Ignore pileup: " << m_ignore_tc_pileup;
     156            0 :   TLOG_DEBUG(3) << "Buffer timeout: " << m_buffer_timeout;
     157            0 :   TLOG_DEBUG(3) << "Should send timed out TDs: " << m_send_timed_out_tds;
     158            0 :   TLOG_DEBUG(3) << "TD readout limit: " << m_td_readout_limit;
     159              : 
     160              :   // ROI map
     161            0 :   m_roi_conf_data = proc_conf->get_roi_group_conf();
     162            0 :   m_use_roi_readout = !m_roi_conf_data.empty();
     163            0 :   if (m_use_roi_readout) {
     164            0 :     parse_roi_conf(m_roi_conf_data);
     165            0 :     print_roi_conf(m_roi_conf);
     166              :   }
     167            0 :   TLOG_DEBUG(3) << "Use ROI readout?: " << m_use_roi_readout;
     168              : 
     169              :   // Custom readout map
     170            0 :   m_readout_window_map_data = proc_conf->get_tc_readout_map();
     171            0 :   m_use_readout_map = !m_readout_window_map_data.empty();
     172            0 :   if (m_use_readout_map) {
     173            0 :     parse_readout_map(m_readout_window_map_data);
     174            0 :     print_readout_map(m_readout_window_map);
     175              :   }
     176            0 :   TLOG_DEBUG(3) << "Use readout map: " << m_use_readout_map;
     177              : 
     178              :   // Ignoring TC types
     179            0 :   TLOG_DEBUG(3) << "Ignoring TC types: " << m_ignoring_tc_types;
     180            0 :   if(m_ignoring_tc_types){
     181            0 :     TLOG_DEBUG(3) << "TC types to ignore: ";
     182            0 :     for (std::vector<unsigned int>::iterator it = m_ignored_tc_types.begin(); it != m_ignored_tc_types.end();) {
     183            0 :       TLOG_DEBUG(3) << *it;
     184            0 :       ++it;
     185              :     }
     186              :   }
     187            0 :   m_latency_monitoring.store( dp->get_latency_monitoring() );
     188            0 :   inherited::add_postprocess_task(std::bind(&TCProcessor::make_td, this, std::placeholders::_1));
     189              : 
     190            0 :   inherited::conf(mtrg);
     191              : 
     192            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Exiting conf() method";
     193            0 : }
     194              : 
     195              : void
     196            0 : TCProcessor::scrap(const appfwk::DAQModule::CommandData_t& args)
     197              : {
     198            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Entering scrap() method";
     199              : 
     200            0 :   m_mandatory_links.clear();
     201            0 :   m_group_links.clear();
     202            0 :   m_roi_conf.clear();
     203            0 :   m_roi_conf_data.clear();
     204            0 :   m_roi_conf_ids.clear();
     205            0 :   m_roi_conf_probs.clear();
     206            0 :   m_roi_conf_probs_c.clear();
     207            0 :   m_pending_tds.clear();
     208            0 :   m_readout_window_map_data.clear();
     209            0 :   m_readout_window_map.clear();
     210            0 :   m_ignored_tc_types.clear();
     211              :   
     212            0 :   m_td_sink.reset();
     213              :   
     214            0 :   m_group_links_data.clear();
     215              :   
     216            0 :   inherited::scrap(args);
     217              : 
     218            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << "TCProcessor: Exiting scrap() method";
     219            0 : }
     220              : 
     221              : void
     222            0 : TCProcessor::generate_opmon_data()
     223              : {
     224            0 :   opmon::TCProcessorInfo info;
     225              : 
     226            0 :   info.set_tds_created_count( m_tds_created_count.load() );  
     227            0 :   info.set_tds_sent_count( m_tds_sent_count.load() );
     228            0 :   info.set_tds_dropped_count( m_tds_dropped_count.load() );
     229            0 :   info.set_tds_failed_bitword_count( m_tds_failed_bitword_count.load() );
     230            0 :   info.set_tds_cleared_count( m_tds_cleared_count.load() );
     231            0 :   info.set_tc_received_count( m_tc_received_count.load() );
     232            0 :   info.set_tc_ignored_count( m_tc_ignored_count.load() );
     233            0 :   info.set_tds_created_tc_count( m_tds_created_tc_count.load() );
     234            0 :   info.set_tds_sent_tc_count( m_tds_sent_tc_count.load() );
     235            0 :   info.set_tds_dropped_tc_count( m_tds_dropped_tc_count.load() );
     236            0 :   info.set_tds_failed_bitword_tc_count( m_tds_failed_bitword_tc_count.load() );
     237            0 :   info.set_tds_cleared_tc_count( m_tds_cleared_tc_count.load() );
     238              : 
     239            0 :   this->publish(std::move(info));
     240              : 
     241            0 :   if ( m_latency_monitoring.load() && m_running_flag.load() ) {
     242            0 :     opmon::TriggerLatency lat_info;
     243              : 
     244            0 :     lat_info.set_latency_in( m_latency_instance.get_latency_in() );
     245            0 :     lat_info.set_latency_out( m_latency_instance.get_latency_out() );
     246              : 
     247            0 :     this->publish(std::move(lat_info));
     248            0 :   }
     249            0 : }
     250              : 
     251              : /**
     252              :  * Pipeline stage 2: put valid TCs in a vector for grouping and forming TDs
     253              :  * */
     254              : void
     255            0 : TCProcessor::make_td(const TCWrapper* tcw)
     256              : {
     257              :         
     258            0 :   auto tc = tcw->candidate;
     259            0 :   if (m_latency_monitoring.load()) m_latency_instance.update_latency_in( tc.time_start );
     260            0 :   m_tc_received_count++;
     261              : 
     262            0 :   if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ) {
     263            0 :     TLOG_DEBUG(3) << "Got TC of type " << static_cast<int>(tc.type) << ", timestamp " << tc.time_candidate
     264            0 :                   << ", start/end " << tc.time_start << "/" << tc.time_end << ", readout start/end "
     265            0 :                   << tc.time_candidate - m_readout_window_map[tc.type].first << "/"
     266            0 :                   << tc.time_candidate + m_readout_window_map[tc.type].second;
     267              :   } else {
     268            0 :     TLOG_DEBUG(3) << "Got TC of type " << static_cast<int>(tc.type) << ", timestamp " << tc.time_candidate
     269            0 :                   << ", start/end " << tc.time_start << "/" << tc.time_end;
     270              :   }
     271              : 
     272              :   // Option to ignore TC types (if given by config)
     273            0 :   if (m_ignoring_tc_types == true && check_trigger_type_ignore(static_cast<unsigned int>(tc.type)) == true) {
     274            0 :     TLOG_DEBUG(3) << " Ignore TC type: " << static_cast<unsigned int>(tc.type);
     275            0 :     m_tc_ignored_count++;
     276              : 
     277              :         /* FIXME: this block is commented out: if a TC is to be ignored it shall just be ignored!
     278              :           if (m_tc_merging) {
     279              :             // Still need to check for overlap with existing TD, if overlaps, include in the TD, but don't extend
     280              :             // readout
     281              :             std::lock_guard<std::mutex> lock(m_td_vector_mutex);
     282              :             add_tc_ignored(*tc);
     283              :           }
     284              :           */
     285              :   }
     286              :   else {
     287            0 :     std::lock_guard<std::mutex> lock(m_td_vector_mutex);
     288            0 :     add_tc(tc);
     289            0 :     m_cv.notify_one();
     290            0 :     TLOG_DEBUG(10) << "pending tds size: " << m_pending_tds.size();
     291            0 :   }
     292            0 :   m_last_processed_daq_ts = tc.time_start;
     293            0 :   return;
     294            0 : }
     295              : 
     296              : dfmessages::TriggerDecision
     297            0 : TCProcessor::create_decision(const PendingTD& pending_td)
     298              : {
     299            0 :   m_earliest_tc_index = get_earliest_tc_index(pending_td);
     300            0 :   TLOG_DEBUG(5) << "earliest TC index: " << m_earliest_tc_index;
     301              : 
     302            0 :   if (pending_td.contributing_tcs.size() > 1) {
     303            0 :     TLOG_DEBUG(5) << "!!! TD created from " << pending_td.contributing_tcs.size() << " TCs !!!";
     304              :   }
     305              : 
     306            0 :   dfmessages::TriggerDecision decision;
     307            0 :   decision.trigger_number = 0; // filled by MLT
     308            0 :   decision.run_number = 0; // filled by MLT
     309            0 :   decision.trigger_timestamp = pending_td.contributing_tcs[m_earliest_tc_index].time_candidate;
     310            0 :   decision.readout_type = dfmessages::ReadoutType::kLocalized;
     311              : 
     312            0 :   TDBitset td_bitword = get_TD_bitword(pending_td);
     313            0 :   TLOG_DEBUG(5) << "[MLT] TD has bitword: " << td_bitword << " "
     314            0 :                                      << static_cast<dfmessages::trigger_type_t>(td_bitword.to_ulong());
     315            0 :   decision.trigger_type = static_cast<dfmessages::trigger_type_t>(td_bitword.to_ulong()); // m_trigger_type;
     316              : 
     317              :     //decision.trigger_type = 1; // m_trigger_type;
     318              : 
     319            0 :   TLOG_DEBUG(3) << ", TC detid: " << pending_td.contributing_tcs[m_earliest_tc_index].detid
     320            0 :                 << ", TC type: " << static_cast<int>(pending_td.contributing_tcs[m_earliest_tc_index].type)
     321            0 :                 << ", TC cont number: " << pending_td.contributing_tcs.size()
     322            0 :                 << ", DECISION trigger type: " << decision.trigger_type
     323            0 :                 << ", DECISION timestamp: " << decision.trigger_timestamp
     324            0 :                 << ", request window begin: " << pending_td.readout_start
     325            0 :                 << ", request window end: " << pending_td.readout_end;
     326              : 
     327            0 :   std::vector<dfmessages::ComponentRequest> requests =
     328            0 :     create_all_decision_requests(m_mandatory_links, pending_td.readout_start, pending_td.readout_end);
     329            0 :   add_requests_to_decision(decision, requests);
     330              : 
     331            0 :   if (!m_use_roi_readout) {
     332            0 :     for (const auto& [key, value] : m_group_links) {
     333            0 :       std::vector<dfmessages::ComponentRequest> group_requests =
     334            0 :         create_all_decision_requests(value, pending_td.readout_start, pending_td.readout_end);
     335            0 :       add_requests_to_decision(decision, group_requests);
     336            0 :     }
     337              :   } else { // using ROI readout
     338            0 :     roi_readout_make_requests(decision);
     339              :   }
     340              : 
     341            0 :   m_tds_created_count++;
     342            0 :   m_tds_created_tc_count += pending_td.contributing_tcs.size();
     343              : 
     344            0 :   return decision;
     345            0 : }
     346              : 
     347              : 
     348              : void
     349            0 : TCProcessor::send_trigger_decisions() {
     350              :  // A unique lock that can be locked and unlocked
     351            0 :  std::unique_lock<std::mutex> lock(m_td_vector_mutex);
     352              : 
     353            0 :  while (m_running_flag) {
     354              :     // TODO: think about better implementation (notify?, something event driven)
     355            0 :     m_cv.wait_for(lock, std::chrono::microseconds(100));
     356            0 :     auto ready_tds = get_ready_tds(m_pending_tds);
     357            0 :     TLOG_DEBUG(10) << "ready tds: " << ready_tds.size() << ", updated pending tds: " << m_pending_tds.size();
     358              : 
     359            0 :     for (std::vector<PendingTD>::iterator it = ready_tds.begin(); it != ready_tds.end();) {
     360            0 :         call_tc_decision(*it);
     361            0 :         ++it;
     362              :     }
     363            0 :  }
     364            0 : }
     365              : 
     366              : void
     367            0 : TCProcessor::call_tc_decision(const TCProcessor::PendingTD& pending_td)
     368              : {
     369              : 
     370            0 :   if (m_use_bitwords) {
     371              :     // Check trigger bitwords
     372            0 :     TDBitset td_bitword = get_TD_bitword(pending_td);
     373            0 :     if (!check_trigger_bitwords(td_bitword)) {
     374              :       // Don't process further if the bitword check failed
     375            0 :       m_tds_failed_bitword_count++;
     376            0 :       m_tds_failed_bitword_tc_count += pending_td.contributing_tcs.size();
     377            0 :       return;
     378              :     }
     379              :   }
     380              : 
     381            0 :   dfmessages::TriggerDecision decision = create_decision(pending_td);
     382            0 :   auto tn = decision.trigger_number;
     383            0 :   auto td_ts = decision.trigger_timestamp;
     384              : 
     385            0 :   if (m_latency_monitoring.load()) m_latency_instance.update_latency_out( pending_td.contributing_tcs.front().time_start );
     386            0 :   if(!m_td_sink->try_send(std::move(decision), iomanager::Sender::s_no_block)) {
     387            0 :     ers::warning(TDDropped(ERS_HERE, tn, td_ts));
     388            0 :     m_tds_dropped_count++;
     389            0 :     m_tds_dropped_tc_count += pending_td.contributing_tcs.size();
     390              :   }
     391              :   else {
     392            0 :     m_tds_sent_count++;
     393            0 :     m_tds_sent_tc_count += pending_td.contributing_tcs.size();
     394              :   }
     395            0 : }
     396              : 
     397              : 
     398              : void
     399            0 : TCProcessor::add_tc(const triggeralgs::TriggerCandidate tc)
     400              : {
     401            0 :   bool tc_dealt = false;
     402            0 :   int64_t tc_wallclock_arrived =
     403            0 :     std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
     404              : 
     405            0 :   if (m_tc_merging || m_ignore_tc_pileup) {
     406              : 
     407            0 :     for (std::vector<PendingTD>::iterator it = m_pending_tds.begin(); it != m_pending_tds.end();) {
     408              :       // Don't deal with TC here if there's no overlap
     409            0 :       if (!check_overlap(tc, *it)) {
     410            0 :         it++;
     411            0 :         continue;
     412              :       }
     413              : 
     414              :       // If overlap and ignoring, we drop the TC and flag it as dealt with.
     415            0 :       if (m_ignore_tc_pileup) {
     416            0 :         m_tds_dropped_tc_count++;
     417            0 :         tc_dealt = true;
     418            0 :         TLOG_DEBUG(3) << "TC overlapping with a previous TD, dropping!";
     419            0 :         break;
     420              :       }
     421              : 
     422              :       // If we're here, TC merging must be on, in which case we're actually
     423              :       // going to merge the TC into the TD.    
     424            0 :       it->contributing_tcs.push_back(tc);
     425            0 :       if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ){
     426            0 :         TLOG_DEBUG(3) << "TC with start/end times " << tc.time_candidate - m_readout_window_map[tc.type].first << "/"
     427            0 :                       << tc.time_candidate + m_readout_window_map[tc.type].second
     428            0 :                       << " overlaps with pending TD with start/end times " << it->readout_start << "/"
     429            0 :                       << it->readout_end;
     430            0 :         it->readout_start = ((tc.time_candidate - m_readout_window_map[tc.type].first) >= it->readout_start)
     431            0 :                               ? it->readout_start
     432            0 :                               : (tc.time_candidate - m_readout_window_map[tc.type].first);
     433            0 :         it->readout_end = ((tc.time_candidate + m_readout_window_map[tc.type].second) >= it->readout_end)
     434            0 :                             ? (tc.time_candidate + m_readout_window_map[tc.type].second)
     435              :                             : it->readout_end;
     436              :       } else {
     437            0 :         TLOG_DEBUG(3) << "TC with start/end times " << tc.time_start << "/" << tc.time_end
     438            0 :                       << " overlaps with pending TD with start/end times " << it->readout_start << "/"
     439            0 :                       << it->readout_end;
     440            0 :         it->readout_start = (tc.time_start >= it->readout_start) ? it->readout_start : tc.time_start;
     441            0 :         it->readout_end = (tc.time_end >= it->readout_end) ? tc.time_end : it->readout_end;
     442              :       }
     443            0 :       it->walltime_expiration = tc_wallclock_arrived + m_buffer_timeout;
     444            0 :       tc_dealt = true;
     445            0 :       break;
     446              :     }
     447              :   }
     448              : 
     449              :   // Don't do anything else if we've already dealt with the TC
     450            0 :   if (tc_dealt) {
     451            0 :     return;
     452              :   }
     453              : 
     454              :   // Create a new TD out of the TC
     455            0 :   PendingTD td_candidate;
     456            0 :   td_candidate.contributing_tcs.push_back(tc);
     457            0 :   if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ){
     458            0 :     td_candidate.readout_start = tc.time_candidate - m_readout_window_map[tc.type].first;
     459            0 :     td_candidate.readout_end = tc.time_candidate + m_readout_window_map[tc.type].second;
     460              :   } else {
     461            0 :     td_candidate.readout_start = tc.time_start;
     462            0 :     td_candidate.readout_end = tc.time_end;
     463              :   }
     464            0 :   td_candidate.walltime_expiration = tc_wallclock_arrived + m_buffer_timeout;
     465            0 :   m_pending_tds.push_back(td_candidate);
     466            0 : }
     467              : 
     468              : void
     469            0 : TCProcessor::add_tc_ignored(const triggeralgs::TriggerCandidate tc)
     470              : {
     471            0 :   for (std::vector<PendingTD>::iterator it = m_pending_tds.begin(); it != m_pending_tds.end();) {
     472            0 :     if (check_overlap(tc, *it)) {
     473            0 :       if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ) {
     474            0 :         TLOG_DEBUG(3) << "!Ignored! TC with start/end times " << tc.time_candidate - m_readout_window_map[tc.type].first
     475            0 :                       << "/" << tc.time_candidate + m_readout_window_map[tc.type].second
     476            0 :                       << " overlaps with pending TD with start/end times " << it->readout_start << "/"
     477            0 :                       << it->readout_end;
     478              :       } else {
     479            0 :         TLOG_DEBUG(3) << "!Ignored! TC with start/end times " << tc.time_start << "/" << tc.time_end
     480            0 :                       << " overlaps with pending TD with start/end times " << it->readout_start << "/"
     481            0 :                       << it->readout_end;
     482              :       }
     483            0 :       it->contributing_tcs.push_back(tc);
     484              :       break;
     485              :     }
     486            0 :     ++it;
     487              :   }
     488            0 :   return;
     489              : }
     490              : 
     491              : bool
     492            0 : TCProcessor::check_overlap(const triggeralgs::TriggerCandidate& tc, const PendingTD& pending_td)
     493              : {
     494            0 :   if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ) {
     495            0 :     return !(((tc.time_candidate + m_readout_window_map[tc.type].second) < pending_td.readout_start) ||
     496            0 :              ((tc.time_candidate - m_readout_window_map[tc.type].first > pending_td.readout_end)));
     497              :   } else {
     498            0 :     return !((tc.time_end < pending_td.readout_start) || (tc.time_start > pending_td.readout_end));
     499              :   }
     500              : }
     501              : 
     502              : std::vector<TCProcessor::PendingTD>
     503            0 : TCProcessor::get_ready_tds(std::vector<PendingTD>& pending_tds)
     504              : {
     505            0 :   std::vector<PendingTD> return_tds;
     506            0 :   for (std::vector<PendingTD>::iterator it = pending_tds.begin(); it != pending_tds.end();) {
     507            0 :     auto timestamp_now =
     508            0 :       std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch())
     509            0 :         .count();
     510            0 :     if (timestamp_now >= it->walltime_expiration) {
     511            0 :       return_tds.push_back(*it);
     512            0 :       it = pending_tds.erase(it);
     513            0 :     } else if (check_td_readout_length(*it)) { // Also pass on TDs with (too) long readout window
     514            0 :       return_tds.push_back(*it);
     515            0 :       it = pending_tds.erase(it);
     516              :     } else {
     517            0 :       ++it;
     518              :     }
     519              :   }
     520            0 :   return return_tds;
     521            0 : }
     522              : 
     523              : int
     524            0 : TCProcessor::get_earliest_tc_index(const PendingTD& pending_td)
     525              : {
     526            0 :   int earliest_tc_index = -1;
     527            0 :   triggeralgs::timestamp_t earliest_tc_time;
     528            0 :   for (int i = 0; i < static_cast<int>(pending_td.contributing_tcs.size()); i++) {
     529            0 :     if (earliest_tc_index == -1) {
     530            0 :       earliest_tc_time = pending_td.contributing_tcs[i].time_candidate;
     531            0 :       earliest_tc_index = i;
     532              :     } else {
     533            0 :       if (pending_td.contributing_tcs[i].time_candidate < earliest_tc_time) {
     534            0 :         earliest_tc_time = pending_td.contributing_tcs[i].time_candidate;
     535            0 :         earliest_tc_index = i;
     536              :       }
     537              :     }
     538              :   }
     539            0 :   return earliest_tc_index;
     540              : }
     541              : 
     542            0 : bool TCProcessor::check_td_readout_length(const PendingTD& pending_td)
     543              : {
     544            0 :   bool td_too_long = false;
     545            0 :   if (static_cast<int64_t>(pending_td.readout_end - pending_td.readout_start) >= m_td_readout_limit) {
     546            0 :     td_too_long = true;
     547            0 :     TLOG_DEBUG(3) << "Too long readout window: " << (pending_td.readout_end - pending_td.readout_start)
     548            0 :                   << ", sending immediate TD!";
     549              :   }
     550            0 :   return td_too_long;
     551              : }
     552              : 
     553              : void
     554            0 : TCProcessor::clear_td_vectors()
     555              : {
     556            0 :   std::lock_guard<std::mutex> lock(m_td_vector_mutex);
     557            0 :   m_tds_cleared_count += m_pending_tds.size();
     558              :   // Use std::accumulate to sum up the sizes of all contributing_tcs vectors
     559            0 :   size_t tds_cleared_tc_count = std::accumulate(
     560              :     m_pending_tds.begin(), m_pending_tds.end(), 0,
     561            0 :     [](size_t sum, const PendingTD& ptd) {
     562            0 :       return sum + ptd.contributing_tcs.size();
     563              :     }
     564            0 :   );
     565            0 :   m_tds_cleared_tc_count += tds_cleared_tc_count;
     566            0 :   m_pending_tds.clear();
     567            0 : }
     568              : bool
     569            0 : TCProcessor::check_trigger_type_ignore(unsigned int tc_type)
     570              : {
     571            0 :   bool ignore = false;
     572            0 :   for (std::vector<unsigned int>::iterator it = m_ignored_tc_types.begin(); it != m_ignored_tc_types.end();) {
     573            0 :     if (tc_type == *it) {
     574              :       ignore = true;
     575              :       break;
     576              :     }
     577            0 :     ++it;
     578              :   }
     579            0 :   return ignore;
     580              : }
     581              : 
     582              : void
     583            0 : TCProcessor::print_trigger_bitwords()
     584              : {
     585            0 :   TLOG_DEBUG(3) << "Configured trigger words:";
     586            0 :   for (const auto& bitword : m_trigger_bitwords) {
     587            0 :     TLOG_DEBUG(3) << bitword;
     588              :   }
     589            0 : }
     590              : 
     591              : bool
     592            0 : TCProcessor::check_trigger_bitwords(const TDBitset& td_bitword) const
     593              : {
     594            0 :   bool trigger_check = false;
     595            0 :   for (const auto& bitword : m_trigger_bitwords) {
     596            0 :     TLOG_DEBUG(15) << "TD word: " << td_bitword << ", bitword: " << bitword;
     597            0 :     trigger_check = ((td_bitword & bitword) == bitword);
     598            0 :     TLOG_DEBUG(15) << "&: " << (td_bitword & bitword);
     599            0 :     TLOG_DEBUG(15) << "trigger?: " << trigger_check;
     600            0 :     if (trigger_check == true)
     601              :       break;
     602              :   }
     603            0 :   return trigger_check;
     604              : }
     605              : 
     606              : void
     607            0 : TCProcessor::set_trigger_bitwords(const std::vector<const appmodel::TriggerBitword*>& _bitwords)
     608              : {
     609            0 :   for (const appmodel::TriggerBitword* bitword : _bitwords) {
     610            0 :     TDBitset temp_bitword;
     611              : 
     612            0 :     for (const std::string& tctype_str: bitword->get_bitword()) {
     613            0 :       TCType tc_type = static_cast<TCType>(dunedaq::trgdataformats::string_to_trigger_candidate_type(tctype_str));
     614              : 
     615            0 :       if (tc_type == TCType::kUnknown) {
     616            0 :         throw(InvalidConfiguration(ERS_HERE, "Provided an unknown/non-existent TC type as a trigger bitword!"));
     617              :       }
     618              : 
     619            0 :       temp_bitword.set(static_cast<uint64_t>(tc_type));
     620              :     }
     621              : 
     622            0 :     m_trigger_bitwords.push_back(temp_bitword);
     623              :   }
     624            0 : }
     625              : 
     626              : void
     627            0 : TCProcessor::parse_readout_map(const std::vector<const appmodel::TCReadoutMap*>& data)
     628              : {
     629            0 :   for (auto readout_type : data) {
     630            0 :     TCType tc_type = static_cast<TCType>(
     631            0 :       dunedaq::trgdataformats::string_to_trigger_candidate_type(readout_type->get_tc_type_name()));
     632              : 
     633              :       // Throw error if unknown TC type
     634            0 :       if (tc_type == TCType::kUnknown) {
     635            0 :         throw(InvalidConfiguration(ERS_HERE, "Provided an unknown TC type in the TCReadoutMap for the TCProcessor"));
     636              :       }
     637              : 
     638            0 :     m_readout_window_map[tc_type] = {
     639            0 :       readout_type->get_time_before(), readout_type->get_time_after()
     640            0 :     };
     641              :   }
     642            0 :   return;
     643              : }
     644              : void
     645            0 : TCProcessor::print_readout_map(std::map<TCType,
     646              :                                         std::pair<triggeralgs::timestamp_t, triggeralgs::timestamp_t>> map)
     647              : {
     648            0 :   TLOG_DEBUG(3) << "MLT TD Readout map:";
     649            0 :   for (auto const& [key, val] : map) {
     650            0 :     TLOG_DEBUG(3) << "Type: " << static_cast<int>(key) << ", before: " << val.first << ", after: " << val.second;
     651              :   }
     652            0 :   return;
     653              : }
     654              : 
     655              : void
     656            0 : TCProcessor::parse_group_links(const nlohmann::json& data)
     657              : {
     658            0 :   for (auto group : data) {
     659            0 :     const nlohmann::json& temp_links_data = group["links"];
     660            0 :     std::vector<dfmessages::SourceID> temp_links;
     661            0 :     for (auto link : temp_links_data) {
     662            0 :       temp_links.push_back(
     663            0 :         dfmessages::SourceID{ daqdataformats::SourceID::string_to_subsystem(link["subsystem"]), link["element"] });
     664            0 :     }
     665            0 :     m_group_links.insert({ group["group"], temp_links });
     666            0 :   }
     667            0 :   return;
     668              : }
     669              : 
     670              : void
     671            0 : TCProcessor::print_group_links()
     672              : {
     673            0 :   TLOG_DEBUG(3) << "MLT Group Links:";
     674            0 :   for (auto const& [key, val] : m_group_links) {
     675            0 :     TLOG_DEBUG(3) << "Group: " << key;
     676            0 :     for (auto const& link : val) {
     677            0 :       TLOG_DEBUG(3) << link;
     678              :     }
     679              :   }
     680            0 :   TLOG_DEBUG(3) << " ";
     681            0 :   return;
     682              : }
     683              : dfmessages::ComponentRequest
     684            0 : TCProcessor::create_request_for_link(dfmessages::SourceID link,
     685              :                                             triggeralgs::timestamp_t start,
     686              :                                             triggeralgs::timestamp_t end)
     687              : {
     688            0 :   dfmessages::ComponentRequest request;
     689            0 :   request.component = link;
     690            0 :   request.window_begin = start;
     691            0 :   request.window_end = end;
     692              : 
     693            0 :   TLOG_DEBUG(10) << "setting request start: " << request.window_begin;
     694            0 :   TLOG_DEBUG(10) << "setting request end: " << request.window_end;
     695              : 
     696            0 :   return request;
     697              : }
     698              : 
     699              : std::vector<dfmessages::ComponentRequest>
     700            0 : TCProcessor::create_all_decision_requests(std::vector<dfmessages::SourceID> links,
     701              :                                                  triggeralgs::timestamp_t start,
     702              :                                                  triggeralgs::timestamp_t end)
     703              : {
     704            0 :   std::vector<dfmessages::ComponentRequest> requests;
     705            0 :   for (auto link : links) {
     706            0 :     requests.push_back(create_request_for_link(link, start, end));
     707              :   }
     708            0 :   return requests;
     709            0 : }
     710              : 
     711              : void
     712            0 : TCProcessor::add_requests_to_decision(dfmessages::TriggerDecision& decision,
     713              :                                              std::vector<dfmessages::ComponentRequest> requests)
     714              : {
     715            0 :   for (auto request : requests) {
     716            0 :     decision.components.push_back(request);
     717              :   }
     718            0 : }
     719              : 
     720              : void
     721            0 : TCProcessor::parse_roi_conf(const std::vector<const appmodel::ROIGroupConf*>& data)
     722              : {
     723            0 :   int counter = 0;
     724            0 :   float run_sum = 0;
     725            0 :   for (auto group : data) {
     726            0 :     roi_group temp_roi_group;
     727            0 :     temp_roi_group.n_links = group->get_number_of_link_groups();
     728            0 :     temp_roi_group.prob         = group->get_probability();
     729            0 :     temp_roi_group.time_window  = group->get_time_window();
     730            0 :     temp_roi_group.mode         = group->get_groups_selection_mode();
     731            0 :     m_roi_conf.insert({ counter, temp_roi_group });
     732            0 :     m_roi_conf_ids.push_back(counter);
     733            0 :     m_roi_conf_probs.push_back(group->get_probability());
     734            0 :     run_sum += static_cast<float>(group->get_probability());
     735            0 :     m_roi_conf_probs_c.push_back(run_sum);
     736            0 :     counter++;
     737            0 :   }
     738            0 :   return;
     739              : }
     740              : 
     741              : void
     742            0 : TCProcessor::print_roi_conf(std::map<int, roi_group> roi_conf)
     743              : {
     744            0 :   TLOG_DEBUG(3) << "ROI CONF";
     745            0 :   for (const auto& [key, value] : roi_conf) {
     746            0 :     TLOG_DEBUG(3) << "ID: " << key;
     747            0 :     TLOG_DEBUG(3) << "n links: " << value.n_links;
     748            0 :     TLOG_DEBUG(3) << "prob: " << value.prob;
     749            0 :     TLOG_DEBUG(3) << "time: " << value.time_window;
     750            0 :     TLOG_DEBUG(3) << "mode: " << value.mode;
     751              :   }
     752            0 :   TLOG_DEBUG(3) << " ";
     753            0 :   return;
     754              : }
     755              : 
     756              : float
     757            0 : TCProcessor::get_random_num_float(float limit)
     758              : {
     759            0 :   float rnd = (double)rand() / RAND_MAX;
     760            0 :   return rnd * (limit);
     761              : }
     762              : 
     763              : int
     764            0 : TCProcessor::pick_roi_group_conf()
     765              : {
     766            0 :   float rnd_num = get_random_num_float(m_roi_conf_probs_c.back());
     767            0 :   for (int i = 0; i < static_cast<int>(m_roi_conf_probs_c.size()); i++) {
     768            0 :     if (rnd_num < m_roi_conf_probs_c[i]) {
     769            0 :       return i;
     770              :     }
     771              :   }
     772              :   return -1;
     773              : }
     774              : 
     775              : int
     776            0 : TCProcessor::get_random_num_int()
     777              : {
     778            0 :   int range = m_total_group_links;
     779            0 :   int rnd = rand() % range;
     780            0 :   return rnd;
     781              : }
     782              : void
     783            0 : TCProcessor::roi_readout_make_requests(dfmessages::TriggerDecision& decision)
     784              : {
     785              :   // Get configuration at random (weighted)
     786            0 :   int group_pick = pick_roi_group_conf();
     787            0 :   if (group_pick != -1) {
     788            0 :     roi_group this_group = m_roi_conf[m_roi_conf_ids[group_pick]];
     789            0 :     std::vector<dfmessages::SourceID> links;
     790              : 
     791              :     // If mode is random, pick groups to request at random
     792            0 :     if (this_group.mode == "kRandom") {
     793            0 :       TLOG_DEBUG(10) << "RAND";
     794            0 :       std::set<int> groups;
     795            0 :       while (static_cast<int>(groups.size()) < this_group.n_links) {
     796            0 :         groups.insert(get_random_num_int());
     797              :       }
     798            0 :       for (auto r_id : groups) {
     799            0 :         links.insert(links.end(), m_group_links[r_id].begin(), m_group_links[r_id].end());
     800              :       }
     801              :       // Otherwise, read sequntially by IDs, starting at 0
     802            0 :     } else {
     803            0 :       TLOG_DEBUG(10) << "SEQ";
     804            0 :       int r_id = 0;
     805            0 :       while (r_id < this_group.n_links) {
     806            0 :         links.insert(links.end(), m_group_links[r_id].begin(), m_group_links[r_id].end());
     807            0 :         r_id++;
     808              :       }
     809              :     }
     810              : 
     811            0 :     TLOG_DEBUG(10) << "TD timestamp: " << decision.trigger_timestamp;
     812            0 :     TLOG_DEBUG(10) << "group window: " << this_group.time_window;
     813              : 
     814              :     // Once the components are prepared, create requests and append them to decision
     815            0 :     std::vector<dfmessages::ComponentRequest> requests =
     816              :       create_all_decision_requests(links, decision.trigger_timestamp - this_group.time_window,
     817            0 :                                    decision.trigger_timestamp + this_group.time_window);
     818            0 :     add_requests_to_decision(decision, requests);
     819            0 :     links.clear();
     820            0 :   }
     821            0 :   return;
     822              : }
     823              : 
     824              : TCProcessor::TDBitset
     825            0 : TCProcessor::get_TD_bitword(const PendingTD& ready_td) const
     826              : {
     827              :   // get only unique types
     828            0 :   std::vector<int> tc_types;
     829            0 :   for (auto tc : ready_td.contributing_tcs) {
     830            0 :     tc_types.push_back(static_cast<int>(tc.type));
     831            0 :   }
     832            0 :   tc_types.erase(std::unique(tc_types.begin(), tc_types.end()), tc_types.end());
     833              : 
     834              :   // form TD bitword
     835            0 :   TDBitset td_bitword;
     836            0 :   for (auto tc_type : tc_types) {
     837            0 :     td_bitword.set(tc_type);
     838              :   }
     839            0 :   return td_bitword;
     840            0 : }
     841              : 
     842              : void
     843            0 : TCProcessor::print_opmon_stats()
     844              : {
     845            0 :   TLOG() << "TCProcessor opmon counters summary:";
     846            0 :   TLOG() << "------------------------------";
     847            0 :   TLOG() << "TDs created: \t\t\t" << m_tds_created_count << " \t(" << m_tds_created_tc_count << " TCs)";
     848            0 :   TLOG() << "TDs sent: \t\t\t" << m_tds_sent_count << " \t(" << m_tds_sent_tc_count << " TCs)";
     849            0 :   TLOG() << "TDs dropped: \t\t\t" << m_tds_dropped_count << " \t(" << m_tds_dropped_tc_count << " TCs)";
     850            0 :   TLOG() << "TDs failed bitword check: \t" << m_tds_failed_bitword_count << " \t(" << m_tds_failed_bitword_tc_count << " TCs)";
     851            0 :   TLOG() << "TDs cleared: \t\t\t" << m_tds_cleared_count << " \t(" << m_tds_cleared_tc_count << " TCs)";
     852            0 :   TLOG() << "------------------------------";
     853            0 :   TLOG() << "TCs received: \t" << m_tc_received_count;
     854            0 :   TLOG() << "TCs ignored: \t" << m_tc_ignored_count;
     855            0 :   TLOG();
     856            0 : }
     857              : 
     858              : } // namespace trigger
     859              : } // namespace dunedaq
        

Generated by: LCOV version 2.0-1