LCOV - code coverage report
Current view: top level - dfmodules/src - TriggerRecordBuilderData.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 98.4 % 125 123
Test Date: 2026-02-16 10:18:04 Functions: 100.0 % 11 11

            Line data    Source code
       1              : /**
       2              :  * @file TriggerRecordBuilderData.cpp TriggerRecordBuilderData Class Implementation
       3              :  *
       4              :  * The TriggerRecordBuilderData class represents the current state of a dataflow application's Trigger Record buffers
       5              :  * for use by the DFO.
       6              :  *
       7              :  * This is part of the DUNE DAQ Application Framework, copyright 2020.
       8              :  * Licensing/copyright details are in the COPYING file that you should have
       9              :  * received with this code.
      10              :  */
      11              : 
      12              : #include "dfmodules/TriggerRecordBuilderData.hpp"
      13              : #include "dfmodules/opmon/TRBuilderData.pb.h"
      14              : 
      15              : #include "logging/Logging.hpp"
      16              : 
      17              : #include <limits>
      18              : #include <memory>
      19              : #include <string>
      20              : #include <utility>
      21              : 
      22              : /**
      23              :  * @brief Name used by TRACE TLOG calls from this source file
      24              :  */
      25              : #define TRACE_NAME "TRBData" // NOLINT
      26              : 
      27              : namespace dunedaq {
      28              : namespace dfmodules {
      29              : 
      30            3 : TriggerRecordBuilderData::TriggerRecordBuilderData(std::string connection_name, size_t busy_threshold)
      31            3 :   : m_busy_threshold(busy_threshold)
      32            3 :   , m_free_threshold(busy_threshold)
      33            3 :   , m_is_busy(false)
      34            3 :   , m_in_error(false)
      35            9 :   , m_connection_name(connection_name)
      36            3 : {}
      37              : 
      38            3 : TriggerRecordBuilderData::TriggerRecordBuilderData(std::string connection_name,
      39              :                                                    size_t busy_threshold,
      40            3 :                                                    size_t free_threshold)
      41            3 :   : m_busy_threshold(busy_threshold)
      42            3 :   , m_free_threshold(busy_threshold)
      43            3 :   , m_is_busy(false)
      44            3 :   , m_in_error(false)
      45            9 :   , m_connection_name(connection_name)
      46              : {
      47            3 :   if (busy_threshold < free_threshold)
      48            1 :     throw dfmodules::DFOThresholdsNotConsistent(ERS_HERE, busy_threshold, free_threshold);
      49            7 : }
      50              : 
      51              : std::shared_ptr<AssignedTriggerDecision>
      52            8 : TriggerRecordBuilderData::extract_assignment(daqdataformats::trigger_number_t trigger_number)
      53              : {
      54            8 :   std::shared_ptr<AssignedTriggerDecision> dec_ptr;
      55            8 :   auto lk = std::lock_guard<std::mutex>(m_assigned_trigger_decisions_mutex);
      56           10 :   for (auto it = m_assigned_trigger_decisions.begin(); it != m_assigned_trigger_decisions.end(); ++it) {
      57            7 :     if ((*it)->decision.trigger_number == trigger_number) {
      58            5 :       dec_ptr = *it;
      59            5 :       m_assigned_trigger_decisions.erase(it);
      60            5 :       break;
      61              :     }
      62              :   }
      63              : 
      64            8 :   if (m_assigned_trigger_decisions.size() < m_free_threshold.load())
      65            6 :     m_is_busy.store(false);
      66              : 
      67            8 :   return dec_ptr;
      68            8 : }
      69              : 
      70              : std::shared_ptr<AssignedTriggerDecision>
      71            2 : TriggerRecordBuilderData::get_assignment(daqdataformats::trigger_number_t trigger_number) const
      72              : {
      73            2 :   auto lk = std::lock_guard<std::mutex>(m_assigned_trigger_decisions_mutex);
      74            2 :   for (auto ptr : m_assigned_trigger_decisions) {
      75            1 :     if (ptr->decision.trigger_number == trigger_number) {
      76            1 :       return ptr;
      77              :     }
      78            0 :   }
      79              : 
      80            1 :   return nullptr;
      81            2 : }
      82              : 
      83              : std::shared_ptr<AssignedTriggerDecision>
      84            6 : TriggerRecordBuilderData::complete_assignment(daqdataformats::trigger_number_t trigger_number,
      85              :                                               std::function<void(nlohmann::json&)> metadata_fun)
      86              : {
      87              : 
      88            6 :   auto dec_ptr = extract_assignment(trigger_number);
      89              : 
      90            6 :   if (dec_ptr == nullptr)
      91            2 :     throw AssignedTriggerDecisionNotFound(ERS_HERE, trigger_number, m_connection_name);
      92              : 
      93            4 :   auto now = std::chrono::steady_clock::now();
      94            4 :   auto time = std::chrono::duration_cast<std::chrono::microseconds>(now - dec_ptr->assigned_time);
      95            4 :   {
      96            4 :     auto lk = std::lock_guard<std::mutex>(m_latency_info_mutex);
      97            4 :     m_latency_info.emplace_back(now, time);
      98              : 
      99            4 :     if (m_latency_info.size() > 1000)
     100            0 :       m_latency_info.pop_front();
     101            4 :   }
     102              : 
     103            4 :   if (metadata_fun)
     104            1 :     metadata_fun(m_metadata);
     105              : 
     106            4 :   ++m_complete_counter;
     107            4 :   auto completion_time =
     108            4 :     std::chrono::duration_cast<std::chrono::microseconds>(now - dec_ptr->assigned_time);
     109            4 :   if (completion_time.count() < m_min_complete_time.load())
     110            2 :     m_min_complete_time.store(completion_time.count());
     111            4 :   if (completion_time.count() > m_max_complete_time.load())
     112            4 :     m_max_complete_time.store(completion_time.count());
     113              : 
     114            4 :   opmon::TRCompleteInfo i;
     115            4 :   i.set_completion_time(completion_time.count());
     116            4 :   i.set_tr_number( dec_ptr->decision.trigger_number );
     117            4 :   i.set_run_number( dec_ptr->decision.run_number );
     118            4 :   i.set_trigger_type( dec_ptr->decision.trigger_type );
     119            4 :   publish( std::move(i), {}, opmonlib::to_level(opmonlib::EntryOpMonLevel::kEventDriven) );
     120              :   
     121            4 :   return dec_ptr;
     122            6 : }
     123              : 
     124              : std::list<std::shared_ptr<AssignedTriggerDecision>>
     125            3 : TriggerRecordBuilderData::flush()
     126              : {
     127              : 
     128            3 :   auto lk = std::lock_guard<std::mutex>(m_assigned_trigger_decisions_mutex);
     129            3 :   std::list<std::shared_ptr<AssignedTriggerDecision>> ret;
     130              : 
     131            4 :   for (const auto& td : m_assigned_trigger_decisions) {
     132            1 :     ret.push_back(td);
     133              :   }
     134            3 :   m_assigned_trigger_decisions.clear();
     135              : 
     136            3 :   auto stat_lock = std::lock_guard<std::mutex>(m_latency_info_mutex);
     137            3 :   m_latency_info.clear();
     138            3 :   m_is_busy = false;
     139              : 
     140            3 :   m_in_error = false;
     141            3 :   m_metadata = nlohmann::json();
     142              : 
     143            3 :   return ret;
     144            3 : }
     145              : 
     146              : std::shared_ptr<AssignedTriggerDecision>
     147            9 : TriggerRecordBuilderData::make_assignment(dfmessages::TriggerDecision decision)
     148              : {
     149            9 :   return std::make_shared<AssignedTriggerDecision>(decision, m_connection_name);
     150              : }
     151              : 
     152              : void
     153           10 : TriggerRecordBuilderData::add_assignment(std::shared_ptr<AssignedTriggerDecision> assignment)
     154              : {
     155           10 :   auto lk = std::lock_guard<std::mutex>(m_assigned_trigger_decisions_mutex);
     156              : 
     157           10 :   if (is_in_error())
     158            1 :     throw NoSlotsAvailable(ERS_HERE, assignment->decision.trigger_number, m_connection_name);
     159              : 
     160            9 :   m_assigned_trigger_decisions.push_back(assignment);
     161            9 :   TLOG_DEBUG(13) << "Size of assigned_trigger_decision list is " << m_assigned_trigger_decisions.size();
     162              : 
     163            9 :   if (m_assigned_trigger_decisions.size() >= m_busy_threshold.load()) {
     164            4 :     m_is_busy.store(true);
     165              :   }
     166           10 : }
     167              : 
     168              : void
     169            4 : TriggerRecordBuilderData::generate_opmon_data() 
     170              : {
     171            4 :   metric_t info;
     172            4 :   info.set_min_time_since_assignment( std::numeric_limits<time_counter_t>::max() );
     173            4 :   info.set_max_time_since_assignment(0);
     174              : 
     175            4 :   time_counter_t time = 0;
     176              : 
     177            4 :   auto lk = std::unique_lock<std::mutex>(m_assigned_trigger_decisions_mutex);
     178            4 :   info.set_outstanding_decisions(m_assigned_trigger_decisions.size());
     179            4 :   auto current_time = std::chrono::steady_clock::now();
     180            6 :   for (const auto& dec_ptr : m_assigned_trigger_decisions) {
     181            2 :     auto us_since_assignment =
     182            2 :       std::chrono::duration_cast<std::chrono::microseconds>(current_time - dec_ptr->assigned_time);
     183            2 :     time += us_since_assignment.count();
     184            2 :     if (us_since_assignment.count() < info.min_time_since_assignment())
     185            2 :       info.set_min_time_since_assignment(us_since_assignment.count());
     186            2 :     if (us_since_assignment.count() > info.max_time_since_assignment())
     187            1 :       info.set_max_time_since_assignment(us_since_assignment.count());
     188              :   }
     189            4 :   lk.unlock();
     190              :   
     191            4 :   info.set_total_time_since_assignment(time);
     192              : 
     193              :   // estimate of the capcity
     194            4 :   auto completed_trigger_records = m_complete_counter.exchange(0);
     195            4 :   if ( completed_trigger_records > 0 ) {
     196            1 :     m_last_average_time = 1e-6*0.5*(m_min_complete_time.exchange(0) + m_max_complete_time.exchange(0)); // in seconds     
     197              :   }
     198              : 
     199            4 :   if ( m_last_average_time > 0. ) {
     200              :     // prediction rate metrics
     201            1 :     info.set_capacity_rate( 0.5*(m_busy_threshold.load()+m_free_threshold.load())/m_last_average_time );
     202              :   }
     203              :   
     204            4 :   publish(std::move(info));
     205              :   
     206            4 : }
     207              : 
     208              : std::chrono::microseconds
     209            1 : TriggerRecordBuilderData::average_latency(std::chrono::steady_clock::time_point since) const
     210              : {
     211            1 :   auto lk = std::lock_guard<std::mutex>(m_latency_info_mutex);
     212            1 :   std::chrono::microseconds sum = std::chrono::microseconds(0);
     213            1 :   size_t count = 0;
     214            2 :   for (auto it = m_latency_info.rbegin(); it != m_latency_info.rend(); ++it) {
     215            1 :     if (it->first < since)
     216              :       break;
     217              : 
     218            1 :     count++;
     219            1 :     sum += it->second;
     220              :   }
     221              : 
     222            1 :   return sum / count;
     223            1 : }
     224              : 
     225              : } // namespace dfmodules
     226              : } // namespace dunedaq
        

Generated by: LCOV version 2.0-1