// Writes the collected fragments to an HDF5 output file, one record per map entry.
// NOTE(review): presumably this is SaveFragments (main calls it with the fragment map);
// the start of the signature and several declarations (`tsh`, `ts`, `params_trigger`)
// are not visible next to these lines -- confirm against the full definition.
//
// _frags: fragments grouped by slice id; each fragment is owned through a unique_ptr.
33 std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>> _frags,
// Path parameters collected for the output file; only the trigger entry is visible here.
48 std::vector<hdf5libs::HDF5PathParameters> params;
49 params.push_back(params_trigger);
// The output file name is `_outputfilename` with ".hdf5" appended. The run number is read
// from the first fragment of the first map entry.
// NOTE(review): `_frags.begin()->second[0]` is undefined behaviour when `_frags` is empty
// or its first vector is empty; no guard is visible here -- confirm one exists.
59 std::unique_ptr<hdf5libs::HDF5RawDataFile> output_file = std::make_unique<hdf5libs::HDF5RawDataFile>(
60 _outputfilename +
".hdf5",
61 _frags.begin()->second[0]->get_run_number(),
63 "emulate_from_tpstream",
// One iteration per slice id; the binding is non-const because fragments are moved out below.
68 for (
auto& [slice_id, vec_frags]: _frags) {
// The header's run number comes from the first fragment of this slice
// (assumes vec_frags is non-empty).
72 tsh.
run_number = vec_frags[0]->get_run_number();
78 std::cout <<
"Time slice number: " << slice_id << std::endl;
// Iterate by non-const reference so that each unique_ptr can be moved from.
81 for (std::unique_ptr<daqdataformats::Fragment>& frag_ptr: vec_frags) {
// Print the fragment's identifying header fields before it is handed over.
83 std::cout <<
" Writing elementid: " << frag_ptr->get_element_id() <<
" trigger number: " << frag_ptr->get_trigger_number() <<
" trigger_timestamp: " << frag_ptr->get_trigger_timestamp() <<
" window_begin: " << frag_ptr->get_window_begin() <<
" sequence_no: " << frag_ptr->get_sequence_number() << std::endl;
// Hands the fragment to `ts`; the element left in vec_frags must be treated as moved-from.
85 ts.add_fragment(std::move(frag_ptr));
// Write the assembled `ts` object to the output file.
89 output_file->write(
ts);
// Opens every path in `_files`, groups the opened files by their "application_name"
// attribute, and orders each group by the "file_index" attribute.
// Throws std::runtime_error when a file is not of TimeSlice type.
// NOTE(review): the enclosing function's signature and return statement are not visible here.
101 std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>> files_sorted;
104 for (
const std::string& file : _files) {
// Files are held by shared_ptr so the same handle can be stored in the map.
105 std::shared_ptr<hdf5libs::HDF5RawDataFile> fl = std::make_shared<hdf5libs::HDF5RawDataFile>(file);
// Reject any input that is not a TimeSlice file.
106 if (!fl->is_timeslice_type()) {
107 throw std::runtime_error(fmt::format(
"ERROR: input file '{}' not of type 'TimeSlice'", file));
// The grouping key is the application name stored in the file's attributes.
110 std::string application_name = fl->get_attribute<std::string>(
"application_name");
111 files_sorted[application_name].push_back(fl);
// Second pass: order the files inside each application group.
115 for (
auto& [app_name, vec_files]: files_sorted) {
// Groups with zero or one file take this branch; its body is not visible here
// (presumably it skips the sort -- confirm).
117 if (vec_files.size() <= 1) {
// Ascending sort on the "file_index" attribute.
// NOTE(review): the attribute is read from both files on every comparison.
122 std::sort(vec_files.begin(), vec_files.end(),
123 [](
const std::shared_ptr<hdf5libs::HDF5RawDataFile>& a,
const std::shared_ptr<hdf5libs::HDF5RawDataFile>& b) {
124 return a->get_attribute<size_t>(
"file_index") <
125 b->get_attribute<size_t>(
"file_index");
// Computes the range of record ids [global_start, global_end] that is common to all
// applications in `_files`, then checks that every individual file overlaps that range.
// Returns {global_start, global_end}.
// Throws std::runtime_error when no files are given, when a file has no records, when the
// applications' ranges do not intersect, or when a single file lies outside the common range.
// NOTE(review): the enclosing function's signature is not visible here.
148 if (_files.empty()) {
149 throw std::runtime_error(
"No files provided");
// The intersection starts as "everything": the first std::max / std::min below
// replaces these with the first application's bounds.
152 uint64_t global_start = std::numeric_limits<uint64_t>::min();
153 uint64_t global_end = std::numeric_limits<uint64_t>::max();
156 for (
auto& [appid, vec_files]: _files) {
// The per-application range starts inverted so that the first file's bounds
// are always taken.
157 uint64_t app_start = std::numeric_limits<uint64_t>::max();
158 uint64_t app_end = std::numeric_limits<uint64_t>::min();
161 for (
auto& file : vec_files) {
162 auto record_ids = file->get_all_record_ids();
163 if (record_ids.empty()) {
164 throw std::runtime_error(fmt::format(
"File from application {} contains no records.", appid));
// The first and last elements' `.first` are used as the smallest and largest record id;
// assumes get_all_record_ids() returns a container ordered by that field -- TODO confirm.
166 app_start = std::min(app_start, record_ids.begin()->first);
167 app_end = std::max(app_end, record_ids.rbegin()->first);
// Narrow the global range to the overlap with this application's range.
171 global_start = std::max(global_start, app_start);
172 global_end = std::min(global_end, app_end);
175 std::cout <<
"Application: " << appid <<
" " <<
" TimeSliceID start: " << app_start <<
" end: " << app_end << std::endl;
180 std::cout <<
"Global start: " << global_start <<
" Global end: " << global_end << std::endl;
// An empty intersection means at least one application shares no ids with the others.
182 if (global_start > global_end) {
183 throw std::runtime_error(
"One of the provided files' id range did not overlap with the rest. Please select files with overlapping TimeSlice IDs");
// Second pass: each individual file must overlap the common range.
// NOTE(review): get_all_record_ids() is called a second time per file here; the bounds
// found in the first pass could be reused if that call is expensive.
187 for (
auto& [appid, vec_files]: _files) {
188 for (
auto& file : vec_files) {
189 auto record_ids = file->get_all_record_ids();
191 uint64_t file_start = record_ids.begin()->first;
192 uint64_t file_end = record_ids.rbegin()->first;
// A file is rejected when it lies entirely after or entirely before the common range.
193 if ((file_start > global_end || file_end < global_start)) {
// The attribute is read as size_t and stored in a uint64_t for the error message.
194 uint64_t file_index = file->get_attribute<
size_t>(
"file_index");
195 throw std::runtime_error(fmt::format(
196 "File from TPStreamWrite application '{}' (index '{}') has record id range [{}, {}], which does not overlap with global range [{}, {}].",
197 appid, file_index, file_start, file_end, global_start, global_end
// Both bounds are returned as a pair-like value.
203 return {global_start, global_end};
// Program entry point. Visible steps: parse the command line, load the JSON configuration,
// build one TAFileHandler per application, start them all and collect their TriggerActivity
// and Fragment results per slice id, sort the TAs, build one TriggerCandidate fragment per
// slice, and pass everything to SaveFragments.
// NOTE(review): several declarations used below (`opts`, `recordid_range`, `map`,
// `sourceid_geoid_map`, `n_tas`, `tc_emulator`, `frag_hdr`, `tmp_tcs`) are not visible here.
252int main(
int argc,
char const *argv[])
// NOTE(review): the description string below contains the typo "emulatior" (should read
// "emulator"); it is user-visible help text.
255 CLI::App app{
"Offline trigger TriggerActivity & TriggerCandidate emulatior"};
261 app.parse(argc, argv);
263 catch (
const CLI::ParseError &e) {
// The configuration is read from the file named by `opts.config_name`.
// NOTE(review): no check that the stream opened is visible before parsing -- confirm a
// missing or unreadable config file produces a clear error.
268 std::ifstream config_stream(opts.config_name);
269 nlohmann::json config = nlohmann::json::parse(config_stream);
272 std::cout <<
"Files to process:\n";
273 for (
const std::string& file : opts.input_files) {
274 std::cout <<
"- " << file <<
"\n";
// Input files grouped by application name (the initialiser is not visible here).
279 std::map<std::string, std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>>> sorted_files =
// One handler is created per application group.
286 std::vector<std::unique_ptr<TAFileHandler>> file_handlers;
// NOTE(review): `auto [name, files]` binds by value, so each map entry (the key string and
// the vector of shared_ptr) is copied per iteration; `const auto&` would avoid the copy.
287 for (
auto [name, files] : sorted_files) {
288 file_handlers.push_back(std::make_unique<TAFileHandler>(files, config, recordid_range, opts.run_parallel, opts.quiet));
// All handlers are started before any of them is waited on (see the loop further below).
292 for (
const auto& handler : file_handlers) {
293 handler->start_processing();
// TAs from all handlers, keyed by slice id.
297 std::map<uint64_t, std::vector<triggeralgs::TriggerActivity>> tas;
// Merges one handler's per-slice TA vectors into `tas`, moving the elements.
298 auto append_tas = [&tas](std::map<uint64_t, std::vector<triggeralgs::TriggerActivity>>&& _tas) {
299 for (
auto& [sliceid, src_vec] : _tas) {
// operator[] creates the destination vector on the first use of a slice id.
300 auto& dest_vec = tas[sliceid];
301 dest_vec.reserve(dest_vec.size() + src_vec.size());
303 dest_vec.insert(dest_vec.end(), std::make_move_iterator(src_vec.begin()), std::make_move_iterator(src_vec.end()));
// Fragments from all handlers, keyed by slice id.
309 std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>> frags;
// Same merge as append_tas; move iterators are required because unique_ptr is move-only.
310 auto append_frags = [&frags](std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>>&& _frags) {
311 for (
auto& [sliceid, src_vec] : _frags) {
312 auto& dest_vec = frags[sliceid];
314 dest_vec.reserve(dest_vec.size() + src_vec.size());
316 dest_vec.insert(dest_vec.end(), std::make_move_iterator(src_vec.begin()), std::make_move_iterator(src_vec.end()));
// Wait for each handler in turn, then take over its results.
324 for (
const auto& handler : file_handlers) {
326 handler->wait_to_complete_work();
// NOTE(review): if get_tas()/get_frags() return by value, std::move is redundant; if they
// return references, these calls move the data out of the handler -- confirm which.
329 append_tas(std::move(handler->get_tas()));
330 append_frags(std::move(handler->get_frags()));
// Merge this handler's `map` into the combined source-id map; insert() keeps existing
// entries for keys already present.
334 sourceid_geoid_map.insert(map.begin(), map.end());
// Sort the TAs of each slice by (time_start, channel_start, time_end) and count them.
339 for (
auto& [sliceid, vec_tas]: tas) {
340 std::sort(vec_tas.begin(), vec_tas.end(),
342 return std::tie(a.time_start, a.channel_start, a.time_end) <
343 std::tie(b.time_start, b.channel_start, b.time_end);
345 n_tas += vec_tas.size();
348 std::cout <<
"Total number of TAs made: " << n_tas << std::endl;
349 std::cout <<
"Creating a TCMaker..." << std::endl;
// Only the first element of the plugin and config arrays is used.
352 std::string algo_name = config[
"trigger_candidate_plugin"][0];
353 nlohmann::json algo_config = config[
"trigger_candidate_config"][0];
355 std::unique_ptr<triggeralgs::TriggerCandidateMaker> tc_maker =
357 tc_maker->configure(algo_config);
// TriggerCandidates collected over all slices.
363 std::vector<triggeralgs::TriggerCandidate> tcs;
// One TC fragment is produced per slice from that slice's sorted TAs.
364 for (
auto& [sliceid, vec_tas]: tas) {
365 std::unique_ptr<daqdataformats::Fragment> tc_frag = tc_emulator.
emulate_vector(vec_tas);
374 tc_frag->set_header_fields(frag_hdr);
375 tc_frag->set_type(daqdataformats::FragmentType::kTriggerCandidate);
376 tc_frag->set_trigger_number(sliceid);
// The readout window is copied from the first fragment already stored for this slice.
// NOTE(review): `frags[sliceid]` default-inserts an empty vector when the slice id is
// absent, and `[0]` on an empty vector is undefined behaviour -- confirm every slice id
// in `tas` has at least one fragment in `frags`.
377 tc_frag->set_window_begin(frags[sliceid][0]->get_window_begin());
378 tc_frag->set_window_end(frags[sliceid][0]->get_window_end());
// The TC fragment is stored alongside the slice's other fragments.
381 frags[sliceid].push_back(std::move(tc_frag));
// Move this slice's candidates (`tmp_tcs`) into the overall list.
384 tcs.reserve(tcs.size() + tmp_tcs.size());
385 tcs.insert(tcs.end(), std::make_move_iterator(tmp_tcs.begin()), std::make_move_iterator(tmp_tcs.end()));
388 std::cout <<
"Total number of TCs made: " << tcs.size() << std::endl;
// `frags` is moved into SaveFragments and must not be used afterwards.
391 SaveFragments(opts.output_filename, sourceid_geoid_map, std::move(frags), opts.quiet);