Line data Source code
1 : #include "trgtools/EmulateTAUnit.hpp"
2 : #include "trgtools/EmulateTCUnit.hpp"
3 :
4 : #include "CLI/App.hpp"
5 : #include "CLI/Config.hpp"
6 : #include "CLI/Formatter.hpp"
7 :
8 : #include <fmt/core.h>
9 : #include <fmt/format.h>
10 : #include <fmt/chrono.h>
11 : #include <filesystem>
12 :
13 : #include "hdf5libs/HDF5RawDataFile.hpp"
14 : #include "trgdataformats/TriggerPrimitive.hpp"
15 : #include "triggeralgs/TriggerActivityFactory.hpp"
16 : #include "triggeralgs/TriggerCandidateFactory.hpp"
17 : #include "triggeralgs/TriggerObjectOverlay.hpp"
18 : #include "detchannelmaps/TPCChannelMap.hpp"
19 :
20 : using namespace dunedaq;
21 :
22 : class TimeSliceProcessor
23 : {
24 : private:
25 : /* data */
26 : std::unique_ptr<hdf5libs::HDF5RawDataFile> m_input_file;
27 : std::unique_ptr<hdf5libs::HDF5RawDataFile> m_output_file;
28 :
29 : void open_files(std::string input_path, std::string output_path);
30 : void close_files();
31 :
32 : void process( daqdataformats::TimeSlice& tls );
33 :
34 : // Can modify?
35 : std::function<void(daqdataformats::TimeSlice&)> m_processor;
36 :
37 : public:
38 :
39 : TimeSliceProcessor(std::string input_path, std::string output_path);
40 : ~TimeSliceProcessor();
41 :
42 : void set_processor(std::function<void(daqdataformats::TimeSlice&)> processor);
43 : void loop(uint64_t num_records = 0, uint64_t offset = 0, bool quiet = false);
44 :
45 : };
46 :
47 : //-----------------------------------------------------------------------------
48 0 : TimeSliceProcessor::TimeSliceProcessor(std::string input_path, std::string output_path)
49 : {
50 0 : this->open_files(input_path, output_path);
51 0 : }
52 :
53 : //-----------------------------------------------------------------------------
54 0 : TimeSliceProcessor::~TimeSliceProcessor()
55 : {
56 0 : this->close_files();
57 0 : }
58 :
59 : //-----------------------------------------------------------------------------
60 : void
61 0 : TimeSliceProcessor::open_files(std::string input_path, std::string output_path) {
62 : // Open input file
63 0 : m_input_file = std::make_unique<hdf5libs::HDF5RawDataFile>(input_path);
64 :
65 0 : if (!m_input_file->is_timeslice_type()) {
66 0 : fmt::print("ERROR: input file '{}' not of type 'TimeSlice'\n", input_path);
67 0 : throw std::runtime_error(fmt::format("ERROR: input file '{}' not of type 'TimeSlice'", input_path));
68 : }
69 :
70 0 : auto run_number = m_input_file->get_attribute<daqdataformats::run_number_t>("run_number");
71 0 : auto file_index = m_input_file->get_attribute<size_t>("file_index");
72 0 : auto application_name = m_input_file->get_attribute<std::string>("application_name");
73 :
74 0 : fmt::print("Run Number: {}\nFile Index: {}\nApp name: '{}'\n", run_number, file_index, application_name);
75 :
76 0 : if (!output_path.empty()) {
77 : // Open output file
78 0 : m_output_file = std::make_unique<hdf5libs::HDF5RawDataFile>(
79 : output_path,
80 0 : m_input_file->get_attribute<daqdataformats::run_number_t>("run_number"),
81 0 : m_input_file->get_attribute<size_t>("file_index"),
82 0 : m_input_file->get_attribute<std::string>("application_name"),
83 0 : m_input_file->get_file_layout().get_file_layout_params(),
84 0 : m_input_file->get_srcid_geoid_map()
85 0 : );
86 : }
87 0 : }
88 :
89 : //-----------------------------------------------------------------------------
90 : void
91 0 : TimeSliceProcessor::close_files() {
92 : // Do something?
93 0 : }
94 :
95 : //-----------------------------------------------------------------------------
96 : void
97 0 : TimeSliceProcessor::set_processor(std::function<void(daqdataformats::TimeSlice& )> processor) {
98 0 : m_processor = processor;
99 0 : }
100 :
101 : //-----------------------------------------------------------------------------
102 : void
103 0 : TimeSliceProcessor::process( daqdataformats::TimeSlice& tls ) {
104 0 : if (m_processor)
105 0 : m_processor(tls);
106 0 : }
107 :
108 : //-----------------------------------------------------------------------------
109 : void
110 0 : TimeSliceProcessor::loop(uint64_t num_records, uint64_t offset, bool quiet) {
111 :
112 : // Replace with a record selection?
113 0 : auto records = m_input_file->get_all_record_ids();
114 :
115 0 : if (!num_records) {
116 0 : num_records = (records.size()-offset);
117 : }
118 :
119 0 : uint64_t first_rec = offset, last_rec = offset+num_records;
120 :
121 0 : uint64_t i_rec(0);
122 0 : for( const auto& rid : records ) {
123 :
124 0 : if (i_rec < first_rec || i_rec >= last_rec ) {
125 0 : ++i_rec;
126 0 : continue;
127 : }
128 :
129 0 : if (!quiet)
130 0 : fmt::print("\n-- Processing TSL {}:{}\n\n", rid.first, rid.second);
131 0 : auto tsl = m_input_file->get_timeslice(rid);
132 : // Or filter on a selection here using a lambda?
133 :
134 : // if (!quiet)
135 : // fmt::print("TSL number {}\n", tsl.get_header().timeslice_number);
136 :
137 : // Add a process method
138 0 : this->process(tsl);
139 :
140 0 : if (m_output_file)
141 0 : m_output_file->write(tsl);
142 :
143 0 : ++i_rec;
144 0 : if(!quiet)
145 0 : fmt::print("\n-- Finished TSL {}:{}\n\n", rid.first, rid.second);
146 :
147 0 : }
148 :
149 0 : }
150 :
151 :
152 : //-----------------------------------------------------------------------------
153 0 : int main(int argc, char const *argv[])
154 : {
155 :
156 0 : CLI::App app{"tapipe"};
157 : // argv = app.ensure_utf8(argv);
158 :
159 0 : std::string input_file_path;
160 0 : app.add_option("-i", input_file_path, "Input TPStream file path")->required();
161 0 : std::string output_file_path;
162 0 : app.add_option("-o", output_file_path, "Output TPStream file path");
163 0 : std::string channel_map_name = "VDColdboxTPCChannelMap";
164 0 : app.add_option("-m", channel_map_name, "Detector Channel Map");
165 0 : std::string config_name;
166 0 : app.add_option("-j", config_name, "Trigger Activity and Candidate config JSON to use.")->required();
167 0 : uint64_t skip_rec(0);
168 0 : app.add_option("-s", skip_rec, "Skip records");
169 0 : uint64_t num_rec(0);
170 0 : app.add_option("-n", num_rec, "Process records");
171 :
172 0 : bool quiet = false;
173 0 : app.add_flag("--quiet", quiet, "Quiet outputs.");
174 :
175 0 : bool latencies = false;
176 0 : app.add_flag("--latencies", latencies, "Saves latencies per TP into csv");
177 0 : CLI11_PARSE(app, argc, argv);
178 :
179 :
180 0 : if (!quiet)
181 0 : fmt::print("TPStream file: {}\n", input_file_path);
182 :
183 0 : TimeSliceProcessor rp(input_file_path, output_file_path);
184 :
185 : // TP source id (subsystem)
186 0 : auto tp_subsystem_requirement = daqdataformats::SourceID::Subsystem::kTrigger;
187 :
188 0 : auto channel_map = dunedaq::detchannelmaps::make_tpc_map(channel_map_name);
189 :
190 : // Read configuration
191 0 : std::ifstream config_stream(config_name);
192 0 : nlohmann::json config = nlohmann::json::parse(config_stream);
193 :
194 : // Only use the first plugin for now.
195 0 : nlohmann::json ta_algo = config["trigger_activity_plugin"][0];
196 0 : nlohmann::json ta_config = config["trigger_activity_config"][0];
197 :
198 0 : nlohmann::json tc_algo = config["trigger_candidate_plugin"][0];
199 0 : nlohmann::json tc_config = config["trigger_candidate_config"][0];
200 :
201 :
202 : // Finally create a TA maker
203 0 : std::unique_ptr<triggeralgs::TriggerActivityMaker> ta_maker =
204 0 : triggeralgs::TriggerActivityFactory::get_instance()->build_maker(ta_algo);
205 0 : ta_maker->configure(ta_config);
206 0 : std::unique_ptr<trgtools::EmulateTAUnit> ta_emulator = std::make_unique<trgtools::EmulateTAUnit>();
207 0 : ta_emulator->set_maker(ta_maker);
208 : // TODO: Use a better file naming scheme for CSV.
209 0 : if (latencies) {
210 0 : std::filesystem::path output_path(output_file_path);
211 0 : ta_emulator->set_timing_file((output_path.parent_path() / ("ta_timings_" + output_path.stem().string() + ".csv")).string());
212 0 : ta_emulator->write_csv_header("TP Time Start,TP ADC Integral,Time Diffs,Is Last TP In TA");
213 0 : }
214 :
215 : // Finally create a TA maker
216 0 : std::unique_ptr<triggeralgs::TriggerCandidateMaker> tc_maker =
217 0 : triggeralgs::TriggerCandidateFactory::get_instance()->build_maker(tc_algo);
218 0 : tc_maker->configure(tc_config);
219 0 : std::unique_ptr<trgtools::EmulateTCUnit> tc_emulator = std::make_unique<trgtools::EmulateTCUnit>();
220 0 : tc_emulator->set_maker(tc_maker);
221 : // TODO: Use a better file naming scheme for CSV.
222 0 : if (latencies) {
223 0 : std::filesystem::path output_path(output_file_path);
224 0 : tc_emulator->set_timing_file((output_path.parent_path() / ("tc_timings_" + output_path.stem().string() + ".csv")).string());
225 0 : tc_emulator->write_csv_header("Time Diffs");
226 0 : }
227 :
228 : // Generic filter hook
229 0 : std::function<bool(const trgdataformats::TriggerPrimitive&)> tp_filter;
230 :
231 0 : auto z_plane_filter = [&]( const trgdataformats::TriggerPrimitive& tp ) -> bool {
232 0 : return (channel_map->get_plane_from_offline_channel(tp.channel) != 2);
233 0 : };
234 :
235 0 : tp_filter = z_plane_filter;
236 :
237 0 : rp.set_processor([&]( daqdataformats::TimeSlice& tsl ) -> void {
238 0 : const std::vector<std::unique_ptr<daqdataformats::Fragment>>& frags = tsl.get_fragments_ref();
239 0 : const size_t num_frags = frags.size();
240 0 : if(!quiet)
241 0 : fmt::print("The number of fragments: {}\n", num_frags);
242 :
243 0 : uint64_t average_ta_time = 0;
244 0 : uint64_t average_tc_time = 0;
245 :
246 0 : size_t num_tas = 0;
247 0 : size_t num_tcs = 0;
248 :
249 : // Need a static for-loop: adding fragments to tsl will mutate frags even though it's const.
250 0 : for (size_t i = 0; i < num_frags; i++) {
251 0 : const auto& frag = frags[i];
252 :
253 : // The fragment has to be for the trigger (not e.g. for retreival from readout)
254 0 : if (frag->get_element_id().subsystem != tp_subsystem_requirement) {
255 0 : if(!quiet)
256 0 : fmt::print(" Warning, got non kTrigger SourceID {}\n", frag->get_element_id().to_string());
257 0 : continue;
258 : }
259 :
260 : // The fragment has to be TriggerPrimitive
261 0 : if(frag->get_fragment_type() != daqdataformats::FragmentType::kTriggerPrimitive){
262 0 : if(!quiet)
263 0 : fmt::print(" Error: FragmentType is: {}!\n", fragment_type_to_string(frag->get_fragment_type()));
264 0 : continue;
265 : }
266 :
267 : // This bit should be outside the loop
268 0 : if (!quiet)
269 0 : fmt::print(" Fragment id: {} [{}]\n", frag->get_element_id().to_string(), daqdataformats::fragment_type_to_string(frag->get_fragment_type()));
270 :
271 : // Pull tps out
272 0 : size_t n_tps = frag->get_data_size()/sizeof(trgdataformats::TriggerPrimitive);
273 0 : if (!quiet) {
274 0 : fmt::print(" TP fragment size: {}\n", frag->get_data_size());
275 0 : fmt::print(" Num TPs: {}\n", n_tps);
276 : }
277 :
278 : // Create a TP buffer
279 0 : std::vector<trgdataformats::TriggerPrimitive> tp_buffer;
280 : // Prepare the TP buffer, checking for time ordering
281 0 : tp_buffer.reserve(tp_buffer.size()+n_tps);
282 :
283 : // Populate the TP buffer
284 0 : trgdataformats::TriggerPrimitive* tp_array = static_cast<trgdataformats::TriggerPrimitive*>(frag->get_data());
285 0 : uint64_t last_ts = 0;
286 0 : for(size_t i(0); i<n_tps; ++i) {
287 0 : auto& tp = tp_array[i];
288 0 : if (tp.time_start <= last_ts && !quiet) {
289 0 : fmt::print(" ERROR: {} {} ", +tp.time_start, last_ts );
290 : }
291 0 : tp_buffer.push_back(tp);
292 : }
293 :
294 : // Print some useful info
295 0 : uint64_t d_ts = tp_array[n_tps-1].time_start - tp_array[0].time_start;
296 0 : if (!quiet)
297 0 : fmt::print(" TS gap: {} {} ms\n", d_ts, d_ts*16.0/1'000'000);
298 :
299 : //
300 : // TA Processing
301 : //
302 :
303 0 : const auto ta_start = std::chrono::steady_clock::now();
304 0 : std::unique_ptr<daqdataformats::Fragment> ta_frag = ta_emulator->emulate_vector(tp_buffer);
305 0 : const auto ta_end = std::chrono::steady_clock::now();
306 :
307 0 : if (ta_frag == nullptr) // Buffer was empty.
308 0 : continue;
309 0 : num_tas += ta_emulator->get_last_output_buffer().size();
310 :
311 : // TA time calculation.
312 0 : const uint64_t ta_diff = std::chrono::nanoseconds(ta_end - ta_start).count();
313 0 : average_ta_time += ta_diff;
314 0 : if (!quiet) {
315 0 : fmt::print("\tTA Time Process: {} ns.\n", ta_diff);
316 : }
317 :
318 0 : daqdataformats::FragmentHeader frag_hdr = frag->get_header();
319 :
320 : // Customise the source id (add 1000 to id)
321 0 : frag_hdr.element_id = daqdataformats::SourceID{daqdataformats::SourceID::Subsystem::kTrigger, frag->get_element_id().id+1000};
322 :
323 0 : ta_frag->set_header_fields(frag_hdr);
324 0 : ta_frag->set_type(daqdataformats::FragmentType::kTriggerActivity);
325 :
326 :
327 0 : tsl.add_fragment(std::move(ta_frag));
328 : //
329 : // TA Processing Ends
330 : //
331 :
332 : //
333 : // TC Processing
334 : //
335 :
336 0 : std::vector<triggeralgs::TriggerActivity> ta_buffer = ta_emulator->get_last_output_buffer();
337 0 : const auto tc_start = std::chrono::steady_clock::now();
338 0 : std::unique_ptr<daqdataformats::Fragment> tc_frag = tc_emulator->emulate_vector(ta_buffer);
339 0 : const auto tc_end = std::chrono::steady_clock::now();
340 :
341 0 : if (tc_frag == nullptr) // Buffer was empty.
342 0 : continue;
343 0 : num_tcs += tc_emulator->get_last_output_buffer().size();
344 :
345 : // TC time calculation.
346 0 : const uint64_t tc_diff = std::chrono::nanoseconds(tc_end - tc_start).count();
347 0 : average_tc_time += tc_diff;
348 0 : if (!quiet) {
349 0 : fmt::print("\tTC Time Process: {} ns.\n", tc_diff);
350 : }
351 :
352 : // Shares the same frag_hdr.
353 0 : tc_frag->set_header_fields(frag_hdr);
354 0 : tc_frag->set_type(daqdataformats::FragmentType::kTriggerCandidate);
355 :
356 0 : tsl.add_fragment(std::move(tc_frag));
357 :
358 0 : } // Fragment for loop
359 :
360 0 : if (num_tas == 0) average_ta_time = 0;
361 0 : else average_ta_time /= num_tas;
362 0 : if (num_tcs == 0) average_tc_time = 0;
363 0 : else average_tc_time /= num_tcs;
364 0 : if (!quiet) {
365 0 : fmt::print("\t\tAverage TA Time Process ({} TAs): {} ns.\n", num_tas, average_ta_time);
366 0 : fmt::print("\t\tAverage TC Time Process ({} TCs): {} ns.\n", num_tcs, average_tc_time);
367 : }
368 0 : });
369 :
370 0 : rp.loop(num_rec, skip_rec);
371 :
372 : /* code */
373 0 : return 0;
374 0 : }
|