Line data Source code
1 : /**
2 : * @file TriggerRecordBuilderData.cpp TriggerRecordBuilderData Class Implementation
3 : *
4 : * The TriggerRecordBuilderData class represents the current state of a dataflow application's Trigger Record buffers
5 : * for use by the DFO.
6 : *
7 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
8 : * Licensing/copyright details are in the COPYING file that you should have
9 : * received with this code.
10 : */
11 :
12 : #include "dfmodules/TriggerRecordBuilderData.hpp"
13 : #include "dfmodules/opmon/TRBuilderData.pb.h"
14 :
15 : #include "logging/Logging.hpp"
16 :
17 : #include <limits>
18 : #include <memory>
19 : #include <string>
20 : #include <utility>
21 :
22 : /**
23 : * @brief Name used by TRACE TLOG calls from this source file
24 : */
25 : #define TRACE_NAME "TRBData" // NOLINT
26 :
27 : namespace dunedaq {
28 : namespace dfmodules {
29 :
30 3 : TriggerRecordBuilderData::TriggerRecordBuilderData(std::string connection_name, size_t busy_threshold)
31 3 : : m_busy_threshold(busy_threshold)
32 3 : , m_free_threshold(busy_threshold)
33 3 : , m_is_busy(false)
34 3 : , m_in_error(false)
35 9 : , m_connection_name(connection_name)
36 3 : {}
37 :
38 3 : TriggerRecordBuilderData::TriggerRecordBuilderData(std::string connection_name,
39 : size_t busy_threshold,
40 3 : size_t free_threshold)
41 3 : : m_busy_threshold(busy_threshold)
42 3 : , m_free_threshold(busy_threshold)
43 3 : , m_is_busy(false)
44 3 : , m_in_error(false)
45 9 : , m_connection_name(connection_name)
46 : {
47 3 : if (busy_threshold < free_threshold)
48 1 : throw dfmodules::DFOThresholdsNotConsistent(ERS_HERE, busy_threshold, free_threshold);
49 7 : }
50 :
51 : std::shared_ptr<AssignedTriggerDecision>
52 8 : TriggerRecordBuilderData::extract_assignment(daqdataformats::trigger_number_t trigger_number)
53 : {
54 8 : std::shared_ptr<AssignedTriggerDecision> dec_ptr;
55 8 : auto lk = std::lock_guard<std::mutex>(m_assigned_trigger_decisions_mutex);
56 10 : for (auto it = m_assigned_trigger_decisions.begin(); it != m_assigned_trigger_decisions.end(); ++it) {
57 7 : if ((*it)->decision.trigger_number == trigger_number) {
58 5 : dec_ptr = *it;
59 5 : m_assigned_trigger_decisions.erase(it);
60 5 : break;
61 : }
62 : }
63 :
64 8 : if (m_assigned_trigger_decisions.size() < m_free_threshold.load())
65 6 : m_is_busy.store(false);
66 :
67 8 : return dec_ptr;
68 8 : }
69 :
70 : std::shared_ptr<AssignedTriggerDecision>
71 2 : TriggerRecordBuilderData::get_assignment(daqdataformats::trigger_number_t trigger_number) const
72 : {
73 2 : auto lk = std::lock_guard<std::mutex>(m_assigned_trigger_decisions_mutex);
74 2 : for (auto ptr : m_assigned_trigger_decisions) {
75 1 : if (ptr->decision.trigger_number == trigger_number) {
76 1 : return ptr;
77 : }
78 0 : }
79 :
80 1 : return nullptr;
81 2 : }
82 :
83 : std::shared_ptr<AssignedTriggerDecision>
84 6 : TriggerRecordBuilderData::complete_assignment(daqdataformats::trigger_number_t trigger_number,
85 : std::function<void(nlohmann::json&)> metadata_fun)
86 : {
87 :
88 6 : auto dec_ptr = extract_assignment(trigger_number);
89 :
90 6 : if (dec_ptr == nullptr)
91 2 : throw AssignedTriggerDecisionNotFound(ERS_HERE, trigger_number, m_connection_name);
92 :
93 4 : auto now = std::chrono::steady_clock::now();
94 4 : auto time = std::chrono::duration_cast<std::chrono::microseconds>(now - dec_ptr->assigned_time);
95 4 : {
96 4 : auto lk = std::lock_guard<std::mutex>(m_latency_info_mutex);
97 4 : m_latency_info.emplace_back(now, time);
98 :
99 4 : if (m_latency_info.size() > 1000)
100 0 : m_latency_info.pop_front();
101 4 : }
102 :
103 4 : if (metadata_fun)
104 1 : metadata_fun(m_metadata);
105 :
106 4 : ++m_complete_counter;
107 4 : auto completion_time =
108 4 : std::chrono::duration_cast<std::chrono::microseconds>(now - dec_ptr->assigned_time);
109 4 : if (completion_time.count() < m_min_complete_time.load())
110 2 : m_min_complete_time.store(completion_time.count());
111 4 : if (completion_time.count() > m_max_complete_time.load())
112 4 : m_max_complete_time.store(completion_time.count());
113 :
114 4 : opmon::TRCompleteInfo i;
115 4 : i.set_completion_time(completion_time.count());
116 4 : i.set_tr_number( dec_ptr->decision.trigger_number );
117 4 : i.set_run_number( dec_ptr->decision.run_number );
118 4 : i.set_trigger_type( dec_ptr->decision.trigger_type );
119 4 : publish( std::move(i), {}, opmonlib::to_level(opmonlib::EntryOpMonLevel::kEventDriven) );
120 :
121 4 : return dec_ptr;
122 6 : }
123 :
124 : std::list<std::shared_ptr<AssignedTriggerDecision>>
125 3 : TriggerRecordBuilderData::flush()
126 : {
127 :
128 3 : auto lk = std::lock_guard<std::mutex>(m_assigned_trigger_decisions_mutex);
129 3 : std::list<std::shared_ptr<AssignedTriggerDecision>> ret;
130 :
131 4 : for (const auto& td : m_assigned_trigger_decisions) {
132 1 : ret.push_back(td);
133 : }
134 3 : m_assigned_trigger_decisions.clear();
135 :
136 3 : auto stat_lock = std::lock_guard<std::mutex>(m_latency_info_mutex);
137 3 : m_latency_info.clear();
138 3 : m_is_busy = false;
139 :
140 3 : m_in_error = false;
141 3 : m_metadata = nlohmann::json();
142 :
143 3 : return ret;
144 3 : }
145 :
146 : std::shared_ptr<AssignedTriggerDecision>
147 9 : TriggerRecordBuilderData::make_assignment(dfmessages::TriggerDecision decision)
148 : {
149 9 : return std::make_shared<AssignedTriggerDecision>(decision, m_connection_name);
150 : }
151 :
152 : void
153 10 : TriggerRecordBuilderData::add_assignment(std::shared_ptr<AssignedTriggerDecision> assignment)
154 : {
155 10 : auto lk = std::lock_guard<std::mutex>(m_assigned_trigger_decisions_mutex);
156 :
157 10 : if (is_in_error())
158 1 : throw NoSlotsAvailable(ERS_HERE, assignment->decision.trigger_number, m_connection_name);
159 :
160 9 : m_assigned_trigger_decisions.push_back(assignment);
161 9 : TLOG_DEBUG(13) << "Size of assigned_trigger_decision list is " << m_assigned_trigger_decisions.size();
162 :
163 9 : if (m_assigned_trigger_decisions.size() >= m_busy_threshold.load()) {
164 4 : m_is_busy.store(true);
165 : }
166 10 : }
167 :
168 : void
169 4 : TriggerRecordBuilderData::generate_opmon_data()
170 : {
171 4 : metric_t info;
172 4 : info.set_min_time_since_assignment( std::numeric_limits<time_counter_t>::max() );
173 4 : info.set_max_time_since_assignment(0);
174 :
175 4 : time_counter_t time = 0;
176 :
177 4 : auto lk = std::unique_lock<std::mutex>(m_assigned_trigger_decisions_mutex);
178 4 : info.set_outstanding_decisions(m_assigned_trigger_decisions.size());
179 4 : auto current_time = std::chrono::steady_clock::now();
180 6 : for (const auto& dec_ptr : m_assigned_trigger_decisions) {
181 2 : auto us_since_assignment =
182 2 : std::chrono::duration_cast<std::chrono::microseconds>(current_time - dec_ptr->assigned_time);
183 2 : time += us_since_assignment.count();
184 2 : if (us_since_assignment.count() < info.min_time_since_assignment())
185 2 : info.set_min_time_since_assignment(us_since_assignment.count());
186 2 : if (us_since_assignment.count() > info.max_time_since_assignment())
187 1 : info.set_max_time_since_assignment(us_since_assignment.count());
188 : }
189 4 : lk.unlock();
190 :
191 4 : info.set_total_time_since_assignment(time);
192 :
193 : // estimate of the capcity
194 4 : auto completed_trigger_records = m_complete_counter.exchange(0);
195 4 : if ( completed_trigger_records > 0 ) {
196 1 : m_last_average_time = 1e-6*0.5*(m_min_complete_time.exchange(0) + m_max_complete_time.exchange(0)); // in seconds
197 : }
198 :
199 4 : if ( m_last_average_time > 0. ) {
200 : // prediction rate metrics
201 1 : info.set_capacity_rate( 0.5*(m_busy_threshold.load()+m_free_threshold.load())/m_last_average_time );
202 : }
203 :
204 4 : publish(std::move(info));
205 :
206 4 : }
207 :
208 : std::chrono::microseconds
209 1 : TriggerRecordBuilderData::average_latency(std::chrono::steady_clock::time_point since) const
210 : {
211 1 : auto lk = std::lock_guard<std::mutex>(m_latency_info_mutex);
212 1 : std::chrono::microseconds sum = std::chrono::microseconds(0);
213 1 : size_t count = 0;
214 2 : for (auto it = m_latency_info.rbegin(); it != m_latency_info.rend(); ++it) {
215 1 : if (it->first < since)
216 : break;
217 :
218 1 : count++;
219 1 : sum += it->second;
220 : }
221 :
222 1 : return sum / count;
223 1 : }
224 :
225 : } // namespace dfmodules
226 : } // namespace dunedaq
|