Line data Source code
1 : /**
2 : * @file TPReplayModule.cpp
3 : *
4 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "TPReplayModule.hpp"
10 :
11 : #include "trigger/Issues.hpp" // For TLVL_*
12 : #include "trigger/TriggerPrimitiveTypeAdapter.hpp"
13 :
14 : #include "iomanager/IOManager.hpp"
15 : #include "logging/Logging.hpp"
16 : #include "rcif/cmd/Nljs.hpp"
17 :
18 : #include <algorithm>
19 : #include <chrono>
20 : #include <fstream>
21 : #include <limits>
22 : #include <memory>
23 : #include <string>
24 : #include <thread>
25 : #include <vector>
26 :
27 : using namespace triggeralgs;
28 :
29 : namespace dunedaq::trigger {
30 :
31 0 : TPReplayModule::TPReplayModule(const std::string& name)
32 : : DAQModule(name)
33 0 : , m_queue_timeout(100)
34 : {
35 : // clang-format off
36 0 : register_command("conf", &TPReplayModule::do_configure);
37 0 : register_command("start", &TPReplayModule::do_start);
38 0 : register_command("stop_trigger_sources", &TPReplayModule::do_stop);
39 0 : register_command("scrap", &TPReplayModule::do_scrap);
40 : // clang-format on
41 0 : }
42 :
43 : void
44 0 : TPReplayModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
45 : {
46 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
47 :
48 : // ### Access configuration
49 0 : m_mtrg = mcfg->get_dal<appmodel::TPReplayModule>(get_name());
50 :
51 : // ### Extract relevant objects
52 : // Clock speed
53 0 : m_clocks_per_us = mcfg->get_session()->get_detector_configuration()->get_clock_speed_hz() /
54 : double(1'000'000.0); // this is redundant but safer...
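// For example (assuming the 62.5 MHz DUNE timing clock): 62'500'000 Hz / 1e6 = 62.5 ticks per microsecond,
// which do_work() later uses to convert TP timestamp gaps into wall-clock waits.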
55 :
56 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
57 0 : }
58 :
59 : void
60 0 : TPReplayModule::do_configure(const CommandData_t& /*obj*/)
61 : {
62 :
63 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering conf() method";
64 :
65 0 : m_conf = m_mtrg->get_configuration();
66 0 : if (!m_conf) {
67 0 : throw ReplayConfigurationProblem(ERS_HERE, get_name(), "Missing configuration!");
68 : }
69 :
70 : // Channel map
71 0 : m_channel_map_name = m_conf->get_channel_map();
72 0 : if (m_channel_map_name.empty()) {
73 0 : throw ReplayConfigurationProblem(ERS_HERE, get_name(), "No Channel map provided!");
74 : }
75 :
76 : // Valid Subdetectors (for now)
77 0 : m_validSubdetectors = { detdataformats::DetID::Subdetector::kHD_TPC,
78 : detdataformats::DetID::Subdetector::kVD_BottomTPC,
79 : detdataformats::DetID::Subdetector::kVD_TopTPC,
80 0 : detdataformats::DetID::Subdetector::kNDLAr_TPC };
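// Fragments whose (first) TP detid is not in this set are silently skipped in read_tps().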
81 :
82 0 : TLOG() << "### REPLAY CONFIGURATION ###";
83 0 : TLOG() << "Will use channel map: " << m_channel_map_name;
84 0 : try {
85 0 : m_channel_map = dunedaq::detchannelmaps::make_tpc_map(m_channel_map_name);
86 0 : } catch (const detchannelmaps::ChannelMapCreationFailed& e) {
87 0 : ers::error(dunedaq::trigger::ReplayChannelMapProblem(ERS_HERE, get_name(), m_channel_map_name));
88 0 : }
89 :
90 : // Loops
91 0 : m_loops = m_conf->get_number_of_loops();
92 :
93 : // Plane filtering
94 0 : m_filter_planes_ids = std::set<int>(m_conf->get_filter_out_plane().begin(), m_conf->get_filter_out_plane().end());
94 0 : m_filter_planes = !m_filter_planes_ids.empty();
96 :
97 0 : TLOG() << "Plane filtering: " << m_filter_planes;
98 0 : if (m_filter_planes) {
99 0 : TLOG() << "Planes to filter: ";
100 0 : for (auto plane : m_filter_planes_ids) {
101 0 : TLOG() << plane;
102 : }
103 : }
104 :
105 : // For each of the files specified in the config, we extract the TP data and sort it.
106 : // Data is grouped by ROU -> plane (unless the plane is filtered out). Multiple files can contribute to each.
107 : // Then we create an outgoing sink for each unique ROU + plane combination.
108 : // We also keep track of the total timestamp range of all the streams, so we can keep
109 : // the timestamps of the multiple streams in sync when replaying,
110 : // even when they don't all start or end at the same time.
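// Illustrative shape of the sorted data (the ROU name and plane index are examples only):
//   m_all_tp_data["APA_P02SU"][2] = deque{ {TP, TP, ...},    // earliest TP vector for that ROU/plane
//                                          {TP, TP, ...} };  // later vectors, ordered by first time_start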
111 :
112 : // Output queues
113 0 : auto con = m_mtrg->get_outputs();
114 :
115 : // Global times
116 0 : m_earliest_first_tp_timestamp = std::numeric_limits<triggeralgs::timestamp_t>::max();
117 0 : m_latest_last_tp_timestamp = 0;
118 :
119 : // Loading sorted TP stream files
120 0 : for (auto& stream : m_conf->get_tp_streams()) {
121 0 : auto result = m_tpstream_files.insert(std::make_pair(stream->get_index(), stream->get_filename()));
122 0 : if (!result.second) {
123 0 : ers::error(dunedaq::trigger::ReplayStreamFileError(
124 0 : ERS_HERE, get_name(), stream->get_index(), stream->get_filename(), result.first->second));
125 : }
126 : }
127 :
128 : // Print grouped files
129 0 : TLOG() << "Files to use:";
130 0 : for (const auto& pair : m_tpstream_files) {
131 0 : TLOG() << "Index: " << pair.first << ", Filename: " << pair.second;
132 : }
133 :
134 : // Load data here
135 0 : m_all_tp_data = read_tps(m_tpstream_files);
136 0 : if (m_tpstream_files.empty() || m_all_tp_data.empty()) {
137 0 : ers::error(dunedaq::trigger::ReplayNoValidFiles(ERS_HERE, get_name()));
138 : }
139 :
140 : // Data loaded and sorted.
141 : // Now we create streams.
142 0 : int global_iter = 0;
143 : // Loop over ROUs
144 0 : for (auto& [ROU, plane_map] : m_all_tp_data) {
145 0 : int plane_iter = 0;
146 : // Loop over Planes
147 0 : for (auto& [plane, vector_of_tps] : plane_map) {
148 :
149 0 : TLOG_DEBUG(1) << "Stream: " << (global_iter + plane_iter) << "; ROU: " << ROU << "; plane: " << plane
150 0 : << "; TP sink is " << con[global_iter + plane_iter]->class_name() << "@"
151 0 : << con[global_iter + plane_iter]->UID();
152 :
153 0 : TPStream this_stream{ get_iom_sender<std::vector<trigger::TriggerPrimitiveTypeAdapter>>(
154 0 : con[global_iter + plane_iter]->UID()),
155 0 : vector_of_tps };
156 :
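// Track the earliest first-TP and latest last-TP timestamps over all streams;
// do_work() uses these to align the streams at start-up and to offset timestamps between replay loops.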
157 0 : m_earliest_first_tp_timestamp =
158 0 : std::min(m_earliest_first_tp_timestamp, this_stream.tpvs.front().front().tp.time_start);
159 0 : m_latest_last_tp_timestamp = std::max(m_latest_last_tp_timestamp, this_stream.tpvs.back().back().tp.time_start);
160 :
161 0 : m_tp_streams.push_back(std::move(this_stream));
162 0 : plane_iter++;
163 0 : }
164 0 : global_iter += plane_iter;
165 : }
166 :
167 0 : TLOG() << "Total of " << m_tp_streams.size() << " TP streams.";
168 :
169 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting conf() method";
170 0 : }
171 :
172 : void
173 0 : TPReplayModule::do_start(const CommandData_t& /*obj*/)
174 : {
175 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering start() method";
176 :
177 0 : m_running_flag.store(true);
178 :
179 : // Reset opmon
180 0 : m_tp_made_count.store(0);
181 0 : m_tpv_made_count.store(0);
182 0 : m_tpv_failed_sent_count.store(0);
183 :
184 : // We need the wall-clock time at which we'll send out the TPs
185 : // with the earliest timestamp, so we can keep all of the output
186 : // streams in sync. We pick "now" plus a bit, to allow time for all
187 : // of the threads to start up
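// Each do_work() thread then delays its stream's first send by the offset between that stream's
// first TP timestamp and m_earliest_first_tp_timestamp, measured from this common reference time.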
188 0 : auto earliest_timestamp_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(100);
189 0 : m_run_start_time = std::chrono::steady_clock::now();
190 :
191 : // Start threads for each stream
192 0 : for (auto& stream : m_tp_streams) {
193 0 : m_threads.push_back(std::make_unique<std::thread>(&TPReplayModule::do_work,
194 0 : this,
195 0 : std::ref(m_running_flag),
196 0 : std::ref(stream.tpvs),
197 0 : std::ref(stream.tp_sink),
198 : earliest_timestamp_time));
199 : }
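// Give each replay thread a recognisable name ("tpreplay-<n>") for debugging tools.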
200 0 : for (size_t i = 0; i < m_threads.size(); i++) {
201 0 : std::string name("tpreplay-");
202 0 : name += std::to_string(i);
203 0 : pthread_setname_np(m_threads[i]->native_handle(), name.c_str());
204 0 : }
205 0 : TLOG() << "Total of " << m_threads.size() << " replay threads.";
206 :
207 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting start() method";
208 0 : }
209 :
210 : void
211 0 : TPReplayModule::do_stop(const CommandData_t& /*args*/)
212 : {
213 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering stop() method";
214 :
215 0 : m_running_flag.store(false);
216 0 : for (auto& thr : m_threads) {
217 0 : if (thr != nullptr && thr->joinable()) {
218 0 : thr->join();
219 : }
220 : }
221 0 : m_threads.clear();
222 :
223 0 : auto run_end_time = std::chrono::steady_clock::now();
224 0 : auto time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(run_end_time - m_run_start_time).count();
225 0 : float rate_hz = 1e3 * static_cast<float>(m_tpv_made_count) / time_ms;
226 :
227 0 : TLOG() << "### SUMMARY ###";
228 0 : TLOG() << "------------------------------";
229 0 : TLOG() << "Generated TP vectors: " << m_tpv_made_count;
230 0 : TLOG() << "Generated TPs: " << m_tp_made_count;
231 0 : TLOG() << "Time taken: " << time_ms << " ms";
232 0 : TLOG() << "Rate: " << rate_hz << " TP vectors/s";
233 0 : TLOG() << "Failed to push TP vectors: " << m_tpv_failed_sent_count;
234 0 : TLOG();
235 :
236 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting stop() method";
237 0 : }
238 :
239 : void
240 0 : TPReplayModule::do_scrap(const CommandData_t& /*args*/)
241 : {
242 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering scrap() method";
243 :
244 0 : m_tp_streams.clear();
245 0 : m_threads.clear();
246 0 : m_tpstream_files.clear();
247 0 : m_all_tp_data.clear();
248 0 : m_filter_planes_ids.clear();
249 0 : m_validSubdetectors.clear();
250 :
251 0 : m_channel_map.reset();
252 :
253 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting scrap() method";
254 0 : }
255 :
256 : void
257 0 : TPReplayModule::generate_opmon_data()
258 : {
259 0 : opmon::TPReplayModuleInfo info;
260 :
261 0 : info.set_tp_made_count(m_tp_made_count);
262 0 : info.set_tpv_made_count(m_tpv_made_count);
263 0 : info.set_tpv_failed_sent_count(m_tpv_failed_sent_count);
264 :
265 0 : this->publish(std::move(info));
266 0 : }
267 :
268 : // This is the heavy-lifting function of TPReplayModule:
269 : // - goes over all provided TP stream files
270 : // - does basic file checks
271 : // - extracts the needed data
272 : // - sorts the data per ROU and plane
273 : // - performs additional data-related checks
274 : // - applies the plane filtering
275 : std::map<std::string, std::map<int, std::deque<std::vector<TriggerPrimitiveTypeAdapter>>>>
276 0 : TPReplayModule::read_tps(std::map<int, std::string> tpstream_files)
277 : {
278 :
279 0 : std::map<std::string, std::map<int, std::deque<std::vector<TriggerPrimitiveTypeAdapter>>>> all_data;
280 :
281 : // Loop over each file
282 0 : for (const auto& a_file : tpstream_files) {
283 0 : std::string filename = a_file.second;
284 :
285 : // Check file exists
286 0 : std::unique_ptr<hdf5libs::HDF5RawDataFile> input_file;
287 0 : try {
288 0 : input_file = std::make_unique<hdf5libs::HDF5RawDataFile>(filename);
289 0 : } catch (const hdf5libs::FileOpenFailed& e) {
290 0 : ers::error(dunedaq::trigger::ReplayFileProblem(ERS_HERE, get_name(), filename));
291 0 : return {};
292 0 : }
293 :
294 : // Check that the file is a TimeSlice type
295 0 : if (!input_file->is_timeslice_type()) {
296 0 : ers::error(dunedaq::trigger::BadTPInputFile(ERS_HERE, get_name(), filename));
297 0 : return {};
298 : }
299 :
300 0 : std::vector<std::string> fragment_paths = input_file->get_all_fragment_dataset_paths();
301 : // Check there are fragments
302 0 : if (fragment_paths.empty()) {
303 0 : ers::error(dunedaq::trigger::ReplayNoFragments(ERS_HERE, get_name(), filename));
304 0 : return {};
305 : }
306 :
307 : // Local counters
308 0 : std::set<std::string> local_rous;
309 0 : std::set<int> local_planes;
310 0 : int local_tp_vectors = 0;
311 0 : int local_tps = 0;
312 :
313 : // Loop over paths/fragments
314 0 : for (const auto& path : fragment_paths) {
315 0 : std::unique_ptr<daqdataformats::Fragment> frag = input_file->get_frag_ptr(path);
316 :
317 : // Check fragment has data
318 0 : auto frag_size = frag->get_data_size();
319 0 : if (frag_size == 0) {
320 0 : ers::error(dunedaq::trigger::ReplayEmptyFrag(ERS_HERE, get_name(), filename));
321 0 : continue;
322 : }
323 :
324 0 : trgdataformats::TriggerPrimitive* tp_array = static_cast<trgdataformats::TriggerPrimitive*>(frag->get_data());
325 0 : size_t num_tps = frag_size / sizeof(trgdataformats::TriggerPrimitive);
326 :
327 : // Check there is TP data
328 0 : if (num_tps < 1) {
329 0 : ers::error(dunedaq::trigger::ReplayNoValidTPs(ERS_HERE, get_name(), filename));
330 0 : continue;
331 : }
332 :
333 : // Use the first TP in the fragment to determine the subdetector, ROU and plane
334 0 : auto& tp = tp_array[0];
335 :
336 : // Only select TPC TPs (for now)
337 0 : dunedaq::detdataformats::DetID::Subdetector subdet =
338 0 : static_cast<dunedaq::detdataformats::DetID::Subdetector>(tp.detid);
339 0 : if (!m_validSubdetectors.count(subdet)) {
340 0 : continue;
341 : }
342 :
343 : // Get ROU and plane
344 0 : std::string ROU;
345 0 : try {
346 0 : ROU = m_channel_map->get_element_name_from_offline_channel(tp.channel);
347 0 : local_rous.insert(ROU);
348 0 : } catch (...) {
349 0 : ers::error(dunedaq::trigger::ReplayROUError(ERS_HERE, get_name(), filename));
350 0 : continue;
351 0 : }
352 :
353 0 : int plane;
354 0 : try {
355 0 : plane = m_channel_map->get_plane_from_offline_channel(tp.channel);
356 0 : local_planes.insert(plane);
357 0 : } catch (...) {
358 0 : ers::error(dunedaq::trigger::ReplayPlaneError(ERS_HERE, get_name(), filename));
359 0 : continue;
360 0 : }
361 :
362 : // Hack for APA1: swap planes 1 and 2, effectively treating plane 1 as the collection plane.
363 : // TODO: decide whether we want to keep this here long term.
364 0 : if (ROU == "APA_P02SU") {
365 0 : plane = (plane == 1) ? 2 : (plane == 2) ? 1 : plane;
366 : }
367 :
368 : // Check if this plane should be filtered
369 0 : if (m_filter_planes_ids.count(plane) != 0) {
370 0 : continue;
371 : }
372 :
373 : // Extract trigger primitives
374 : // Create a vector of the correct size, and directly associate it with tp_array
375 0 : std::vector<TriggerPrimitiveTypeAdapter> tps(reinterpret_cast<TriggerPrimitiveTypeAdapter*>(tp_array),
376 0 : reinterpret_cast<TriggerPrimitiveTypeAdapter*>(tp_array) + num_tps);
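// Note: this relies on TriggerPrimitiveTypeAdapter being layout-compatible with
// trgdataformats::TriggerPrimitive, so the raw fragment payload can be reinterpreted
// in place rather than copied element by element.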
377 :
378 0 : if (tps.empty()) {
379 0 : continue;
380 : }
381 :
382 : // Efficient insertion into deque
383 0 : auto& data_deque = all_data[ROU][plane];
384 :
385 : // First insertion if deque is empty
386 0 : if (data_deque.empty()) {
387 0 : data_deque.push_back(std::move(tps));
388 : }
389 : // Append if already in order
390 0 : else if (data_deque.back().front().tp.time_start <= tps.front().tp.time_start) {
391 0 : data_deque.push_back(std::move(tps));
392 : }
393 : // Prepend if this vector is older than the first element
394 0 : else if (data_deque.front().front().tp.time_start >= tps.front().tp.time_start) {
395 0 : data_deque.push_front(std::move(tps));
396 : }
397 : // General case: Insert using binary search (O(log N) search + O(1) insertion)
398 : else {
399 0 : auto insert_pos = std::lower_bound(
400 0 : data_deque.begin(),
401 0 : data_deque.end(),
402 : tps,
403 0 : [](const std::vector<TriggerPrimitiveTypeAdapter>& a, const std::vector<TriggerPrimitiveTypeAdapter>& b) {
404 0 : return a.front().tp.time_start < b.front().tp.time_start;
405 0 : });
406 :
407 0 : data_deque.insert(insert_pos, std::move(tps));
408 : }
409 0 : data_deque.back().shrink_to_fit(); // Reclaims unused memory
410 0 : frag.reset();
411 :
412 0 : local_tp_vectors++;
413 0 : local_tps += num_tps;
414 0 : } // frags loop
415 :
416 0 : TLOG() << "Data loading summary (end of file):";
417 0 : TLOG() << "------------------------------";
418 0 : TLOG() << "File: " << filename;
419 0 : TLOG() << "ROUs: " << local_rous.size();
420 0 : TLOG() << "Planes: " << local_planes.size();
421 0 : TLOG() << "TP vectors: " << local_tp_vectors;
422 0 : TLOG() << "Total read TPs: " << local_tps;
423 0 : TLOG();
424 :
425 0 : } // files loop
426 :
427 0 : TLOG() << "Data loading summary (all):";
428 0 : TLOG() << "------------------------------";
429 0 : TLOG() << "Files: " << m_tpstream_files.size();
430 : // Loop through the map and print sizes
431 0 : for (const auto& [ROU, plane_map] : all_data) {
432 0 : TLOG() << "ROU: " << ROU << ", Number of planes: " << plane_map.size();
433 :
434 : // Loop through each plane for the current ROU
435 0 : for (const auto& [plane, vector_of_tps] : plane_map) {
436 0 : TLOG() << " Plane: " << plane << ", Number of vectors: " << vector_of_tps.size();
437 : }
438 : }
439 :
440 0 : return all_data;
441 0 : }
442 :
443 : void
444 0 : TPReplayModule::do_work(
445 : std::atomic<bool>& running_flag,
446 : std::deque<std::vector<TriggerPrimitiveTypeAdapter>>& tpvs,
447 : std::shared_ptr<iomanager::SenderConcept<std::vector<trigger::TriggerPrimitiveTypeAdapter>>>& tp_sink,
448 : std::chrono::steady_clock::time_point earliest_timestamp_time)
449 : {
450 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_work() method";
451 :
452 0 : int current_iteration = 0; // NOLINT(build/unsigned)
453 :
454 0 : uint64_t prev_tpv_start_time = 0; // NOLINT(build/unsigned)
455 0 : auto prev_tpv_send_time = std::chrono::steady_clock::now();
456 :
457 0 : auto const total_stream_duration = m_latest_last_tp_timestamp - m_earliest_first_tp_timestamp;
458 0 : auto run_start_time = std::chrono::steady_clock::now();
459 :
460 : // Local counters
461 0 : int local_tp_made = 0;
462 0 : int local_tpv_made = 0;
463 0 : int local_tpv_failed = 0;
464 :
465 0 : while (running_flag.load()) {
466 :
467 : // Looping logic: exit if m_loops is set and we've reached the limit
468 0 : if ((m_loops != -1) && (current_iteration >= m_loops)) {
469 : break;
470 : }
471 :
472 : // Going over TP vectors
473 0 : for (auto& tpv : tpvs) {
474 :
475 0 : if (!running_flag.load()) {
476 : break;
477 : }
478 :
479 : // The argument `earliest_timestamp_time` is the wall-clock time
480 : // of the earliest first tpset timestamp in _any_ of the input
481 : // streams. So for the first TP vector we send out, we wait until
482 : // _this_ stream's first timestamp comes up
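// For example (assuming a 62.5 MHz clock, i.e. m_clocks_per_us = 62.5): a gap of
// 625'000 ticks between TP vector start times translates into a 10'000 us wait before sending.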
483 0 : int64_t wait_time_us = 0; // 64-bit: large timestamp gaps could overflow an int
484 0 : std::chrono::steady_clock::time_point next_tpv_send_time;
485 0 : if (prev_tpv_start_time == 0) {
486 0 : wait_time_us = (tpv.front().tp.time_start - m_earliest_first_tp_timestamp) / m_clocks_per_us;
487 0 : next_tpv_send_time = earliest_timestamp_time + std::chrono::microseconds(wait_time_us);
488 : } else {
489 0 : wait_time_us = (tpv.front().tp.time_start - prev_tpv_start_time) / m_clocks_per_us;
490 0 : next_tpv_send_time = prev_tpv_send_time + std::chrono::microseconds(wait_time_us);
491 : }
492 :
493 : // Check running_flag periodically so we can stop punctually
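// Long waits are therefore split into slices of at most maximum_wait_time_us,
// so the thread can react to a stop request without sleeping through the whole gap.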
494 0 : auto slice_period = std::chrono::microseconds(m_conf->get_maximum_wait_time_us());
495 0 : auto next_slice_send_time = prev_tpv_send_time + slice_period;
496 0 : bool break_flag = false;
497 0 : while (next_tpv_send_time > next_slice_send_time + slice_period) {
498 0 : if (!running_flag.load()) {
499 : break_flag = true;
500 : break;
501 : }
502 0 : std::this_thread::sleep_until(next_slice_send_time);
503 0 : next_slice_send_time = next_slice_send_time + slice_period;
504 : }
505 0 : if (!break_flag) {
506 0 : std::this_thread::sleep_until(next_tpv_send_time);
507 : }
508 :
509 : // Update times
510 0 : prev_tpv_send_time = next_tpv_send_time;
511 0 : prev_tpv_start_time = tpv.front().tp.time_start;
512 :
513 : // Update counters
514 0 : m_tpv_made_count++;
515 0 : m_tp_made_count += tpv.size();
516 0 : local_tpv_made++;
517 0 : local_tp_made += tpv.size();
518 :
519 : // Actually send data
520 0 : try {
521 : // Decide whether to move or copy based on loop count
522 0 : if (m_loops == 1) {
523 : // Only one loop: safe to move original data
524 0 : tp_sink->send(std::move(tpv), m_queue_timeout);
525 : } else {
526 : // Multiple or infinite loops: send a copy to preserve original
527 0 : auto tpv_copy = tpv;
528 0 : tp_sink->send(std::move(tpv_copy), m_queue_timeout);
529 0 : }
530 0 : } catch (const dunedaq::iomanager::TimeoutExpired& e) {
531 0 : ers::warning(e);
532 0 : m_tpv_failed_sent_count++;
533 0 : local_tpv_failed++;
534 0 : }
535 :
536 : // Increase timestamps in the TPs so they don't
537 : // repeat when we do multiple loops over the file
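// (each additional pass shifts every time_start by the full stream duration
// D = m_latest_last_tp_timestamp - m_earliest_first_tp_timestamp, so pass n roughly
// covers [first_ts + n*D, last_ts + n*D])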
538 0 : bool will_repeat = (m_loops == -1) || (current_iteration + 1 < m_loops);
539 : if (will_repeat) {
540 0 : for (auto& tpa : tpv) {
541 0 : tpa.tp.time_start += total_stream_duration;
542 : }
543 : }
544 :
545 : } // end loop over tpsets
546 0 : ++current_iteration;
547 :
548 : } // end while(running_flag.load())
549 :
550 0 : auto run_end_time = std::chrono::steady_clock::now();
551 0 : auto time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(run_end_time - run_start_time).count();
552 0 : float rate_hz = 1e3 * static_cast<float>(local_tpv_made) / time_ms;
553 :
554 0 : TLOG() << "Thread summary:";
555 0 : TLOG() << "------------------------------";
556 0 : TLOG() << "Sent TPs: " << local_tp_made;
557 0 : TLOG() << "TP vectors: " << local_tpv_made;
558 0 : TLOG() << "Time taken: " << time_ms << " ms";
559 0 : TLOG() << "Rate: " << rate_hz << " TP vectors/s";
560 0 : TLOG() << "Failed to push TP vectors: " << local_tpv_failed;
561 0 : TLOG();
562 :
563 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_work() method";
564 0 : }
565 :
566 : } // namespace dunedaq::trigger
567 :
568 0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::trigger::TPReplayModule)