Line data Source code
1 : /**
2 : * @file FragmentAggregatorModule.cpp FragmentAggregatorModule implementation
3 : *
4 : * This is part of the DUNE DAQ, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "FragmentAggregatorModule.hpp"
10 : #include "dfmodules/CommonIssues.hpp"
11 : #include "dfmodules/opmon/FragmentAggregatorModule.pb.h"
12 :
13 : #include "appmodel/FragmentAggregatorModule.hpp"
14 : #include "appmodel/FragmentAggregatorConf.hpp"
15 : #include "confmodel/Connection.hpp"
16 : #include "confmodel/QueueWithSourceId.hpp"
17 : #include "daqdataformats/FragmentHeader.hpp"
18 : #include "dfmessages/Fragment_serialization.hpp"
19 : #include "logging/Logging.hpp"
20 :
21 : #include "iomanager/IOManager.hpp"
22 :
23 : #include <iostream>
24 : #include <string>
25 :
26 : namespace dunedaq {
27 : namespace dfmodules {
28 :
29 0 : FragmentAggregatorModule::FragmentAggregatorModule(const std::string& name)
30 : : DAQModule(name)
31 0 : , m_fragment_send_timeout(1000)
32 : {
33 0 : register_command("start", &FragmentAggregatorModule::do_start);
34 0 : register_command("stop_trigger_sources", &FragmentAggregatorModule::do_stop);
35 0 : }
36 :
37 : void
38 0 : FragmentAggregatorModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
39 : {
40 0 : auto mdal = mcfg->get_dal<appmodel::FragmentAggregatorModule>(get_name());
41 0 : if (!mdal) {
42 0 : throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve configuration object");
43 : }
44 :
45 0 : auto inputs = mdal->get_inputs();
46 0 : for (auto con : mdal->get_inputs()) {
47 0 : if (con->get_data_type() == datatype_to_string<dfmessages::DataRequest>()) {
48 0 : m_data_req_input = con->UID();
49 : }
50 0 : if (con->get_data_type() == datatype_to_string<daqdataformats::Fragment>()) {
51 0 : m_fragment_input = con->UID();
52 : }
53 : }
54 :
55 0 : m_producer_conn_ids.clear();
56 0 : for (const auto cr : mdal->get_outputs()) {
57 0 : if (cr->get_data_type() == datatype_to_string<dfmessages::DataRequest>()) {
58 0 : auto qid = cr->cast<confmodel::QueueWithSourceId>();
59 0 : m_producer_conn_ids[qid->get_source_id()] = cr->UID();
60 : }
61 0 : if (cr->get_data_type() == datatype_to_string<std::unique_ptr<daqdataformats::Fragment>>()) {
62 0 : m_trb_conn_ids.push_back(cr->UID());
63 : }
64 : }
65 :
66 : // this is just to get the data request receiver registered early (before Start)
67 0 : auto iom = iomanager::IOManager::get();
68 0 : iom->get_receiver<dfmessages::DataRequest>(m_data_req_input);
69 :
70 0 : m_fragment_send_timeout = std::chrono::milliseconds(mdal->get_configuration()->get_fragment_send_timeout_ms());
71 0 : }
72 :
73 : void
74 0 : FragmentAggregatorModule::generate_opmon_data()
75 : {
76 0 : if (m_data_requests_processed > 0) {
77 0 : opmon::FragmentAggregatorTimeInfo dr_times;
78 0 : dr_times.set_min_us(m_data_requests_time_min_us.exchange(std::numeric_limits<uint64_t>::max()));
79 0 : dr_times.set_max_us(m_data_requests_time_max_us.exchange(0));
80 0 : dr_times.set_average_us(m_data_requests_time_average_us.exchange(0) / m_data_requests_processed);
81 0 : this->publish(std::move(dr_times), { { "data", "DataRequest" } });
82 0 : }
83 :
84 0 : if (m_fragments_processed > 0) {
85 0 : opmon::FragmentAggregatorTimeInfo frag_times;
86 0 : frag_times.set_min_us(m_fragments_time_min_us.exchange(std::numeric_limits<uint64_t>::max()));
87 0 : frag_times.set_max_us(m_fragments_time_max_us.exchange(0));
88 0 : frag_times.set_average_us(m_fragments_time_average_us.exchange(0) / m_fragments_processed);
89 0 : this->publish(std::move(frag_times), { { "data", "Fragment" } });
90 0 : }
91 :
92 0 : opmon::FADataRequestsCounterInfo dr_info;
93 0 : dr_info.set_data_requests_received(m_data_requests_received.exchange(0));
94 0 : dr_info.set_data_requests_processed(m_data_requests_processed.exchange(0));
95 0 : dr_info.set_data_requests_failed(m_data_requests_failed.load()); // the failed counters are meant NOT to reset
96 0 : this->publish(std::move(dr_info));
97 :
98 0 : opmon::FAFragmentsCounterInfo frag_info;
99 0 : frag_info.set_fragments_received(m_fragments_received.exchange(0));
100 0 : frag_info.set_fragments_processed(m_fragments_processed.exchange(0));
101 0 : frag_info.set_fragments_failed(m_fragments_failed.load());
102 0 : frag_info.set_fragments_empty(m_fragments_empty.exchange(0));
103 0 : frag_info.set_fragments_incomplete(m_fragments_incomplete.exchange(0));
104 0 : frag_info.set_fragments_invalid(m_fragments_invalid.exchange(0));
105 0 : this->publish(std::move(frag_info));
106 0 : }
107 :
108 : void
109 0 : FragmentAggregatorModule::do_start(const CommandData_t& /* args */)
110 : {
111 :
112 0 : m_data_requests_received.store(0);
113 0 : m_data_requests_processed.store(0);
114 0 : m_data_requests_failed.store(0);
115 0 : m_fragments_received.store(0);
116 0 : m_fragments_processed.store(0);
117 0 : m_fragments_failed.store(0);
118 0 : m_fragments_empty.store(0);
119 0 : m_fragments_incomplete.store(0);
120 0 : m_fragments_invalid.store(0);
121 0 : m_fragments_time_average_us.store(0);
122 0 : m_fragments_time_min_us.store(std::numeric_limits<uint64_t>::max());
123 0 : m_fragments_time_max_us.store(0);
124 0 : m_data_requests_time_average_us.store(0);
125 0 : m_data_requests_time_min_us.store(std::numeric_limits<uint64_t>::max());
126 0 : m_data_requests_time_max_us.store(0);
127 :
128 : // 19-Dec-2024, KAB: check that Fragment senders are ready to send. This is done so
129 : // that the IOManager infrastructure fetches the necessary connection details from
130 : // the ConnectivityService at 'start' time, instead of the first time that the sender
131 : // is used to send data. This avoids delays in the sending of the first fragment in
132 : // the first data-taking run in a DAQ session. Such delays can lead to undesirable
133 : // system behavior like trigger inhibits.
134 0 : auto iom = iomanager::IOManager::get();
135 0 : for (auto trb_conn : m_trb_conn_ids) {
136 0 : auto sender = iom->get_sender<std::unique_ptr<daqdataformats::Fragment>>(trb_conn);
137 0 : if (sender != nullptr) {
138 0 : bool is_ready = sender->is_ready_for_sending(std::chrono::milliseconds(100));
139 0 : TLOG_DEBUG(0) << "The Fragment sender for " << trb_conn << " " << (is_ready ? "is" : "is not") << " ready.";
140 : }
141 0 : }
142 0 : iom->add_callback<dfmessages::DataRequest>(
143 0 : m_data_req_input, std::bind(&FragmentAggregatorModule::process_data_request, this, std::placeholders::_1));
144 0 : iom->add_callback<std::unique_ptr<daqdataformats::Fragment>>(
145 0 : m_fragment_input, std::bind(&FragmentAggregatorModule::process_fragment, this, std::placeholders::_1));
146 0 : }
147 :
148 : void
149 0 : FragmentAggregatorModule::do_stop(const CommandData_t& /* args */)
150 : {
151 0 : auto iom = iomanager::IOManager::get();
152 0 : iom->remove_callback<dfmessages::DataRequest>(m_data_req_input);
153 0 : iom->remove_callback<std::unique_ptr<daqdataformats::Fragment>>(m_fragment_input);
154 0 : m_data_req_map.clear();
155 0 : }
156 :
157 : void
158 0 : FragmentAggregatorModule::process_data_request(dfmessages::DataRequest& data_request)
159 : {
160 :
161 0 : {
162 0 : std::scoped_lock lock(m_mutex);
163 :
164 0 : m_timestamp_before_dr = get_current_time_us();
165 0 : m_data_requests_received++;
166 :
167 0 : std::tuple<dfmessages::trigger_number_t, dfmessages::sequence_number_t, daqdataformats::SourceID> triplet = {
168 0 : data_request.trigger_number, data_request.sequence_number, data_request.request_information.component
169 0 : };
170 0 : m_data_req_map[triplet] = data_request.data_destination;
171 0 : }
172 : // Forward Data Request to the right DLH
173 0 : try {
174 : // std::string component_name = "inputReqToDLH-" + data_request.request_information.component.to_string();
175 0 : auto uid_elem = m_producer_conn_ids.find(data_request.request_information.component.id);
176 0 : if (uid_elem == m_producer_conn_ids.end()) {
177 0 : ers::error(dunedaq::dfmodules::DRSenderLookupFailed(ERS_HERE,
178 : data_request.request_information.component,
179 : data_request.run_number,
180 : data_request.trigger_number,
181 0 : data_request.sequence_number));
182 : } else {
183 0 : TLOG_DEBUG(30) << "Send data request to " << uid_elem->second;
184 0 : auto sender = get_iom_sender<dfmessages::DataRequest>(uid_elem->second);
185 0 : data_request.data_destination = m_fragment_input;
186 0 : sender->send(std::move(data_request), iomanager::Sender::s_no_block);
187 :
188 0 : m_data_requests_processed++;
189 0 : auto timestamp_total = get_current_time_us() - m_timestamp_before_dr;
190 0 : if (timestamp_total < m_data_requests_time_min_us) {
191 0 : m_data_requests_time_min_us = timestamp_total;
192 : }
193 0 : if (timestamp_total > m_data_requests_time_max_us) {
194 0 : m_data_requests_time_max_us = timestamp_total;
195 : }
196 0 : m_data_requests_time_average_us += timestamp_total;
197 0 : }
198 0 : } catch (const ers::Issue& excpt) {
199 0 : ers::error(dunedaq::dfmodules::DRSenderSendFailed(ERS_HERE,
200 : data_request.run_number,
201 : data_request.trigger_number,
202 0 : data_request.sequence_number,
203 : data_request.request_information.component));
204 0 : m_data_requests_failed++;
205 0 : }
206 0 : }
207 :
208 : void
209 0 : FragmentAggregatorModule::process_fragment(std::unique_ptr<daqdataformats::Fragment>& fragment)
210 : {
211 : // Forward Fragment to the right TRB
212 0 : std::string trb_identifier;
213 0 : {
214 0 : std::scoped_lock lock(m_mutex);
215 :
216 0 : m_timestamp_before_frag = get_current_time_us();
217 0 : m_fragments_received++;
218 :
219 0 : std::bitset<32> error_bits = fragment->get_error_bits();
220 0 : if (error_bits[static_cast<size_t>(dunedaq::daqdataformats::FragmentErrorBits::kDataNotFound)])
221 0 : m_fragments_empty++;
222 0 : if (error_bits[static_cast<size_t>(dunedaq::daqdataformats::FragmentErrorBits::kIncomplete)])
223 0 : m_fragments_incomplete++;
224 0 : if (error_bits[static_cast<size_t>(dunedaq::daqdataformats::FragmentErrorBits::kInvalidWindow)])
225 0 : m_fragments_invalid++;
226 :
227 0 : auto dr_iter = m_data_req_map.find(
228 0 : std::make_tuple<dfmessages::trigger_number_t, dfmessages::sequence_number_t, daqdataformats::SourceID>(
229 0 : fragment->get_trigger_number(), fragment->get_sequence_number(), fragment->get_element_id()));
230 0 : if (dr_iter != m_data_req_map.end()) {
231 0 : trb_identifier = dr_iter->second;
232 0 : m_data_req_map.erase(dr_iter);
233 : } else {
234 0 : ers::error(UnknownFragmentDestination(
235 0 : ERS_HERE, fragment->get_trigger_number(), fragment->get_sequence_number(), fragment->get_element_id()));
236 0 : return;
237 : }
238 0 : }
239 0 : try {
240 0 : TLOG_DEBUG(27) << get_name() << " Sending fragment for trigger/sequence_number " << fragment->get_trigger_number()
241 0 : << "." << fragment->get_sequence_number() << " and SourceID " << fragment->get_element_id() << " to "
242 0 : << trb_identifier;
243 0 : auto sender = get_iom_sender<std::unique_ptr<daqdataformats::Fragment>>(trb_identifier);
244 0 : sender->send(std::move(fragment), m_fragment_send_timeout);
245 :
246 0 : m_fragments_processed++;
247 0 : auto timestamp_total = get_current_time_us() - m_timestamp_before_frag;
248 0 : if (timestamp_total < m_fragments_time_min_us) {
249 0 : m_fragments_time_min_us = timestamp_total;
250 : }
251 0 : if (timestamp_total > m_fragments_time_max_us) {
252 0 : m_fragments_time_max_us = timestamp_total;
253 : }
254 0 : m_fragments_time_average_us += timestamp_total;
255 :
256 0 : } catch (const ers::Issue& excpt) {
257 0 : ers::error(AbandonedFragment(ERS_HERE,
258 : fragment->get_run_number(),
259 : fragment->get_trigger_number(),
260 0 : fragment->get_sequence_number(),
261 : fragment->get_element_id(),
262 : excpt));
263 0 : m_fragments_failed++;
264 0 : }
265 0 : }
266 :
267 : uint64_t
268 0 : FragmentAggregatorModule::get_current_time_us()
269 : {
270 0 : return std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch())
271 0 : .count();
272 : }
273 :
274 : } // namespace dfmodules
275 : } // namespace dunedaq
276 :
// Module registration macro for FragmentAggregatorModule. Presumably this exports the factory
// hook the DAQ application framework uses to instantiate the module by class name — see the
// macro's definition in appfwk to confirm.
277 0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::dfmodules::FragmentAggregatorModule)
|