34 std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>> _frags,
37 std::string output_filename = _outputfilename;
38 if (output_filename.size() < 5 ||
39 output_filename.compare(output_filename.size() - 5, 5,
".hdf5") != 0) {
40 output_filename +=
".hdf5";
55 std::vector<hdf5libs::HDF5PathParameters> params;
56 params.push_back(params_trigger);
66 std::unique_ptr<hdf5libs::HDF5RawDataFile> output_file = std::make_unique<hdf5libs::HDF5RawDataFile>(
68 _frags.begin()->second[0]->get_run_number(),
70 "emulate_from_tpstream",
75 for (
auto& [slice_id, vec_frags]: _frags) {
79 tsh.
run_number = vec_frags[0]->get_run_number();
85 std::cout <<
"Time slice number: " << slice_id << std::endl;
88 for (std::unique_ptr<daqdataformats::Fragment>& frag_ptr: vec_frags) {
90 std::cout <<
" Writing elementid: " << frag_ptr->get_element_id() <<
" trigger number: " << frag_ptr->get_trigger_number() <<
" trigger_timestamp: " << frag_ptr->get_trigger_timestamp() <<
" window_begin: " << frag_ptr->get_window_begin() <<
" sequence_no: " << frag_ptr->get_sequence_number() << std::endl;
96 output_file->write(ts);
116 std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>> files_sorted;
119 for (
const std::string& file : _files) {
120 std::shared_ptr<hdf5libs::HDF5RawDataFile> fl = std::make_shared<hdf5libs::HDF5RawDataFile>(file);
121 if (!fl->is_timeslice_type()) {
122 throw std::runtime_error(fmt::format(
"ERROR: input file '{}' not of type 'TimeSlice'", file));
125 std::string application_name = fl->get_attribute<std::string>(
"application_name");
126 files_sorted[application_name].push_back(fl);
130 for (
auto& [app_name, vec_files]: files_sorted) {
132 if (vec_files.size() <= 1) {
137 std::sort(vec_files.begin(), vec_files.end(),
138 [](
const std::shared_ptr<hdf5libs::HDF5RawDataFile>& a,
const std::shared_ptr<hdf5libs::HDF5RawDataFile>& b) {
139 return a->get_attribute<size_t>(
"file_index") <
140 b->get_attribute<size_t>(
"file_index");
171 if (_files.empty()) {
172 throw std::runtime_error(
"No files provided");
175 uint64_t global_start = std::numeric_limits<uint64_t>::min();
176 uint64_t global_end = std::numeric_limits<uint64_t>::max();
179 for (
auto& [appid, vec_files]: _files) {
180 uint64_t app_start = std::numeric_limits<uint64_t>::max();
181 uint64_t app_end = std::numeric_limits<uint64_t>::min();
184 for (
auto& file : vec_files) {
185 auto record_ids = file->get_all_record_ids();
186 if (record_ids.empty()) {
187 throw std::runtime_error(fmt::format(
"File from application {} contains no records.", appid));
189 app_start = std::min(app_start, record_ids.begin()->first);
190 app_end = std::max(app_end, record_ids.rbegin()->first);
194 global_start = std::max(global_start, app_start);
195 global_end = std::min(global_end, app_end);
198 std::cout <<
"Application: " << appid <<
" " <<
" TimeSliceID start: " << app_start <<
" end: " << app_end << std::endl;
203 std::cout <<
"Global start: " << global_start <<
" Global end: " << global_end << std::endl;
205 if (global_start > global_end) {
206 throw std::runtime_error(
"One of the provided files' id range did not overlap with the rest. Please select files with overlapping TimeSlice IDs");
210 for (
auto& [appid, vec_files]: _files) {
211 for (
auto& file : vec_files) {
212 auto record_ids = file->get_all_record_ids();
214 uint64_t file_start = record_ids.begin()->first;
215 uint64_t file_end = record_ids.rbegin()->first;
216 if ((file_start > global_end || file_end < global_start)) {
217 uint64_t file_index = file->get_attribute<
size_t>(
"file_index");
218 throw std::runtime_error(fmt::format(
219 "File from TPStreamWrite application '{}' (index '{}') has record id range [{}, {}], which does not overlap with global range [{}, {}].",
220 appid, file_index, file_start, file_end, global_start, global_end
226 return {global_start, global_end};
282int main(
int argc,
char const *argv[])
285 CLI::App app{
"Offline trigger TriggerActivity & TriggerCandidate emulatior"};
291 app.parse(argc, argv);
293 catch (
const CLI::ParseError &e) {
298 std::ifstream config_stream(opts.config_name);
299 nlohmann::json config = nlohmann::json::parse(config_stream);
302 std::cout <<
"Files to process:\n";
303 for (
const std::string& file : opts.input_files) {
304 std::cout <<
"- " << file <<
"\n";
309 std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>> sorted_files =
314 std::pair<uint64_t, uint64_t> processing_range = recordid_range;
316 if (opts.slice_start.has_value()) {
317 processing_range.first = std::max(processing_range.first, opts.slice_start.value());
320 if (opts.num_slices.has_value()) {
321 if (opts.num_slices.value() == 0) {
322 throw std::runtime_error(
"Invalid --num-slices value: must be greater than 0");
325 uint64_t requested_end = processing_range.first + (opts.num_slices.value() - 1);
326 if (requested_end < processing_range.first) {
327 requested_end = std::numeric_limits<uint64_t>::max();
330 processing_range.second = std::min(processing_range.second, requested_end);
333 if (processing_range.first > processing_range.second) {
334 throw std::runtime_error(fmt::format(
335 "Requested timeslice subset (slice-start={}, num-slices={}) does not overlap available range [{}, {}]",
336 opts.slice_start.value_or(0),
337 opts.num_slices.value_or(0),
338 recordid_range.first,
339 recordid_range.second));
343 std::cout <<
"Processing TimeSliceID range: [" << processing_range.first <<
", " << processing_range.second <<
"]" << std::endl;
347 std::vector<std::unique_ptr<TAEmulationWorker>> ta_emu_workers;
348 for (
auto [name, files] : sorted_files) {
349 ta_emu_workers.push_back(std::make_unique<TAEmulationWorker>(files, config, processing_range, opts.run_parallel, opts.quiet));
353 for (
const auto& handler : ta_emu_workers) {
354 handler->start_processing();
358 std::map<uint64_t, std::vector<triggeralgs::TriggerActivity>> tas;
359 auto append_tas = [&tas](std::map<uint64_t, std::vector<triggeralgs::TriggerActivity>>&& _tas) {
360 for (
auto& [sliceid, src_vec] : _tas) {
361 auto& dest_vec = tas[sliceid];
362 dest_vec.reserve(dest_vec.size() + src_vec.size());
364 dest_vec.insert(dest_vec.end(), std::make_move_iterator(src_vec.begin()), std::make_move_iterator(src_vec.end()));
370 std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>> frags;
371 auto append_frags = [&frags](std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>>&& _frags) {
372 for (
auto& [sliceid, src_vec] : _frags) {
373 auto& dest_vec = frags[sliceid];
375 dest_vec.reserve(dest_vec.size() + src_vec.size());
377 dest_vec.insert(dest_vec.end(), std::make_move_iterator(src_vec.begin()), std::make_move_iterator(src_vec.end()));
385 for (
const auto& handler : ta_emu_workers) {
387 handler->wait_to_complete_work();
390 append_tas(std::move(handler->get_tas()));
391 append_frags(std::move(handler->get_frags()));
395 sourceid_geoid_map.insert(map.begin(), map.end());
400 for (
auto& [sliceid, vec_tas]: tas) {
401 std::sort(vec_tas.begin(), vec_tas.end(),
403 return std::tie(a.time_start, a.channel_start, a.time_end) <
404 std::tie(b.time_start, b.channel_start, b.time_end);
406 n_tas += vec_tas.size();
410 std::cout <<
"Total number of TAs made: " << n_tas << std::endl;
411 std::cout <<
"Creating a TCMaker..." << std::endl;
414 std::string algo_name = config[
"trigger_candidate_plugin"][0];
415 nlohmann::json algo_config = config[
"trigger_candidate_config"][0];
417 std::unique_ptr<triggeralgs::TriggerCandidateMaker> tc_maker =
419 tc_maker->configure(algo_config);
425 std::vector<triggeralgs::TriggerCandidate> tcs;
426 for (
auto& [sliceid, vec_tas]: tas) {
427 std::unique_ptr<daqdataformats::Fragment> tc_frag = tc_emulator.
emulate_vector(vec_tas);
436 tc_frag->set_header_fields(frag_hdr);
437 tc_frag->set_type(daqdataformats::FragmentType::kTriggerCandidate);
438 tc_frag->set_trigger_number(sliceid);
439 tc_frag->set_window_begin(frags[sliceid][0]->get_window_begin());
440 tc_frag->set_window_end(frags[sliceid][0]->get_window_end());
443 frags[sliceid].push_back(std::move(tc_frag));
446 tcs.reserve(tcs.size() + tmp_tcs.size());
447 tcs.insert(tcs.end(), std::make_move_iterator(tmp_tcs.begin()), std::make_move_iterator(tmp_tcs.end()));
450 std::cout <<
"Total number of TCs made: " << tcs.size() << std::endl;
453 if (!frags.empty()) {
454 save_fragments(opts.output_filename, sourceid_geoid_map, std::move(frags), opts.quiet);
456 std::cout <<
"No TA/TC fragments generated. Output file will not be generated" << std::endl;