DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
process_tpstream.cxx File Reference
#include "trgtools/EmulateTAUnit.hpp"
#include "trgtools/EmulateTCUnit.hpp"
#include "CLI/App.hpp"
#include "CLI/Config.hpp"
#include "CLI/Formatter.hpp"
#include <fmt/core.h>
#include <fmt/format.h>
#include <fmt/chrono.h>
#include <filesystem>
#include "hdf5libs/HDF5RawDataFile.hpp"
#include "trgdataformats/TriggerPrimitive.hpp"
#include "triggeralgs/TriggerActivityFactory.hpp"
#include "triggeralgs/TriggerCandidateFactory.hpp"
#include "triggeralgs/TriggerObjectOverlay.hpp"
#include "detchannelmaps/TPCChannelMap.hpp"
Include dependency graph for process_tpstream.cxx:

Go to the source code of this file.

Classes

class  TimeSliceProcessor
 

Functions

int main (int argc, char const *argv[])
 

Function Documentation

◆ main()

int main(int argc, char const* argv[])

Definition at line 153 of file process_tpstream.cxx.

154{
155
156 CLI::App app{"tapipe"};
157 // argv = app.ensure_utf8(argv);
158
159 std::string input_file_path;
160 app.add_option("-i", input_file_path, "Input TPStream file path")->required();
161 std::string output_file_path;
162 app.add_option("-o", output_file_path, "Output TPStream file path");
163 std::string channel_map_name = "VDColdboxTPCChannelMap";
164 app.add_option("-m", channel_map_name, "Detector Channel Map");
165 std::string config_name;
166 app.add_option("-j", config_name, "Trigger Activity and Candidate config JSON to use.")->required();
167 uint64_t skip_rec(0);
168 app.add_option("-s", skip_rec, "Skip records");
169 uint64_t num_rec(0);
170 app.add_option("-n", num_rec, "Process records");
171
172 bool quiet = false;
173 app.add_flag("--quiet", quiet, "Quiet outputs.");
174
175 bool latencies = false;
176 app.add_flag("--latencies", latencies, "Saves latencies per TP into csv");
177 CLI11_PARSE(app, argc, argv);
178
179
180 if (!quiet)
181 fmt::print("TPStream file: {}\n", input_file_path);
182
183 TimeSliceProcessor rp(input_file_path, output_file_path);
184
185 // TP source id (subsystem)
186 auto tp_subsystem_requirement = daqdataformats::SourceID::Subsystem::kTrigger;
187
188 auto channel_map = dunedaq::detchannelmaps::make_tpc_map(channel_map_name);
189
190 // Read configuration
191 std::ifstream config_stream(config_name);
192 nlohmann::json config = nlohmann::json::parse(config_stream);
193
194 // Only use the first plugin for now.
195 nlohmann::json ta_algo = config["trigger_activity_plugin"][0];
196 nlohmann::json ta_config = config["trigger_activity_config"][0];
197
198 nlohmann::json tc_algo = config["trigger_candidate_plugin"][0];
199 nlohmann::json tc_config = config["trigger_candidate_config"][0];
200
201
202 // Finally create a TA maker
203 std::unique_ptr<triggeralgs::TriggerActivityMaker> ta_maker =
205 ta_maker->configure(ta_config);
206 std::unique_ptr<trgtools::EmulateTAUnit> ta_emulator = std::make_unique<trgtools::EmulateTAUnit>();
207 ta_emulator->set_maker(ta_maker);
208 // TODO: Use a better file naming scheme for CSV.
209 if (latencies) {
210 std::filesystem::path output_path(output_file_path);
211 ta_emulator->set_timing_file((output_path.parent_path() / ("ta_timings_" + output_path.stem().string() + ".csv")).string());
212 ta_emulator->write_csv_header("TP Time Start,TP ADC Integral,Time Diffs,Is Last TP In TA");
213 }
214
215 // Finally create a TA maker
216 std::unique_ptr<triggeralgs::TriggerCandidateMaker> tc_maker =
218 tc_maker->configure(tc_config);
219 std::unique_ptr<trgtools::EmulateTCUnit> tc_emulator = std::make_unique<trgtools::EmulateTCUnit>();
220 tc_emulator->set_maker(tc_maker);
221 // TODO: Use a better file naming scheme for CSV.
222 if (latencies) {
223 std::filesystem::path output_path(output_file_path);
224 tc_emulator->set_timing_file((output_path.parent_path() / ("tc_timings_" + output_path.stem().string() + ".csv")).string());
225 tc_emulator->write_csv_header("Time Diffs");
226 }
227
228 // Generic filter hook
229 std::function<bool(const trgdataformats::TriggerPrimitive&)> tp_filter;
230
231 auto z_plane_filter = [&]( const trgdataformats::TriggerPrimitive& tp ) -> bool {
232 return (channel_map->get_plane_from_offline_channel(tp.channel) != 2);
233 };
234
235 tp_filter = z_plane_filter;
236
237 rp.set_processor([&]( daqdataformats::TimeSlice& tsl ) -> void {
238 const std::vector<std::unique_ptr<daqdataformats::Fragment>>& frags = tsl.get_fragments_ref();
239 const size_t num_frags = frags.size();
240 if(!quiet)
241 fmt::print("The number of fragments: {}\n", num_frags);
242
243 uint64_t average_ta_time = 0;
244 uint64_t average_tc_time = 0;
245
246 size_t num_tas = 0;
247 size_t num_tcs = 0;
248
249 // Need a static for-loop: adding fragments to tsl will mutate frags even though it's const.
250 for (size_t i = 0; i < num_frags; i++) {
251 const auto& frag = frags[i];
252
253 // The fragment has to be for the trigger (not e.g. for retreival from readout)
254 if (frag->get_element_id().subsystem != tp_subsystem_requirement) {
255 if(!quiet)
256 fmt::print(" Warning, got non kTrigger SourceID {}\n", frag->get_element_id().to_string());
257 continue;
258 }
259
260 // The fragment has to be TriggerPrimitive
261 if(frag->get_fragment_type() != daqdataformats::FragmentType::kTriggerPrimitive){
262 if(!quiet)
263 fmt::print(" Error: FragmentType is: {}!\n", fragment_type_to_string(frag->get_fragment_type()));
264 continue;
265 }
266
267 // This bit should be outside the loop
268 if (!quiet)
269 fmt::print(" Fragment id: {} [{}]\n", frag->get_element_id().to_string(), daqdataformats::fragment_type_to_string(frag->get_fragment_type()));
270
271 // Pull tps out
272 size_t n_tps = frag->get_data_size()/sizeof(trgdataformats::TriggerPrimitive);
273 if (!quiet) {
274 fmt::print(" TP fragment size: {}\n", frag->get_data_size());
275 fmt::print(" Num TPs: {}\n", n_tps);
276 }
277
278 // Create a TP buffer
279 std::vector<trgdataformats::TriggerPrimitive> tp_buffer;
280 // Prepare the TP buffer, checking for time ordering
281 tp_buffer.reserve(tp_buffer.size()+n_tps);
282
283 // Populate the TP buffer
284 trgdataformats::TriggerPrimitive* tp_array = static_cast<trgdataformats::TriggerPrimitive*>(frag->get_data());
285 uint64_t last_ts = 0;
286 for(size_t i(0); i<n_tps; ++i) {
287 auto& tp = tp_array[i];
288 if (tp.time_start <= last_ts && !quiet) {
289 fmt::print(" ERROR: {} {} ", +tp.time_start, last_ts );
290 }
291 tp_buffer.push_back(tp);
292 }
293
294 // Print some useful info
295 uint64_t d_ts = tp_array[n_tps-1].time_start - tp_array[0].time_start;
296 if (!quiet)
297 fmt::print(" TS gap: {} {} ms\n", d_ts, d_ts*16.0/1'000'000);
298
299 //
300 // TA Processing
301 //
302
303 const auto ta_start = std::chrono::steady_clock::now();
304 std::unique_ptr<daqdataformats::Fragment> ta_frag = ta_emulator->emulate_vector(tp_buffer);
305 const auto ta_end = std::chrono::steady_clock::now();
306
307 if (ta_frag == nullptr) // Buffer was empty.
308 continue;
309 num_tas += ta_emulator->get_last_output_buffer().size();
310
311 // TA time calculation.
312 const uint64_t ta_diff = std::chrono::nanoseconds(ta_end - ta_start).count();
313 average_ta_time += ta_diff;
314 if (!quiet) {
315 fmt::print("\tTA Time Process: {} ns.\n", ta_diff);
316 }
317
318 daqdataformats::FragmentHeader frag_hdr = frag->get_header();
319
320 // Customise the source id (add 1000 to id)
321 frag_hdr.element_id = daqdataformats::SourceID{daqdataformats::SourceID::Subsystem::kTrigger, frag->get_element_id().id+1000};
322
323 ta_frag->set_header_fields(frag_hdr);
324 ta_frag->set_type(daqdataformats::FragmentType::kTriggerActivity);
325
326
327 tsl.add_fragment(std::move(ta_frag));
328 //
329 // TA Processing Ends
330 //
331
332 //
333 // TC Processing
334 //
335
336 std::vector<triggeralgs::TriggerActivity> ta_buffer = ta_emulator->get_last_output_buffer();
337 const auto tc_start = std::chrono::steady_clock::now();
338 std::unique_ptr<daqdataformats::Fragment> tc_frag = tc_emulator->emulate_vector(ta_buffer);
339 const auto tc_end = std::chrono::steady_clock::now();
340
341 if (tc_frag == nullptr) // Buffer was empty.
342 continue;
343 num_tcs += tc_emulator->get_last_output_buffer().size();
344
345 // TC time calculation.
346 const uint64_t tc_diff = std::chrono::nanoseconds(tc_end - tc_start).count();
347 average_tc_time += tc_diff;
348 if (!quiet) {
349 fmt::print("\tTC Time Process: {} ns.\n", tc_diff);
350 }
351
352 // Shares the same frag_hdr.
353 tc_frag->set_header_fields(frag_hdr);
354 tc_frag->set_type(daqdataformats::FragmentType::kTriggerCandidate);
355
356 tsl.add_fragment(std::move(tc_frag));
357
358 } // Fragment for loop
359
360 if (num_tas == 0) average_ta_time = 0;
361 else average_ta_time /= num_tas;
362 if (num_tcs == 0) average_tc_time = 0;
363 else average_tc_time /= num_tcs;
364 if (!quiet) {
365 fmt::print("\t\tAverage TA Time Process ({} TAs): {} ns.\n", num_tas, average_ta_time);
366 fmt::print("\t\tAverage TC Time Process ({} TCs): {} ns.\n", num_tcs, average_tc_time);
367 }
368 });
369
370 rp.loop(num_rec, skip_rec);
371
372 /* code */
373 return 0;
374}
static std::shared_ptr< AbstractFactory< TriggerActivityMaker > > get_instance()
std::string fragment_type_to_string(const FragmentType &type)
Convert a FragmentType enum value to string.