DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
TriggerRecordBuilderData.cpp
Go to the documentation of this file.
1
14
15#include "logging/Logging.hpp"
16
17#include <limits>
18#include <memory>
19#include <string>
20#include <utility>
21
25#define TRACE_NAME "TRBData" // NOLINT
26
27namespace dunedaq {
28namespace dfmodules {
29
30TriggerRecordBuilderData::TriggerRecordBuilderData(std::string connection_name, size_t busy_threshold)
31 : m_busy_threshold(busy_threshold)
32 , m_free_threshold(busy_threshold)
33 , m_is_busy(false)
34 , m_in_error(false)
35 , m_connection_name(connection_name)
36{}
37
38TriggerRecordBuilderData::TriggerRecordBuilderData(std::string connection_name,
39 size_t busy_threshold,
40 size_t free_threshold)
41 : m_busy_threshold(busy_threshold)
42 , m_free_threshold(busy_threshold)
43 , m_is_busy(false)
44 , m_in_error(false)
45 , m_connection_name(connection_name)
46{
47 if (busy_threshold < free_threshold)
48 throw dfmodules::DFOThresholdsNotConsistent(ERS_HERE, busy_threshold, free_threshold);
49}
50
51std::shared_ptr<AssignedTriggerDecision>
52TriggerRecordBuilderData::extract_assignment(daqdataformats::trigger_number_t trigger_number)
53{
54 std::shared_ptr<AssignedTriggerDecision> dec_ptr;
55 auto lk = std::lock_guard<std::mutex>(m_assigned_trigger_decisions_mutex);
56 for (auto it = m_assigned_trigger_decisions.begin(); it != m_assigned_trigger_decisions.end(); ++it) {
57 if ((*it)->decision.trigger_number == trigger_number) {
58 dec_ptr = *it;
59 m_assigned_trigger_decisions.erase(it);
60 break;
61 }
62 }
63
64 if (m_assigned_trigger_decisions.size() < m_free_threshold.load())
65 m_is_busy.store(false);
66
67 return dec_ptr;
68}
69
70std::shared_ptr<AssignedTriggerDecision>
71TriggerRecordBuilderData::get_assignment(daqdataformats::trigger_number_t trigger_number) const
72{
73 auto lk = std::lock_guard<std::mutex>(m_assigned_trigger_decisions_mutex);
74 for (auto ptr : m_assigned_trigger_decisions) {
75 if (ptr->decision.trigger_number == trigger_number) {
76 return ptr;
77 }
78 }
79
80 return nullptr;
81}
82
83std::shared_ptr<AssignedTriggerDecision>
84TriggerRecordBuilderData::complete_assignment(daqdataformats::trigger_number_t trigger_number,
85 std::function<void(nlohmann::json&)> metadata_fun)
86{
87
88 auto dec_ptr = extract_assignment(trigger_number);
89
90 if (dec_ptr == nullptr)
91 throw AssignedTriggerDecisionNotFound(ERS_HERE, trigger_number, m_connection_name);
92
93 auto now = std::chrono::steady_clock::now();
94 auto time = std::chrono::duration_cast<std::chrono::microseconds>(now - dec_ptr->assigned_time);
95 {
96 auto lk = std::lock_guard<std::mutex>(m_latency_info_mutex);
97 m_latency_info.emplace_back(now, time);
98
99 if (m_latency_info.size() > 1000)
100 m_latency_info.pop_front();
101 }
102
103 if (metadata_fun)
104 metadata_fun(m_metadata);
105
106 ++m_complete_counter;
107 auto completion_time =
108 std::chrono::duration_cast<std::chrono::microseconds>(now - dec_ptr->assigned_time);
109 if (completion_time.count() < m_min_complete_time.load())
110 m_min_complete_time.store(completion_time.count());
111 if (completion_time.count() > m_max_complete_time.load())
112 m_max_complete_time.store(completion_time.count());
113
114 opmon::TRCompleteInfo i;
115 i.set_completion_time(completion_time.count());
116 i.set_tr_number( dec_ptr->decision.trigger_number );
117 i.set_run_number( dec_ptr->decision.run_number );
118 i.set_trigger_type( dec_ptr->decision.trigger_type );
120
121 return dec_ptr;
122}
123
124std::list<std::shared_ptr<AssignedTriggerDecision>>
125TriggerRecordBuilderData::flush()
126{
127
128 auto lk = std::lock_guard<std::mutex>(m_assigned_trigger_decisions_mutex);
129 std::list<std::shared_ptr<AssignedTriggerDecision>> ret;
130
131 for (const auto& td : m_assigned_trigger_decisions) {
132 ret.push_back(td);
133 }
134 m_assigned_trigger_decisions.clear();
135
136 auto stat_lock = std::lock_guard<std::mutex>(m_latency_info_mutex);
137 m_latency_info.clear();
138 m_is_busy = false;
139
140 m_in_error = false;
141 m_metadata = nlohmann::json();
142
143 return ret;
144}
145
146std::shared_ptr<AssignedTriggerDecision>
147TriggerRecordBuilderData::make_assignment(dfmessages::TriggerDecision decision)
148{
149 return std::make_shared<AssignedTriggerDecision>(decision, m_connection_name);
150}
151
152void
153TriggerRecordBuilderData::add_assignment(std::shared_ptr<AssignedTriggerDecision> assignment)
154{
155 auto lk = std::lock_guard<std::mutex>(m_assigned_trigger_decisions_mutex);
156
157 if (is_in_error())
158 throw NoSlotsAvailable(ERS_HERE, assignment->decision.trigger_number, m_connection_name);
159
160 m_assigned_trigger_decisions.push_back(assignment);
161 TLOG_DEBUG(13) << "Size of assigned_trigger_decision list is " << m_assigned_trigger_decisions.size();
162
163 if (m_assigned_trigger_decisions.size() >= m_busy_threshold.load()) {
164 m_is_busy.store(true);
165 }
166}
167
168void
169TriggerRecordBuilderData::generate_opmon_data()
170{
171 metric_t info;
172 info.set_min_time_since_assignment( std::numeric_limits<time_counter_t>::max() );
173 info.set_max_time_since_assignment(0);
174
175 time_counter_t time = 0;
176
177 auto lk = std::unique_lock<std::mutex>(m_assigned_trigger_decisions_mutex);
178 info.set_outstanding_decisions(m_assigned_trigger_decisions.size());
179 auto current_time = std::chrono::steady_clock::now();
180 for (const auto& dec_ptr : m_assigned_trigger_decisions) {
181 auto us_since_assignment =
182 std::chrono::duration_cast<std::chrono::microseconds>(current_time - dec_ptr->assigned_time);
183 time += us_since_assignment.count();
184 if (us_since_assignment.count() < info.min_time_since_assignment())
185 info.set_min_time_since_assignment(us_since_assignment.count());
186 if (us_since_assignment.count() > info.max_time_since_assignment())
187 info.set_max_time_since_assignment(us_since_assignment.count());
188 }
189 lk.unlock();
190
191 info.set_total_time_since_assignment(time);
192
193 // estimate of the capcity
194 auto completed_trigger_records = m_complete_counter.exchange(0);
195 if ( completed_trigger_records > 0 ) {
196 m_last_average_time = 1e-6*0.5*(m_min_complete_time.exchange(0) + m_max_complete_time.exchange(0)); // in seconds
197 }
198
199 if ( m_last_average_time > 0. ) {
200 // prediction rate metrics
201 info.set_capacity_rate( 0.5*(m_busy_threshold.load()+m_free_threshold.load())/m_last_average_time );
202 }
203
204 publish(std::move(info));
205
206}
207
208std::chrono::microseconds
209TriggerRecordBuilderData::average_latency(std::chrono::steady_clock::time_point since) const
210{
211 auto lk = std::lock_guard<std::mutex>(m_latency_info_mutex);
212 std::chrono::microseconds sum = std::chrono::microseconds(0);
213 size_t count = 0;
214 for (auto it = m_latency_info.rbegin(); it != m_latency_info.rend(); ++it) {
215 if (it->first < since)
216 break;
217
218 count++;
219 sum += it->second;
220 }
221
222 return sum / count;
223}
224
225} // namespace dfmodules
226} // namespace dunedaq
#define ERS_HERE
static int64_t now()
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
uint64_t trigger_number_t
Type used to represent trigger number.
Definition Types.hpp:24
constexpr auto to_level(T v)
Including Qt Headers.