DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
TAFileHandler.cpp
Go to the documentation of this file.
1#ifndef TRGTOOLS_TAFILEHANDLER_CPP_
2#define TRGTOOLS_TAFILEHANDLER_CPP_
3
5
6namespace dunedaq::trgtools
7{
8
// Counter that supplies each TAFileHandler with its m_id; the constructor's
// initialiser list post-increments it once per constructed handler.
// NOTE(review): plain (non-atomic) uint16_t — presumably handlers are all
// constructed from a single thread; confirm.
9uint16_t TAFileHandler::m_id_next = 0;
10
// Constructor. Stores the input files, the slice-ID range and the two flags,
// takes a fresh m_id from m_id_next, then:
//  - reads the TA algorithm name and its configuration from element [0] of
//    config["trigger_activity_plugin"] and config["trigger_activity_config"],
//  - gathers the run numbers and file indices of the input files for the
//    summary printout,
//  - creates one EmulateTAUnit per SourceID that get_valid_sourceids() returns
//    for the first record of the first input file.
// NOTE(review): m_input_files.front() and *records.begin() are used without
// an emptiness check, so an empty file list (or a first file without records)
// is not handled here.
11TAFileHandler::TAFileHandler(std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>> input_files,
12 nlohmann::json config,
13 std::pair<uint64_t, uint64_t> sliceid_range,
14 bool run_parallel,
15 bool quiet)
16 : m_input_files(input_files),
17 m_sliceid_range(sliceid_range),
18 m_run_parallel(run_parallel),
19 m_quiet(quiet),
20 m_id(m_id_next++)
21{
 // Only the first plugin / first configuration entry is used.
22 std::string algo_name = config["trigger_activity_plugin"][0];
23 nlohmann::json algo_config = config["trigger_activity_config"][0];
24
25 // Get the input file
26 // Extract the run number etc
27 std::vector<daqdataformats::run_number_t> run_numbers;
28 std::vector<size_t> file_indices;
29 for (const auto& input_file : input_files) {
 // Record each distinct run number once.
30 if (std::find(run_numbers.begin(), run_numbers.end(),
31 input_file->get_attribute<daqdataformats::run_number_t>("run_number")) ==
32 run_numbers.end()) {
33 run_numbers.push_back(input_file->get_attribute<daqdataformats::run_number_t>("run_number"));
34 }
35
 // NOTE(review): this searches file_indices for the file's *run_number*
 // attribute, while the value pushed below is its "file_index". It looks
 // like a copy-paste slip; as written the de-duplication of file indices
 // does not compare file indices with each other.
36 if (std::find(file_indices.begin(), file_indices.end(),
37 input_file->get_attribute<daqdataformats::run_number_t>("run_number")) ==
38 file_indices.end()) {
39 file_indices.push_back(input_file->get_attribute<size_t>("file_index"));
40 }
41 }
42
 // The application name is read from the first input file only.
43 std::string application_name = m_input_files.front()->get_attribute<std::string>("application_name");
44
45 if (!m_quiet) {
46 fmt::print("Run Numbers: {}\nFile Indices: {}\nApp name: '{}'\n", fmt::join(run_numbers, ","), fmt::join(file_indices, ","), application_name);
47 }
48
49 // std::set of record IDs (pair of record number & sequence number)
50 auto records = m_input_files.front()->get_all_record_ids();
51
52 // Extract the number of TAMakers to create
53 daqdataformats::TimeSlice first_timeslice = m_input_files.front()->get_timeslice(*records.begin());
54 std::vector<daqdataformats::SourceID> valid_sources = get_valid_sourceids(first_timeslice);
 // NOTE(review): this message is printed even when m_quiet is set.
55 fmt::print("Number of makers to make: {}\n", valid_sources.size());
56
57 for (const daqdataformats::SourceID& sid : valid_sources) {
58 // Create TAMaker
59 std::unique_ptr<triggeralgs::TriggerActivityMaker> ta_maker =
61 ta_maker->configure(algo_config);
62
63 // Add it to the emulators
64 m_ta_emulators[sid] = std::make_unique<trgtools::EmulateTAUnit>();
65 m_ta_emulators[sid]->set_maker(ta_maker);
66
67 // Create a worker thread per emulator
68 if (m_run_parallel) {
70 }
71 }
72}
73
// Returns the SourceID (element id) of every fragment in _timeslice whose
// fragment type is kTriggerPrimitive, in the order the fragments are stored.
// Fragments of any other type are skipped; duplicates are not removed.
74std::vector<daqdataformats::SourceID>
76{
77 const auto& fragments = _timeslice.get_fragments_ref();
78
79 std::vector<daqdataformats::SourceID> ret;
80 for (const auto& fragment : fragments) {
 // Only TriggerPrimitive fragments qualify.
81 if (fragment->get_fragment_type() != daqdataformats::FragmentType::kTriggerPrimitive) {
82 continue;
83 }
84
85 daqdataformats::SourceID sourceid = fragment->get_element_id();
86
87 ret.push_back(sourceid);
88 }
89
90 return ret;
91}
92
95{
 // Returns the SourceID -> GeoID map of the first input file only.
 // NOTE(review): the guard below throws a string literal (const char*), so a
 // handler written as catch (const std::exception&) will not catch it.
96 if (!m_input_files.size()) {
97 throw "Files not set yet!";
98 }
99
100 return m_input_files.front()->get_srcid_geoid_map();
101}
102
104{
 // Worker loop: sleep until a task is queued or m_stop is set, pop one task
 // while holding m_queue_mutex, run it with the mutex released, then signal
 // m_task_complete_condition once m_active_tasks is zero. The loop returns
 // only when m_stop is set and the queue has been drained.
 // NOTE(review): task() is not wrapped in try/catch, so an exception escaping
 // a task would leave this thread function.
105 while (true) {
107 std::function<void()> task;
108 {
109 std::unique_lock<std::mutex> lock(m_queue_mutex);
 // The predicate guards against spurious wake-ups.
110 m_condition.wait(lock, [this]() {return m_stop || !m_task_queue.empty(); });
111
 // Stop only after every queued task has been taken.
112 if (m_stop && m_task_queue.empty()) {
113 return;
114 }
115
116 task = std::move(m_task_queue.front());
117 m_task_queue.pop();
118 }
119
120 // Run & complete a task
121 task();
122
123 // Notify that task was completed
124 {
125 std::lock_guard<std::mutex> lock(m_queue_mutex);
127 if (m_active_tasks == 0) {
128 m_task_complete_condition.notify_all();
129 }
130 }
131 }
132}
133
135{
 // Walks every record of every input file. For records whose number lies
 // inside m_sliceid_range (both ends inclusive), the TPs of each fragment
 // whose SourceID has an emulator are copied into a buffer and handed to
 // process_task(), either directly or through enqueue_task() when
 // m_run_parallel is set.
136 // Iterate over the input files
137 for (auto& input_file: m_input_files) {
138 // std::set of record IDs (pair of record number & sequence number)
139 auto records = input_file->get_all_record_ids();
140
141 for (const auto& record : records) {
142 if (record.first < m_sliceid_range.first || record.first > m_sliceid_range.second) {
143 if (!m_quiet)
 // NOTE(review): this message has no trailing '\n'.
144 fmt::print(" Will not process RecordID {} because it's outside of our range!", record.first);
145 continue;
146 }
147
148 // Get all the fragments
149 daqdataformats::TimeSlice timeslice = input_file->get_timeslice(record);
150 const auto& fragments = timeslice.get_fragments_ref();
151
152 // Iterate over the fragments & process each fragment
153 for (const auto& fragment : fragments) {
154 daqdataformats::SourceID sid = fragment->get_element_id();
155
 // Skip sources for which the constructor created no emulator.
156 if (!m_ta_emulators.contains(sid)) {
157 continue;
158 }
159
160 // Pull tps out
 // Integer division: trailing bytes that do not fill a whole TP are ignored.
161 size_t n_tps = fragment->get_data_size()/SIZE_TP;
162 if (!m_quiet) {
163 fmt::print(" TP fragment size: {}\n", fragment->get_data_size());
164 fmt::print(" Num TPs: {}\n", n_tps);
165 }
166
167 // Create a TP buffer
168 std::vector<trgdataformats::TriggerPrimitive> tp_buffer;
169 // Prepare the TP buffer, checking for time ordering
170 tp_buffer.reserve(n_tps);
171
172 // Populate the TP buffer
173 trgdataformats::TriggerPrimitive* tp_array = static_cast<trgdataformats::TriggerPrimitive*>(fragment->get_data());
 // NOTE(review): last_ts is initialised to 0 and never updated inside the
 // loop, so the check below compares every TP against 0 rather than against
 // the previous TP; the time-ordering check is effectively inactive. The
 // "ERROR" message also has no trailing '\n'.
174 uint64_t last_ts = 0;
175 for(size_t tpid(0); tpid<n_tps; ++tpid) {
176 auto& tp = tp_array[tpid];
177 if (tp.time_start <= last_ts && !m_quiet) {
178 fmt::print(" ERROR: {} {} ", +tp.time_start, last_ts );
179 }
180 tp_buffer.push_back(tp);
181 }
182
183 daqdataformats::FragmentHeader frag_hdr = fragment->get_header();
184
185 // Customise the source id (add 1000 to id)
186 frag_hdr.element_id = daqdataformats::SourceID{daqdataformats::SourceID::Subsystem::kTrigger, fragment->get_element_id().id+1000};
187
188 // Either enqueue the task if using parallel processing, or execute the task now
189 if (m_run_parallel) {
 // Everything is captured by value (tp_buffer is moved into the closure), so
 // the task does not refer to locals of this loop iteration.
190 enqueue_task([this, sid, record, frag_hdr, tp_buffer = std::move(tp_buffer)]() mutable {
191 this->process_task(sid, record.first, frag_hdr, std::move(tp_buffer));
192 });
193 }
194 else {
195 this->process_task(sid, record.first, frag_hdr, std::move(tp_buffer));
196 }
197 }
198 // If running in parallel, wait to process entire slice before we move to
199 // the next one
200 if (m_run_parallel) {
202 }
203 }
204
 // Sum the TAs collected so far. This runs once per input file and m_tas is
 // not cleared here, so the printed figure is a running total.
205 size_t total = 0;
206 for (auto& [key, vec_tas]: m_tas) {
207 total += vec_tas.size();
208 }
209 std::cout << "We have a total of " << total << " TAs!" << std::endl;
210 }
211}
212
217
218
// Pushes `task` onto m_task_queue while holding m_queue_mutex, then wakes one
// thread waiting on m_condition. The notification is issued after the mutex
// has been released.
219void TAFileHandler::enqueue_task(std::function<void()> task)
220{
221 {
222 std::lock_guard<std::mutex> lock(m_queue_mutex);
223 m_task_queue.push(std::move(task));
225 }
226 m_condition.notify_one();
227}
228
230{
 // Blocks the caller on m_task_complete_condition until m_active_tasks is 0.
 // The predicate is evaluated under m_queue_mutex, the same mutex the workers
 // hold when they notify this condition variable.
231 std::unique_lock<std::mutex> lock(m_queue_mutex);
232 m_task_complete_condition.wait(lock, [this]() { return m_active_tasks == 0; });
233}
234
236{
 // Joins m_main_thread; in parallel mode it then sets m_stop under
 // m_queue_mutex, wakes every worker and joins all threads in m_thread_pool.
 // NOTE(review): join() is called without a joinable() check — assumes the
 // main thread was started and this function is called only once; confirm.
237 // Wait for the main threads to join
238 m_main_thread.join();
239 fmt::print("TAFileHandler_{} work completed\n", m_id);
240
241 // Wait for the tasks to complete
242 if (m_run_parallel) {
244
 // Set the flag under the queue mutex so a worker cannot miss it between
 // evaluating its wait predicate and going to sleep.
245 {
246 std::lock_guard<std::mutex> lock(m_queue_mutex);
247 m_stop = true;
248 }
249 m_condition.notify_all();
250
251 fmt::print("m_stop issued\n");
252 for (std::thread& thread : m_thread_pool) {
253 thread.join();
254 }
255 }
256}
257
259 uint64_t _rec,
261 std::vector<trgdataformats::TriggerPrimitive>&& _tps)
262{
 // Runs the emulator registered for _source_id over _tps. When it yields a
 // fragment and at least one TA, the TAs are appended to m_tas[_rec] and the
 // fragment, after receiving _header, is appended to m_ta_fragments[_rec].
263 // Get the last fragment
264 std::unique_ptr<daqdataformats::Fragment> frag = m_ta_emulators[_source_id]->emulate_vector(_tps);
265
266 // Don't do anything if no fragments found
267 if (!frag) {
268 return;
269 }
270
271 // Get all the TriggerActivities from the TA Emulator buffer
272 std::vector<triggeralgs::TriggerActivity> ta_buffer = m_ta_emulators[_source_id]->get_last_output_buffer();
273
274 // Don't continue if no TAs found
275 size_t n_tas = ta_buffer.size();
276 if (!n_tas) {
277 return;
278 }
279
 // n_tas is always non-zero here because of the early return above.
280 if (!m_quiet && n_tas) {
281 fmt::print(" Found {} TAs!\n", n_tas);
282 }
283
284 // Set the fragment header & push into our output (with locking!)
285 {
 // NOTE(review): the lock_guard below is destroyed at the closing brace of
 // this if-block, so m_savetps_mutex is released before m_tas and
 // m_ta_fragments are modified. In parallel mode those writes are therefore
 // not protected by the mutex.
286 if (m_run_parallel) {
287 std::lock_guard<std::mutex> lock(m_savetps_mutex);
288 }
289 m_tas[_rec].reserve(m_tas[_rec].size() + ta_buffer.size());
290 m_tas[_rec].insert(m_tas[_rec].end(), std::make_move_iterator(ta_buffer.begin()), std::make_move_iterator(ta_buffer.end()));
291
292 frag->set_header_fields(_header);
294
295 m_ta_fragments[_rec].push_back(std::move(frag));
296 }
297}
298
299std::map<uint64_t, std::vector<triggeralgs::TriggerActivity>> TAFileHandler::get_tas()
300{
301 return std::move(m_tas);
302}
303
304std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>> TAFileHandler::get_frags()
305{
306 return std::move(m_ta_fragments);
307}
308
309
310}; // namespace dunedaq::trgtools
311
312#endif //TRGTOOLS_TAFILEHANDLER_CPP_
C++ Representation of a DUNE TimeSlice, consisting of a TimeSliceHeader object and a vector of pointe...
Definition TimeSlice.hpp:27
const std::vector< std::unique_ptr< Fragment > > & get_fragments_ref() const
Get a handle to the Fragments.
Definition TimeSlice.hpp:62
std::map< daqdataformats::SourceID, std::vector< uint64_t > > source_id_geo_id_map_t
TAFileHandler(std::vector< std::shared_ptr< hdf5libs::HDF5RawDataFile > > _input_files, nlohmann::json _config, std::pair< uint64_t, uint64_t > _sliceid_range, bool _run_parallel, bool _quiet)
Constructor, takes file input path & configuration.
const bool m_run_parallel
Run the TA makers in parallel.
std::condition_variable m_task_complete_condition
std::vector< std::thread > m_thread_pool
std::vector< std::shared_ptr< hdf5libs::HDF5RawDataFile > > m_input_files
Pointers to the input files.
static const size_t SIZE_TP
Size of the TPS.
std::map< uint64_t, std::vector< std::unique_ptr< daqdataformats::Fragment > > > m_ta_fragments
Output vector of TA fragments.
std::map< uint64_t, std::vector< triggeralgs::TriggerActivity > > get_tas()
Retrieves all the TAs with std::move operator.
std::vector< daqdataformats::SourceID > get_valid_sourceids(daqdataformats::TimeSlice &_timeslice)
Get the valid sourceids object from HDF5 file.
std::condition_variable m_condition
static uint16_t m_id_next
Global variable used to get the next ID.
void wait_to_complete_work()
Waits for all the tasks to complete.
void wait_to_complete_tasks()
Waits to complete a task.
const bool m_quiet
Quiet down the cout output.
void process_tasks()
Function that processes the whole file.
std::map< daqdataformats::SourceID, std::unique_ptr< trgtools::EmulateTAUnit > > m_ta_emulators
Map of SourceID : Emulator unit (TAMaker)
void process_task(daqdataformats::SourceID _source_id, uint64_t _rec, daqdataformats::FragmentHeader _header, std::vector< trgdataformats::TriggerPrimitive > &&_tps)
Function that processes one slice for one plane.
std::thread m_main_thread
The file handler thread.
void enqueue_task(std::function< void()> task)
Enqueues task to process.
std::mutex m_savetps_mutex
Mutex for saving the TPs.
std::queue< std::function< void()> > m_task_queue
std::map< uint64_t, std::vector< triggeralgs::TriggerActivity > > m_tas
Output vector of TAs.
std::atomic< size_t > m_active_tasks
void worker_thread()
Creates & runs a worker thread.
uint16_t m_id
Unique ID for this TAFileHandler.
std::map< uint64_t, std::vector< std::unique_ptr< daqdataformats::Fragment > > > get_frags()
Retrieves all the unique pointers to the TA fragments.
std::pair< uint64_t, uint64_t > m_sliceid_range
Range of TimeSlice IDs to process.
void start_processing()
User interaction for task processing.
hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t get_sourceid_geoid_map()
Get the sourceid to geoid map object.
std::atomic< bool > m_stop
Bool to indicate to stop the emulation.
static std::shared_ptr< AbstractFactory< TriggerActivityMaker > > get_instance()
uint32_t run_number_t
Type used to represent run number.
Definition Types.hpp:20
@ kTriggerPrimitive
Trigger format TPs produced by trigger code.
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size
Cannot add TPSet with sourceid
The header for a DUNE Fragment.
SourceID element_id
Component that generated the data in this Fragment.
SourceID is a generalized representation of the source of a piece of data in the DAQ....
Definition SourceID.hpp:32
A single energy deposition on a TPC or PDS channel.