1#ifndef TRGTOOLS_TAFILEHANDLER_CPP_
2#define TRGTOOLS_TAFILEHANDLER_CPP_
12 nlohmann::json config,
13 std::pair<uint64_t, uint64_t> sliceid_range,
16 : m_input_files(input_files),
17 m_sliceid_range(sliceid_range),
18 m_run_parallel(run_parallel),
22 std::string algo_name = config[
"trigger_activity_plugin"][0];
23 nlohmann::json algo_config = config[
"trigger_activity_config"][0];
27 std::vector<daqdataformats::run_number_t> run_numbers;
28 std::vector<size_t> file_indices;
29 for (
const auto& input_file : input_files) {
30 if (std::find(run_numbers.begin(), run_numbers.end(),
36 if (std::find(file_indices.begin(), file_indices.end(),
39 file_indices.push_back(input_file->get_attribute<
size_t>(
"file_index"));
43 std::string application_name =
m_input_files.front()->get_attribute<std::string>(
"application_name");
46 fmt::print(
"Run Numbers: {}\nFile Indices: {}\nApp name: '{}'\n", fmt::join(run_numbers,
","), fmt::join(file_indices,
","), application_name);
55 fmt::print(
"Number of makers to make: {}\n", valid_sources.size());
59 std::unique_ptr<triggeralgs::TriggerActivityMaker> ta_maker =
61 ta_maker->configure(algo_config);
74std::vector<daqdataformats::SourceID>
79 std::vector<daqdataformats::SourceID> ret;
80 for (
const auto& fragment : fragments) {
97 throw "Files not set yet!";
107 std::function<void()> task;
139 auto records = input_file->get_all_record_ids();
141 for (
const auto& record : records) {
144 fmt::print(
" Will not process RecordID {} because it's outside of our range!", record.first);
153 for (
const auto& fragment : fragments) {
161 size_t n_tps = fragment->get_data_size()/
SIZE_TP;
163 fmt::print(
" TP fragment size: {}\n", fragment->get_data_size());
164 fmt::print(
" Num TPs: {}\n", n_tps);
168 std::vector<trgdataformats::TriggerPrimitive> tp_buffer;
170 tp_buffer.reserve(n_tps);
174 uint64_t last_ts = 0;
175 for(
size_t tpid(0); tpid<n_tps; ++tpid) {
176 auto& tp = tp_array[tpid];
177 if (tp.time_start <= last_ts && !
m_quiet) {
178 fmt::print(
" ERROR: {} {} ", +tp.time_start, last_ts );
180 tp_buffer.push_back(tp);
190 enqueue_task([
this, sid, record, frag_hdr, tp_buffer = std::move(tp_buffer)]()
mutable {
191 this->
process_task(sid, record.first, frag_hdr, std::move(tp_buffer));
195 this->
process_task(sid, record.first, frag_hdr, std::move(tp_buffer));
206 for (
auto& [key, vec_tas]:
m_tas) {
207 total += vec_tas.size();
209 std::cout <<
"We have a total of " << total <<
" TAs!" << std::endl;
239 fmt::print(
"TAFileHandler_{} work completed\n",
m_id);
251 fmt::print(
"m_stop issued\n");
261 std::vector<trgdataformats::TriggerPrimitive>&& _tps)
264 std::unique_ptr<daqdataformats::Fragment> frag =
m_ta_emulators[_source_id]->emulate_vector(_tps);
272 std::vector<triggeralgs::TriggerActivity> ta_buffer =
m_ta_emulators[_source_id]->get_last_output_buffer();
275 size_t n_tas = ta_buffer.size();
281 fmt::print(
" Found {} TAs!\n", n_tas);
290 m_tas[_rec].insert(
m_tas[_rec].end(), std::make_move_iterator(ta_buffer.begin()), std::make_move_iterator(ta_buffer.end()));
292 frag->set_header_fields(_header);
301 return std::move(
m_tas);
std::map< daqdataformats::SourceID, std::vector< uint64_t > > source_id_geo_id_map_t
static std::shared_ptr< AbstractFactory< TriggerActivityMaker > > get_instance()
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size
Cannot add TPSet with sourceid