DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
process_tpstream.cxx
Go to the documentation of this file.
3
4#include "CLI/App.hpp"
5#include "CLI/Config.hpp"
6#include "CLI/Formatter.hpp"
7
8#include <fmt/core.h>
9#include <fmt/format.h>
10#include <fmt/chrono.h>
11#include <filesystem>
12
19
20using namespace dunedaq;
21
23{
24private:
25 /* data */
26 std::unique_ptr<hdf5libs::HDF5RawDataFile> m_input_file;
27 std::unique_ptr<hdf5libs::HDF5RawDataFile> m_output_file;
28
29 void open_files(std::string input_path, std::string output_path);
30 void close_files();
31
33
34 // Can modify?
36
37public:
38
39 TimeSliceProcessor(std::string input_path, std::string output_path);
41
42 void set_processor(std::function<void(daqdataformats::TimeSlice&)> processor);
43 void loop(uint64_t num_records = 0, uint64_t offset = 0, bool quiet = false);
44
45};
46
47//-----------------------------------------------------------------------------
48TimeSliceProcessor::TimeSliceProcessor(std::string input_path, std::string output_path)
49{
50 this->open_files(input_path, output_path);
51}
52
53//-----------------------------------------------------------------------------
58
59//-----------------------------------------------------------------------------
60void
61TimeSliceProcessor::open_files(std::string input_path, std::string output_path) {
62 // Open input file
63 m_input_file = std::make_unique<hdf5libs::HDF5RawDataFile>(input_path);
64
65 if (!m_input_file->is_timeslice_type()) {
66 fmt::print("ERROR: input file '{}' not of type 'TimeSlice'\n", input_path);
67 throw std::runtime_error(fmt::format("ERROR: input file '{}' not of type 'TimeSlice'", input_path));
68 }
69
70 auto run_number = m_input_file->get_attribute<daqdataformats::run_number_t>("run_number");
71 auto file_index = m_input_file->get_attribute<size_t>("file_index");
72 auto application_name = m_input_file->get_attribute<std::string>("application_name");
73
74 fmt::print("Run Number: {}\nFile Index: {}\nApp name: '{}'\n", run_number, file_index, application_name);
75
76 if (!output_path.empty()) {
77 // Open output file
78 m_output_file = std::make_unique<hdf5libs::HDF5RawDataFile>(
79 output_path,
80 m_input_file->get_attribute<daqdataformats::run_number_t>("run_number"),
81 m_input_file->get_attribute<size_t>("file_index"),
82 m_input_file->get_attribute<std::string>("application_name"),
83 m_input_file->get_file_layout().get_file_layout_params(),
84 m_input_file->get_srcid_geoid_map()
85 );
86 }
87}
88
89//-----------------------------------------------------------------------------
90void
92 // Do something?
93}
94
95//-----------------------------------------------------------------------------
96void
98 m_processor = processor;
99}
100
101//-----------------------------------------------------------------------------
102void
107
108//-----------------------------------------------------------------------------
109void
110TimeSliceProcessor::loop(uint64_t num_records, uint64_t offset, bool quiet) {
111
112 // Replace with a record selection?
113 auto records = m_input_file->get_all_record_ids();
114
115 if (!num_records) {
116 num_records = (records.size()-offset);
117 }
118
119 uint64_t first_rec = offset, last_rec = offset+num_records;
120
121 uint64_t i_rec(0);
122 for( const auto& rid : records ) {
123
124 if (i_rec < first_rec || i_rec >= last_rec ) {
125 ++i_rec;
126 continue;
127 }
128
129 if (!quiet)
130 fmt::print("\n-- Processing TSL {}:{}\n\n", rid.first, rid.second);
131 auto tsl = m_input_file->get_timeslice(rid);
132 // Or filter on a selection here using a lambda?
133
134 // if (!quiet)
135 // fmt::print("TSL number {}\n", tsl.get_header().timeslice_number);
136
137 // Add a process method
138 this->process(tsl);
139
140 if (m_output_file)
141 m_output_file->write(tsl);
142
143 ++i_rec;
144 if(!quiet)
145 fmt::print("\n-- Finished TSL {}:{}\n\n", rid.first, rid.second);
146
147 }
148
149}
150
151
152//-----------------------------------------------------------------------------
153int main(int argc, char const *argv[])
154{
155
156 CLI::App app{"tapipe"};
157 // argv = app.ensure_utf8(argv);
158
159 std::string input_file_path;
160 app.add_option("-i", input_file_path, "Input TPStream file path")->required();
161 std::string output_file_path;
162 app.add_option("-o", output_file_path, "Output TPStream file path");
163 std::string channel_map_name = "VDColdboxTPCChannelMap";
164 app.add_option("-m", channel_map_name, "Detector Channel Map");
165 std::string config_name;
166 app.add_option("-j", config_name, "Trigger Activity and Candidate config JSON to use.")->required();
167 uint64_t skip_rec(0);
168 app.add_option("-s", skip_rec, "Skip records");
169 uint64_t num_rec(0);
170 app.add_option("-n", num_rec, "Process records");
171
172 bool quiet = false;
173 app.add_flag("--quiet", quiet, "Quiet outputs.");
174
175 bool latencies = false;
176 app.add_flag("--latencies", latencies, "Saves latencies per TP into csv");
177 CLI11_PARSE(app, argc, argv);
178
179
180 if (!quiet)
181 fmt::print("TPStream file: {}\n", input_file_path);
182
183 TimeSliceProcessor rp(input_file_path, output_file_path);
184
185 // TP source id (subsystem)
186 auto tp_subsystem_requirement = daqdataformats::SourceID::Subsystem::kTrigger;
187
188 auto channel_map = dunedaq::detchannelmaps::make_tpc_map(channel_map_name);
189
190 // Read configuration
191 std::ifstream config_stream(config_name);
192 nlohmann::json config = nlohmann::json::parse(config_stream);
193
194 // Only use the first plugin for now.
195 nlohmann::json ta_algo = config["trigger_activity_plugin"][0];
196 nlohmann::json ta_config = config["trigger_activity_config"][0];
197
198 nlohmann::json tc_algo = config["trigger_candidate_plugin"][0];
199 nlohmann::json tc_config = config["trigger_candidate_config"][0];
200
201
202 // Finally create a TA maker
203 std::unique_ptr<triggeralgs::TriggerActivityMaker> ta_maker =
205 ta_maker->configure(ta_config);
206 std::unique_ptr<trgtools::EmulateTAUnit> ta_emulator = std::make_unique<trgtools::EmulateTAUnit>();
207 ta_emulator->set_maker(ta_maker);
208 // TODO: Use a better file naming scheme for CSV.
209 if (latencies) {
210 std::filesystem::path output_path(output_file_path);
211 ta_emulator->set_timing_file((output_path.parent_path() / ("ta_timings_" + output_path.stem().string() + ".csv")).string());
212 ta_emulator->write_csv_header("TP Time Start,TP ADC Integral,Time Diffs,Is Last TP In TA");
213 }
214
215 // Finally create a TA maker
216 std::unique_ptr<triggeralgs::TriggerCandidateMaker> tc_maker =
218 tc_maker->configure(tc_config);
219 std::unique_ptr<trgtools::EmulateTCUnit> tc_emulator = std::make_unique<trgtools::EmulateTCUnit>();
220 tc_emulator->set_maker(tc_maker);
221 // TODO: Use a better file naming scheme for CSV.
222 if (latencies) {
223 std::filesystem::path output_path(output_file_path);
224 tc_emulator->set_timing_file((output_path.parent_path() / ("tc_timings_" + output_path.stem().string() + ".csv")).string());
225 tc_emulator->write_csv_header("Time Diffs");
226 }
227
228 // Generic filter hook
229 std::function<bool(const trgdataformats::TriggerPrimitive&)> tp_filter;
230
231 auto z_plane_filter = [&]( const trgdataformats::TriggerPrimitive& tp ) -> bool {
232 return (channel_map->get_plane_from_offline_channel(tp.channel) != 2);
233 };
234
235 tp_filter = z_plane_filter;
236
237 rp.set_processor([&]( daqdataformats::TimeSlice& tsl ) -> void {
238 const std::vector<std::unique_ptr<daqdataformats::Fragment>>& frags = tsl.get_fragments_ref();
239 const size_t num_frags = frags.size();
240 if(!quiet)
241 fmt::print("The number of fragments: {}\n", num_frags);
242
243 uint64_t average_ta_time = 0;
244 uint64_t average_tc_time = 0;
245
246 size_t num_tas = 0;
247 size_t num_tcs = 0;
248
249 // Need a static for-loop: adding fragments to tsl will mutate frags even though it's const.
250 for (size_t i = 0; i < num_frags; i++) {
251 const auto& frag = frags[i];
252
253 // The fragment has to be for the trigger (not e.g. for retreival from readout)
254 if (frag->get_element_id().subsystem != tp_subsystem_requirement) {
255 if(!quiet)
256 fmt::print(" Warning, got non kTrigger SourceID {}\n", frag->get_element_id().to_string());
257 continue;
258 }
259
260 // The fragment has to be TriggerPrimitive
261 if(frag->get_fragment_type() != daqdataformats::FragmentType::kTriggerPrimitive){
262 if(!quiet)
263 fmt::print(" Error: FragmentType is: {}!\n", fragment_type_to_string(frag->get_fragment_type()));
264 continue;
265 }
266
267 // This bit should be outside the loop
268 if (!quiet)
269 fmt::print(" Fragment id: {} [{}]\n", frag->get_element_id().to_string(), daqdataformats::fragment_type_to_string(frag->get_fragment_type()));
270
271 // Pull tps out
272 size_t n_tps = frag->get_data_size()/sizeof(trgdataformats::TriggerPrimitive);
273 if (!quiet) {
274 fmt::print(" TP fragment size: {}\n", frag->get_data_size());
275 fmt::print(" Num TPs: {}\n", n_tps);
276 }
277
278 // Create a TP buffer
279 std::vector<trgdataformats::TriggerPrimitive> tp_buffer;
280 // Prepare the TP buffer, checking for time ordering
281 tp_buffer.reserve(tp_buffer.size()+n_tps);
282
283 // Populate the TP buffer
284 trgdataformats::TriggerPrimitive* tp_array = static_cast<trgdataformats::TriggerPrimitive*>(frag->get_data());
285 uint64_t last_ts = 0;
286 for(size_t i(0); i<n_tps; ++i) {
287 auto& tp = tp_array[i];
288 if (tp.time_start <= last_ts && !quiet) {
289 fmt::print(" ERROR: {} {} ", +tp.time_start, last_ts );
290 }
291 tp_buffer.push_back(tp);
292 }
293
294 // Print some useful info
295 uint64_t d_ts = tp_array[n_tps-1].time_start - tp_array[0].time_start;
296 if (!quiet)
297 fmt::print(" TS gap: {} {} ms\n", d_ts, d_ts*16.0/1'000'000);
298
299 //
300 // TA Processing
301 //
302
303 const auto ta_start = std::chrono::steady_clock::now();
304 std::unique_ptr<daqdataformats::Fragment> ta_frag = ta_emulator->emulate_vector(tp_buffer);
305 const auto ta_end = std::chrono::steady_clock::now();
306
307 if (ta_frag == nullptr) // Buffer was empty.
308 continue;
309 num_tas += ta_emulator->get_last_output_buffer().size();
310
311 // TA time calculation.
312 const uint64_t ta_diff = std::chrono::nanoseconds(ta_end - ta_start).count();
313 average_ta_time += ta_diff;
314 if (!quiet) {
315 fmt::print("\tTA Time Process: {} ns.\n", ta_diff);
316 }
317
318 daqdataformats::FragmentHeader frag_hdr = frag->get_header();
319
320 // Customise the source id (add 1000 to id)
321 frag_hdr.element_id = daqdataformats::SourceID{daqdataformats::SourceID::Subsystem::kTrigger, frag->get_element_id().id+1000};
322
323 ta_frag->set_header_fields(frag_hdr);
324 ta_frag->set_type(daqdataformats::FragmentType::kTriggerActivity);
325
326
327 tsl.add_fragment(std::move(ta_frag));
328 //
329 // TA Processing Ends
330 //
331
332 //
333 // TC Processing
334 //
335
336 std::vector<triggeralgs::TriggerActivity> ta_buffer = ta_emulator->get_last_output_buffer();
337 const auto tc_start = std::chrono::steady_clock::now();
338 std::unique_ptr<daqdataformats::Fragment> tc_frag = tc_emulator->emulate_vector(ta_buffer);
339 const auto tc_end = std::chrono::steady_clock::now();
340
341 if (tc_frag == nullptr) // Buffer was empty.
342 continue;
343 num_tcs += tc_emulator->get_last_output_buffer().size();
344
345 // TC time calculation.
346 const uint64_t tc_diff = std::chrono::nanoseconds(tc_end - tc_start).count();
347 average_tc_time += tc_diff;
348 if (!quiet) {
349 fmt::print("\tTC Time Process: {} ns.\n", tc_diff);
350 }
351
352 // Shares the same frag_hdr.
353 tc_frag->set_header_fields(frag_hdr);
354 tc_frag->set_type(daqdataformats::FragmentType::kTriggerCandidate);
355
356 tsl.add_fragment(std::move(tc_frag));
357
358 } // Fragment for loop
359
360 if (num_tas == 0) average_ta_time = 0;
361 else average_ta_time /= num_tas;
362 if (num_tcs == 0) average_tc_time = 0;
363 else average_tc_time /= num_tcs;
364 if (!quiet) {
365 fmt::print("\t\tAverage TA Time Process ({} TAs): {} ns.\n", num_tas, average_ta_time);
366 fmt::print("\t\tAverage TC Time Process ({} TCs): {} ns.\n", num_tcs, average_tc_time);
367 }
368 });
369
370 rp.loop(num_rec, skip_rec);
371
372 /* code */
373 return 0;
374}
void set_processor(std::function< void(daqdataformats::TimeSlice &)> processor)
void process(daqdataformats::TimeSlice &tls)
std::unique_ptr< hdf5libs::HDF5RawDataFile > m_input_file
TimeSliceProcessor(std::string input_path, std::string output_path)
std::unique_ptr< hdf5libs::HDF5RawDataFile > m_output_file
void open_files(std::string input_path, std::string output_path)
std::function< void(daqdataformats::TimeSlice &)> m_processor
void loop(uint64_t num_records=0, uint64_t offset=0, bool quiet=false)
C++ Representation of a DUNE TimeSlice, consisting of a TimeSliceHeader object and a vector of pointe...
Definition TimeSlice.hpp:27
void add_fragment(std::unique_ptr< Fragment > &&fragment)
Add a Fragment pointer to the Fragments vector.
Definition TimeSlice.hpp:74
const std::vector< std::unique_ptr< Fragment > > & get_fragments_ref() const
Get a handle to the Fragments.
Definition TimeSlice.hpp:62
static std::shared_ptr< AbstractFactory< TriggerActivityMaker > > get_instance()
double offset
uint32_t run_number_t
Type used to represent run number.
Definition Types.hpp:20
Including Qt Headers.
int main(int argc, char const *argv[])
The header for a DUNE Fragment.
SourceID element_id
Component that generated the data in this Fragment.
SourceID is a generalized representation of the source of a piece of data in the DAQ....
Definition SourceID.hpp:32
A single energy deposition on a TPC or PDS channel.