Line data Source code
1 : #ifndef TRGTOOLS_TAFILEHANDLER_CPP_
2 : #define TRGTOOLS_TAFILEHANDLER_CPP_
3 :
4 : #include "trgtools/TAFileHandler.hpp"
5 :
6 : namespace dunedaq::trgtools
7 : {
8 :
9 : uint16_t TAFileHandler::m_id_next = 0;
10 :
11 0 : TAFileHandler::TAFileHandler(std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>> input_files,
12 : nlohmann::json config,
13 : std::pair<uint64_t, uint64_t> sliceid_range,
14 : bool run_parallel,
15 0 : bool quiet)
16 0 : : m_input_files(input_files),
17 0 : m_sliceid_range(sliceid_range),
18 0 : m_run_parallel(run_parallel),
19 0 : m_quiet(quiet),
20 0 : m_id(m_id_next++)
21 : {
22 0 : std::string algo_name = config["trigger_activity_plugin"][0];
23 0 : nlohmann::json algo_config = config["trigger_activity_config"][0];
24 :
25 : // Get the input file
26 : // Extract the run number etc
27 0 : std::vector<daqdataformats::run_number_t> run_numbers;
28 0 : std::vector<size_t> file_indices;
29 0 : for (const auto& input_file : input_files) {
30 0 : if (std::find(run_numbers.begin(), run_numbers.end(),
31 0 : input_file->get_attribute<daqdataformats::run_number_t>("run_number")) ==
32 0 : run_numbers.end()) {
33 0 : run_numbers.push_back(input_file->get_attribute<daqdataformats::run_number_t>("run_number"));
34 : }
35 :
36 0 : if (std::find(file_indices.begin(), file_indices.end(),
37 0 : input_file->get_attribute<daqdataformats::run_number_t>("run_number")) ==
38 0 : file_indices.end()) {
39 0 : file_indices.push_back(input_file->get_attribute<size_t>("file_index"));
40 : }
41 : }
42 :
43 0 : std::string application_name = m_input_files.front()->get_attribute<std::string>("application_name");
44 :
45 0 : if (!m_quiet) {
46 0 : fmt::print("Run Numbers: {}\nFile Indices: {}\nApp name: '{}'\n", fmt::join(run_numbers, ","), fmt::join(file_indices, ","), application_name);
47 : }
48 :
49 : // std::set of record IDs (pair of record number & sequence number)
50 0 : auto records = m_input_files.front()->get_all_record_ids();
51 :
52 : // Extract the number of TAMakers to create
53 0 : daqdataformats::TimeSlice first_timeslice = m_input_files.front()->get_timeslice(*records.begin());
54 0 : std::vector<daqdataformats::SourceID> valid_sources = get_valid_sourceids(first_timeslice);
55 0 : fmt::print("Number of makers to make: {}\n", valid_sources.size());
56 :
57 0 : for (const daqdataformats::SourceID& sid : valid_sources) {
58 : // Create TAMaker
59 0 : std::unique_ptr<triggeralgs::TriggerActivityMaker> ta_maker =
60 0 : triggeralgs::TriggerActivityFactory::get_instance()->build_maker(algo_name);
61 0 : ta_maker->configure(algo_config);
62 :
63 : // Add it to the enulators
64 0 : m_ta_emulators[sid] = std::make_unique<trgtools::EmulateTAUnit>();
65 0 : m_ta_emulators[sid]->set_maker(ta_maker);
66 :
67 : // Create a worker thread per emulator
68 0 : if (m_run_parallel) {
69 0 : m_thread_pool.emplace_back(&TAFileHandler::worker_thread, this);
70 : }
71 0 : }
72 0 : }
73 :
74 : std::vector<daqdataformats::SourceID>
75 0 : TAFileHandler::get_valid_sourceids(daqdataformats::TimeSlice& _timeslice)
76 : {
77 0 : const auto& fragments = _timeslice.get_fragments_ref();
78 :
79 0 : std::vector<daqdataformats::SourceID> ret;
80 0 : for (const auto& fragment : fragments) {
81 0 : if (fragment->get_fragment_type() != daqdataformats::FragmentType::kTriggerPrimitive) {
82 0 : continue;
83 : }
84 :
85 0 : daqdataformats::SourceID sourceid = fragment->get_element_id();
86 :
87 0 : ret.push_back(sourceid);
88 : }
89 :
90 0 : return ret;
91 0 : }
92 :
93 : hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t
94 0 : TAFileHandler::get_sourceid_geoid_map()
95 : {
96 0 : if (!m_input_files.size()) {
97 0 : throw "Files not set yet!";
98 : }
99 :
100 0 : return m_input_files.front()->get_srcid_geoid_map();
101 : }
102 :
103 0 : void TAFileHandler::worker_thread()
104 : {
105 0 : while (true) {
106 : /// Get a task from the queue (with locking)
107 0 : std::function<void()> task;
108 0 : {
109 0 : std::unique_lock<std::mutex> lock(m_queue_mutex);
110 0 : m_condition.wait(lock, [this]() {return m_stop || !m_task_queue.empty(); });
111 :
112 0 : if (m_stop && m_task_queue.empty()) {
113 0 : return;
114 : }
115 :
116 0 : task = std::move(m_task_queue.front());
117 0 : m_task_queue.pop();
118 0 : }
119 :
120 : // Run & complete a task
121 0 : task();
122 :
123 : // Notify that task was completed
124 0 : {
125 0 : std::lock_guard<std::mutex> lock(m_queue_mutex);
126 0 : --m_active_tasks;
127 0 : if (m_active_tasks == 0) {
128 0 : m_task_complete_condition.notify_all();
129 : }
130 0 : }
131 0 : }
132 : }
133 :
134 0 : void TAFileHandler::process_tasks()
135 : {
136 : // Iterate over the input files
137 0 : for (auto& input_file: m_input_files) {
138 : // std::set of record IDs (pair of record number & sequence number)
139 0 : auto records = input_file->get_all_record_ids();
140 :
141 0 : for (const auto& record : records) {
142 0 : if (record.first < m_sliceid_range.first || record.first > m_sliceid_range.second) {
143 0 : if (!m_quiet)
144 0 : fmt::print(" Will not process RecordID {} because it's outside of our range!", record.first);
145 0 : continue;
146 : }
147 :
148 : // Get all the fragments
149 0 : daqdataformats::TimeSlice timeslice = input_file->get_timeslice(record);
150 0 : const auto& fragments = timeslice.get_fragments_ref();
151 :
152 : // Iterate over the fragments & process each fragment
153 0 : for (const auto& fragment : fragments) {
154 0 : daqdataformats::SourceID sid = fragment->get_element_id();
155 :
156 0 : if (!m_ta_emulators.contains(sid)) {
157 0 : continue;
158 : }
159 :
160 : // Pull tps out
161 0 : size_t n_tps = fragment->get_data_size()/SIZE_TP;
162 0 : if (!m_quiet) {
163 0 : fmt::print(" TP fragment size: {}\n", fragment->get_data_size());
164 0 : fmt::print(" Num TPs: {}\n", n_tps);
165 : }
166 :
167 : // Create a TP buffer
168 0 : std::vector<trgdataformats::TriggerPrimitive> tp_buffer;
169 : // Prepare the TP buffer, checking for time ordering
170 0 : tp_buffer.reserve(n_tps);
171 :
172 : // Populate the TP buffer
173 0 : trgdataformats::TriggerPrimitive* tp_array = static_cast<trgdataformats::TriggerPrimitive*>(fragment->get_data());
174 0 : uint64_t last_ts = 0;
175 0 : for(size_t tpid(0); tpid<n_tps; ++tpid) {
176 0 : auto& tp = tp_array[tpid];
177 0 : if (tp.time_start <= last_ts && !m_quiet) {
178 0 : fmt::print(" ERROR: {} {} ", +tp.time_start, last_ts );
179 : }
180 0 : tp_buffer.push_back(tp);
181 : }
182 :
183 0 : daqdataformats::FragmentHeader frag_hdr = fragment->get_header();
184 :
185 : // Customise the source id (add 1000 to id)
186 0 : frag_hdr.element_id = daqdataformats::SourceID{daqdataformats::SourceID::Subsystem::kTrigger, fragment->get_element_id().id+1000};
187 :
188 : // Either enqueue the task if using parallel processing, or execute the task now
189 0 : if (m_run_parallel) {
190 0 : enqueue_task([this, sid, record, frag_hdr, tp_buffer = std::move(tp_buffer)]() mutable {
191 0 : this->process_task(sid, record.first, frag_hdr, std::move(tp_buffer));
192 0 : });
193 : }
194 : else {
195 0 : this->process_task(sid, record.first, frag_hdr, std::move(tp_buffer));
196 : }
197 0 : }
198 : // If running in parallel, wait to process entire slice before we move to
199 : // the next one
200 0 : if (m_run_parallel) {
201 0 : wait_to_complete_tasks();
202 : }
203 0 : }
204 :
205 0 : size_t total = 0;
206 0 : for (auto& [key, vec_tas]: m_tas) {
207 0 : total += vec_tas.size();
208 : }
209 0 : std::cout << "We have a total of " << total << " TAs!" << std::endl;
210 0 : }
211 0 : }
212 :
213 0 : void TAFileHandler::start_processing()
214 : {
215 0 : m_main_thread = std::thread(&TAFileHandler::process_tasks, this);
216 0 : }
217 :
218 :
219 0 : void TAFileHandler::enqueue_task(std::function<void()> task)
220 : {
221 0 : {
222 0 : std::lock_guard<std::mutex> lock(m_queue_mutex);
223 0 : m_task_queue.push(std::move(task));
224 0 : ++m_active_tasks;
225 0 : }
226 0 : m_condition.notify_one();
227 0 : }
228 :
229 0 : void TAFileHandler::wait_to_complete_tasks()
230 : {
231 0 : std::unique_lock<std::mutex> lock(m_queue_mutex);
232 0 : m_task_complete_condition.wait(lock, [this]() { return m_active_tasks == 0; });
233 0 : }
234 :
235 0 : void TAFileHandler::wait_to_complete_work()
236 : {
237 : // Wait for the main threads to join
238 0 : m_main_thread.join();
239 0 : fmt::print("TAFileHandler_{} work completed\n", m_id);
240 :
241 : // Wait for the tasks to complete
242 0 : if (m_run_parallel) {
243 0 : wait_to_complete_tasks();
244 :
245 0 : {
246 0 : std::lock_guard<std::mutex> lock(m_queue_mutex);
247 0 : m_stop = true;
248 0 : }
249 0 : m_condition.notify_all();
250 :
251 0 : fmt::print("m_stop issued\n");
252 0 : for (std::thread& thread : m_thread_pool) {
253 0 : thread.join();
254 : }
255 : }
256 0 : }
257 :
258 0 : void TAFileHandler::process_task(daqdataformats::SourceID _source_id,
259 : uint64_t _rec,
260 : daqdataformats::FragmentHeader _header,
261 : std::vector<trgdataformats::TriggerPrimitive>&& _tps)
262 : {
263 : // Get te last fragment
264 0 : std::unique_ptr<daqdataformats::Fragment> frag = m_ta_emulators[_source_id]->emulate_vector(_tps);
265 :
266 : // Don't do anything if no fragments found
267 0 : if (!frag) {
268 : return;
269 : }
270 :
271 : // Get all the TriggerActivities from the TA Emulator buffer
272 0 : std::vector<triggeralgs::TriggerActivity> ta_buffer = m_ta_emulators[_source_id]->get_last_output_buffer();
273 :
274 : // Don't continue if no TAs found
275 0 : size_t n_tas = ta_buffer.size();
276 0 : if (!n_tas) {
277 0 : return;
278 : }
279 :
280 0 : if (!m_quiet && n_tas) {
281 0 : fmt::print(" Found {} TAs!\n", n_tas);
282 : }
283 :
284 : // Set the fragment header & push into our output (with locking!)
285 0 : {
286 0 : if (m_run_parallel) {
287 0 : std::lock_guard<std::mutex> lock(m_savetps_mutex);
288 0 : }
289 0 : m_tas[_rec].reserve(m_tas[_rec].size() + ta_buffer.size());
290 0 : m_tas[_rec].insert(m_tas[_rec].end(), std::make_move_iterator(ta_buffer.begin()), std::make_move_iterator(ta_buffer.end()));
291 :
292 0 : frag->set_header_fields(_header);
293 0 : frag->set_type(daqdataformats::FragmentType::kTriggerActivity);
294 :
295 0 : m_ta_fragments[_rec].push_back(std::move(frag));
296 : }
297 0 : }
298 :
299 0 : std::map<uint64_t, std::vector<triggeralgs::TriggerActivity>> TAFileHandler::get_tas()
300 : {
301 0 : return std::move(m_tas);
302 : }
303 :
304 0 : std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>> TAFileHandler::get_frags()
305 : {
306 0 : return std::move(m_ta_fragments);
307 : }
308 :
309 :
310 : }; // namespace dunedaq::trgtools
311 :
312 : #endif //TRGTOOLS_TAFILEHANDLER_CXX_
|