25#define TRACE_NAME "TRBData"
30TriggerRecordBuilderData::TriggerRecordBuilderData(std::string connection_name,
size_t busy_threshold)
31 : m_busy_threshold(busy_threshold)
32 , m_free_threshold(busy_threshold)
35 , m_connection_name(connection_name)
38TriggerRecordBuilderData::TriggerRecordBuilderData(std::string connection_name,
39 size_t busy_threshold,
40 size_t free_threshold)
41 : m_busy_threshold(busy_threshold)
42 , m_free_threshold(busy_threshold)
45 , m_connection_name(connection_name)
47 if (busy_threshold < free_threshold)
48 throw dfmodules::DFOThresholdsNotConsistent(
ERS_HERE, busy_threshold, free_threshold);
51std::shared_ptr<AssignedTriggerDecision>
54 std::shared_ptr<AssignedTriggerDecision> dec_ptr;
55 auto lk = std::lock_guard<std::mutex>(m_assigned_trigger_decisions_mutex);
56 for (
auto it = m_assigned_trigger_decisions.begin(); it != m_assigned_trigger_decisions.end(); ++it) {
57 if ((*it)->decision.trigger_number == trigger_number) {
59 m_assigned_trigger_decisions.erase(it);
64 if (m_assigned_trigger_decisions.size() < m_free_threshold.load())
65 m_is_busy.store(
false);
70std::shared_ptr<AssignedTriggerDecision>
73 auto lk = std::lock_guard<std::mutex>(m_assigned_trigger_decisions_mutex);
74 for (
auto ptr : m_assigned_trigger_decisions) {
75 if (ptr->decision.trigger_number == trigger_number) {
83std::shared_ptr<AssignedTriggerDecision>
85 std::function<
void(nlohmann::json&)> metadata_fun)
88 auto dec_ptr = extract_assignment(trigger_number);
90 if (dec_ptr ==
nullptr)
93 auto now = std::chrono::steady_clock::now();
94 auto time = std::chrono::duration_cast<std::chrono::microseconds>(
now - dec_ptr->assigned_time);
96 auto lk = std::lock_guard<std::mutex>(m_latency_info_mutex);
97 m_latency_info.emplace_back(
now, time);
99 if (m_latency_info.size() > 1000)
100 m_latency_info.pop_front();
104 metadata_fun(m_metadata);
106 ++m_complete_counter;
107 auto completion_time =
108 std::chrono::duration_cast<std::chrono::microseconds>(
now - dec_ptr->assigned_time);
109 if (completion_time.count() < m_min_complete_time.load())
110 m_min_complete_time.store(completion_time.count());
111 if (completion_time.count() > m_max_complete_time.load())
112 m_max_complete_time.store(completion_time.count());
114 opmon::TRCompleteInfo i;
115 i.set_completion_time(completion_time.count());
116 i.set_tr_number( dec_ptr->decision.trigger_number );
117 i.set_run_number( dec_ptr->decision.run_number );
118 i.set_trigger_type( dec_ptr->decision.trigger_type );
124std::list<std::shared_ptr<AssignedTriggerDecision>>
125TriggerRecordBuilderData::flush()
128 auto lk = std::lock_guard<std::mutex>(m_assigned_trigger_decisions_mutex);
129 std::list<std::shared_ptr<AssignedTriggerDecision>> ret;
131 for (
const auto& td : m_assigned_trigger_decisions) {
134 m_assigned_trigger_decisions.clear();
136 auto stat_lock = std::lock_guard<std::mutex>(m_latency_info_mutex);
137 m_latency_info.clear();
141 m_metadata = nlohmann::json();
146std::shared_ptr<AssignedTriggerDecision>
147TriggerRecordBuilderData::make_assignment(dfmessages::TriggerDecision decision)
149 return std::make_shared<AssignedTriggerDecision>(decision, m_connection_name);
153TriggerRecordBuilderData::add_assignment(std::shared_ptr<AssignedTriggerDecision> assignment)
155 auto lk = std::lock_guard<std::mutex>(m_assigned_trigger_decisions_mutex);
158 throw NoSlotsAvailable(
ERS_HERE, assignment->decision.trigger_number, m_connection_name);
160 m_assigned_trigger_decisions.push_back(assignment);
161 TLOG_DEBUG(13) <<
"Size of assigned_trigger_decision list is " << m_assigned_trigger_decisions.size();
163 if (m_assigned_trigger_decisions.size() >= m_busy_threshold.load()) {
164 m_is_busy.store(
true);
169TriggerRecordBuilderData::generate_opmon_data()
172 info.set_min_time_since_assignment( std::numeric_limits<time_counter_t>::max() );
173 info.set_max_time_since_assignment(0);
175 time_counter_t time = 0;
177 auto lk = std::unique_lock<std::mutex>(m_assigned_trigger_decisions_mutex);
178 info.set_outstanding_decisions(m_assigned_trigger_decisions.size());
179 auto current_time = std::chrono::steady_clock::now();
180 for (
const auto& dec_ptr : m_assigned_trigger_decisions) {
181 auto us_since_assignment =
182 std::chrono::duration_cast<std::chrono::microseconds>(current_time - dec_ptr->assigned_time);
183 time += us_since_assignment.count();
184 if (us_since_assignment.count() < info.min_time_since_assignment())
185 info.set_min_time_since_assignment(us_since_assignment.count());
186 if (us_since_assignment.count() > info.max_time_since_assignment())
187 info.set_max_time_since_assignment(us_since_assignment.count());
191 info.set_total_time_since_assignment(time);
194 auto completed_trigger_records = m_complete_counter.exchange(0);
195 if ( completed_trigger_records > 0 ) {
196 m_last_average_time = 1e-6*0.5*(m_min_complete_time.exchange(0) + m_max_complete_time.exchange(0));
199 if ( m_last_average_time > 0. ) {
201 info.set_capacity_rate( 0.5*(m_busy_threshold.load()+m_free_threshold.load())/m_last_average_time );
204 publish(std::move(info));
208std::chrono::microseconds
209TriggerRecordBuilderData::average_latency(std::chrono::steady_clock::time_point since)
const
211 auto lk = std::lock_guard<std::mutex>(m_latency_info_mutex);
212 std::chrono::microseconds sum = std::chrono::microseconds(0);
214 for (
auto it = m_latency_info.rbegin(); it != m_latency_info.rend(); ++it) {
215 if (it->first < since)
#define TLOG_DEBUG(lvl,...)
constexpr auto to_level(T v)
AssignedTriggerDecisionNotFound