DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::trgtools::TAFileHandler Class Reference

#include <TAFileHandler.hpp>

Collaboration diagram for dunedaq::trgtools::TAFileHandler:
[legend]

Public Member Functions

 TAFileHandler (std::vector< std::shared_ptr< hdf5libs::HDF5RawDataFile > > _input_files, nlohmann::json _config, std::pair< uint64_t, uint64_t > _sliceid_range, bool _run_parallel, bool _quiet)
 Constructor, takes file input path & configuration.
 
 ~TAFileHandler ()=default
 
std::vector< daqdataformats::SourceIDget_valid_sourceids (daqdataformats::TimeSlice &_timeslice)
 Get the valid sourceids object from HDF5 file.
 
void start_processing ()
 User interaction for task processing.
 
void wait_to_complete_work ()
 Waits for all the tasks to complete.
 
std::map< uint64_t, std::vector< triggeralgs::TriggerActivity > > get_tas ()
 Retrieves all the TAs with std::move operator.
 
std::map< uint64_t, std::vector< std::unique_ptr< daqdataformats::Fragment > > > get_frags ()
 Retrieves all the unique pointers to the TA fragments.
 
hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t get_sourceid_geoid_map ()
 Get the sourceid to geoid map object.
 

Private Member Functions

void process_tasks ()
 Function that processes the whole file.
 
void process_task (daqdataformats::SourceID _source_id, uint64_t _rec, daqdataformats::FragmentHeader _header, std::vector< trgdataformats::TriggerPrimitive > &&_tps)
 Function that processes one slice for one plane.
 
void worker_thread ()
 Creates & runs a worker thread.
 
void enqueue_task (std::function< void()> task)
 Enqueues task to process.
 
void wait_to_complete_tasks ()
 Waits to complete a task.
 

Private Attributes

std::vector< std::shared_ptr< hdf5libs::HDF5RawDataFile > > m_input_files
 A pointer to the input file.
 
nlohmann::json m_configuration
 configuration for the TA-makers
 
std::vector< std::string > m_input_paths
 input vector of tpstream input paths
 
std::map< daqdataformats::SourceID, std::unique_ptr< trgtools::EmulateTAUnit > > m_ta_emulators
 Map of SourceID : Emulator unit (TAMaker)
 
std::pair< uint64_t, uint64_t > m_sliceid_range
 Range of TimeSlice IDs to process.
 
const bool m_run_parallel
 Run the TA makers in parllel.
 
const bool m_quiet
 Quiet down the cout output.
 
std::thread m_main_thread
 The file handler thread.
 
std::atomic< bool > m_stop {false}
 Bool to indicate to stop the emulation.
 
std::mutex m_savetps_mutex
 Mutex for saving the TPs.
 
std::vector< std::thread > m_thread_pool
 
std::condition_variable m_condition
 
std::condition_variable m_task_complete_condition
 
std::queue< std::function< void()> > m_task_queue
 
std::mutex m_queue_mutex
 
std::atomic< size_t > m_active_tasks
 
std::map< uint64_t, std::vector< triggeralgs::TriggerActivity > > m_tas
 Output vector of TAs.
 
std::map< uint64_t, std::vector< std::unique_ptr< daqdataformats::Fragment > > > m_ta_fragments
 Output vector of TA fragments.
 
uint16_t m_id
 Unique ID for this TAFileHandler.
 

Static Private Attributes

static uint16_t m_id_next = 0
 Global variable used to get the next ID.
 
static const size_t SIZE_TP = sizeof(trgdataformats::TriggerPrimitive)
 Size of the TPS.
 

Detailed Description

Definition at line 23 of file TAFileHandler.hpp.

Constructor & Destructor Documentation

◆ TAFileHandler()

dunedaq::trgtools::TAFileHandler::TAFileHandler ( std::vector< std::shared_ptr< hdf5libs::HDF5RawDataFile > > _input_files,
nlohmann::json _config,
std::pair< uint64_t, uint64_t > _sliceid_range,
bool _run_parallel,
bool _quiet )

Constructor, takes file input path & configuration.

Each TAFileHandler will create its own thread, so all TAFileHandlers are run on separate threads

Parameters
_input_filesa vector of input HDF5 shared pointers to process
_configTAMaker configuration
_sliceid_rangerange of sliceids to process
_run_parallelrun each TAMaker (one per SourceID) in parallel
_quietquiet down the cout

Definition at line 11 of file TAFileHandler.cpp.

16 : m_input_files(input_files),
17 m_sliceid_range(sliceid_range),
18 m_run_parallel(run_parallel),
19 m_quiet(quiet),
21{
22 std::string algo_name = config["trigger_activity_plugin"][0];
23 nlohmann::json algo_config = config["trigger_activity_config"][0];
24
25 // Get the input file
26 // Extract the run number etc
27 std::vector<daqdataformats::run_number_t> run_numbers;
28 std::vector<size_t> file_indices;
29 for (const auto& input_file : input_files) {
30 if (std::find(run_numbers.begin(), run_numbers.end(),
31 input_file->get_attribute<daqdataformats::run_number_t>("run_number")) ==
32 run_numbers.end()) {
33 run_numbers.push_back(input_file->get_attribute<daqdataformats::run_number_t>("run_number"));
34 }
35
36 if (std::find(file_indices.begin(), file_indices.end(),
37 input_file->get_attribute<daqdataformats::run_number_t>("run_number")) ==
38 file_indices.end()) {
39 file_indices.push_back(input_file->get_attribute<size_t>("file_index"));
40 }
41 }
42
43 std::string application_name = m_input_files.front()->get_attribute<std::string>("application_name");
44
45 if (!m_quiet) {
46 fmt::print("Run Numbers: {}\nFile Indices: {}\nApp name: '{}'\n", fmt::join(run_numbers, ","), fmt::join(file_indices, ","), application_name);
47 }
48
49 // std::set of record IDs (pair of record number & sequence number)
50 auto records = m_input_files.front()->get_all_record_ids();
51
52 // Extract the number of TAMakers to create
53 daqdataformats::TimeSlice first_timeslice = m_input_files.front()->get_timeslice(*records.begin());
54 std::vector<daqdataformats::SourceID> valid_sources = get_valid_sourceids(first_timeslice);
55 fmt::print("Number of makers to make: {}\n", valid_sources.size());
56
57 for (const daqdataformats::SourceID& sid : valid_sources) {
58 // Create TAMaker
59 std::unique_ptr<triggeralgs::TriggerActivityMaker> ta_maker =
61 ta_maker->configure(algo_config);
62
63 // Add it to the enulators
64 m_ta_emulators[sid] = std::make_unique<trgtools::EmulateTAUnit>();
65 m_ta_emulators[sid]->set_maker(ta_maker);
66
67 // Create a worker thread per emulator
68 if (m_run_parallel) {
70 }
71 }
72}
const bool m_run_parallel
Run the TA makers in parllel.
std::vector< std::thread > m_thread_pool
std::vector< std::shared_ptr< hdf5libs::HDF5RawDataFile > > m_input_files
A pointer to the input file.
std::vector< daqdataformats::SourceID > get_valid_sourceids(daqdataformats::TimeSlice &_timeslice)
Get the valid sourceids object from HDF5 file.
static uint16_t m_id_next
Global variable used to get the next ID.
const bool m_quiet
Quiet down the cout output.
std::map< daqdataformats::SourceID, std::unique_ptr< trgtools::EmulateTAUnit > > m_ta_emulators
Map of SourceID : Emulator unit (TAMaker)
void worker_thread()
Creates & runs a worker thread.
uint16_t m_id
Unique ID for this TAFileHandler.
std::pair< uint64_t, uint64_t > m_sliceid_range
Range of TimeSlice IDs to process.
static std::shared_ptr< AbstractFactory< TriggerActivityMaker > > get_instance()
uint32_t run_number_t
Type used to represent run number.
Definition Types.hpp:20

◆ ~TAFileHandler()

dunedaq::trgtools::TAFileHandler::~TAFileHandler ( )
default

Member Function Documentation

◆ enqueue_task()

void dunedaq::trgtools::TAFileHandler::enqueue_task ( std::function< void()> task)
private

Enqueues task to process.

Parameters
tasktask to porcess

Definition at line 219 of file TAFileHandler.cpp.

220{
221 {
222 std::lock_guard<std::mutex> lock(m_queue_mutex);
223 m_task_queue.push(std::move(task));
225 }
226 m_condition.notify_one();
227}
std::condition_variable m_condition
std::queue< std::function< void()> > m_task_queue
std::atomic< size_t > m_active_tasks

◆ get_frags()

std::map< uint64_t, std::vector< std::unique_ptr< daqdataformats::Fragment > > > dunedaq::trgtools::TAFileHandler::get_frags ( )

Retrieves all the unique pointers to the TA fragments.

Returns
A map of sourceID : TA fragment

Definition at line 304 of file TAFileHandler.cpp.

305{
306 return std::move(m_ta_fragments);
307}
std::map< uint64_t, std::vector< std::unique_ptr< daqdataformats::Fragment > > > m_ta_fragments
Output vector of TA fragments.

◆ get_sourceid_geoid_map()

hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t dunedaq::trgtools::TAFileHandler::get_sourceid_geoid_map ( )

Get the sourceid to geoid map object.

Returns
geoid to sourceid map

Definition at line 94 of file TAFileHandler.cpp.

95{
96 if (!m_input_files.size()) {
97 throw "Files not set yet!";
98 }
99
100 return m_input_files.front()->get_srcid_geoid_map();
101}

◆ get_tas()

std::map< uint64_t, std::vector< triggeralgs::TriggerActivity > > dunedaq::trgtools::TAFileHandler::get_tas ( )

Retrieves all the TAs with std::move operator.

Returns
a map of sourceID : TA

Definition at line 299 of file TAFileHandler.cpp.

300{
301 return std::move(m_tas);
302}
std::map< uint64_t, std::vector< triggeralgs::TriggerActivity > > m_tas
Output vector of TAs.

◆ get_valid_sourceids()

std::vector< daqdataformats::SourceID > dunedaq::trgtools::TAFileHandler::get_valid_sourceids ( daqdataformats::TimeSlice & _timeslice)

Get the valid sourceids object from HDF5 file.

Parameters
_timeslicetimeslice to load the sourceIDs from
Returns
vector of SoureIDs from this file

Definition at line 75 of file TAFileHandler.cpp.

76{
77 const auto& fragments = _timeslice.get_fragments_ref();
78
79 std::vector<daqdataformats::SourceID> ret;
80 for (const auto& fragment : fragments) {
81 if (fragment->get_fragment_type() != daqdataformats::FragmentType::kTriggerPrimitive) {
82 continue;
83 }
84
85 daqdataformats::SourceID sourceid = fragment->get_element_id();
86
87 ret.push_back(sourceid);
88 }
89
90 return ret;
91}
@ kTriggerPrimitive
Trigger format TPs produced by trigger code.
Cannot add TPSet with sourceid

◆ process_task()

void dunedaq::trgtools::TAFileHandler::process_task ( daqdataformats::SourceID _source_id,
uint64_t _rec,
daqdataformats::FragmentHeader _header,
std::vector< trgdataformats::TriggerPrimitive > && _tps )
private

Function that processes one slice for one plane.

Parameters
_source_idsourceID to process
_recrecord ID to process
_headerfragment header
_tpsvectors of trigger primitives to process (with move operator)

Definition at line 258 of file TAFileHandler.cpp.

262{
263 // Get te last fragment
264 std::unique_ptr<daqdataformats::Fragment> frag = m_ta_emulators[_source_id]->emulate_vector(_tps);
265
266 // Don't do anything if no fragments found
267 if (!frag) {
268 return;
269 }
270
271 // Get all the TriggerActivities from the TA Emulator buffer
272 std::vector<triggeralgs::TriggerActivity> ta_buffer = m_ta_emulators[_source_id]->get_last_output_buffer();
273
274 // Don't continue if no TAs found
275 size_t n_tas = ta_buffer.size();
276 if (!n_tas) {
277 return;
278 }
279
280 if (!m_quiet && n_tas) {
281 fmt::print(" Found {} TAs!\n", n_tas);
282 }
283
284 // Set the fragment header & push into our output (with locking!)
285 {
286 if (m_run_parallel) {
287 std::lock_guard<std::mutex> lock(m_savetps_mutex);
288 }
289 m_tas[_rec].reserve(m_tas[_rec].size() + ta_buffer.size());
290 m_tas[_rec].insert(m_tas[_rec].end(), std::make_move_iterator(ta_buffer.begin()), std::make_move_iterator(ta_buffer.end()));
291
292 frag->set_header_fields(_header);
294
295 m_ta_fragments[_rec].push_back(std::move(frag));
296 }
297}
std::mutex m_savetps_mutex
Mutex for saving the TPs.
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size

◆ process_tasks()

void dunedaq::trgtools::TAFileHandler::process_tasks ( )
private

Function that processes the whole file.

Definition at line 134 of file TAFileHandler.cpp.

135{
136 // Iterate over the input files
137 for (auto& input_file: m_input_files) {
138 // std::set of record IDs (pair of record number & sequence number)
139 auto records = input_file->get_all_record_ids();
140
141 for (const auto& record : records) {
142 if (record.first < m_sliceid_range.first || record.first > m_sliceid_range.second) {
143 if (!m_quiet)
144 fmt::print(" Will not process RecordID {} because it's outside of our range!", record.first);
145 continue;
146 }
147
148 // Get all the fragments
149 daqdataformats::TimeSlice timeslice = input_file->get_timeslice(record);
150 const auto& fragments = timeslice.get_fragments_ref();
151
152 // Iterate over the fragments & process each fragment
153 for (const auto& fragment : fragments) {
154 daqdataformats::SourceID sid = fragment->get_element_id();
155
156 if (!m_ta_emulators.contains(sid)) {
157 continue;
158 }
159
160 // Pull tps out
161 size_t n_tps = fragment->get_data_size()/SIZE_TP;
162 if (!m_quiet) {
163 fmt::print(" TP fragment size: {}\n", fragment->get_data_size());
164 fmt::print(" Num TPs: {}\n", n_tps);
165 }
166
167 // Create a TP buffer
168 std::vector<trgdataformats::TriggerPrimitive> tp_buffer;
169 // Prepare the TP buffer, checking for time ordering
170 tp_buffer.reserve(n_tps);
171
172 // Populate the TP buffer
173 trgdataformats::TriggerPrimitive* tp_array = static_cast<trgdataformats::TriggerPrimitive*>(fragment->get_data());
174 uint64_t last_ts = 0;
175 for(size_t tpid(0); tpid<n_tps; ++tpid) {
176 auto& tp = tp_array[tpid];
177 if (tp.time_start <= last_ts && !m_quiet) {
178 fmt::print(" ERROR: {} {} ", +tp.time_start, last_ts );
179 }
180 tp_buffer.push_back(tp);
181 }
182
183 daqdataformats::FragmentHeader frag_hdr = fragment->get_header();
184
185 // Customise the source id (add 1000 to id)
186 frag_hdr.element_id = daqdataformats::SourceID{daqdataformats::SourceID::Subsystem::kTrigger, fragment->get_element_id().id+1000};
187
188 // Either enqueue the task if using parallel processing, or execute the task now
189 if (m_run_parallel) {
190 enqueue_task([this, sid, record, frag_hdr, tp_buffer = std::move(tp_buffer)]() mutable {
191 this->process_task(sid, record.first, frag_hdr, std::move(tp_buffer));
192 });
193 }
194 else {
195 this->process_task(sid, record.first, frag_hdr, std::move(tp_buffer));
196 }
197 }
198 // If running in parallel, wait to process entire slice before we move to
199 // the next one
200 if (m_run_parallel) {
202 }
203 }
204
205 size_t total = 0;
206 for (auto& [key, vec_tas]: m_tas) {
207 total += vec_tas.size();
208 }
209 std::cout << "We have a total of " << total << " TAs!" << std::endl;
210 }
211}
static const size_t SIZE_TP
Size of the TPS.
void wait_to_complete_tasks()
Waits to complete a task.
void process_task(daqdataformats::SourceID _source_id, uint64_t _rec, daqdataformats::FragmentHeader _header, std::vector< trgdataformats::TriggerPrimitive > &&_tps)
Function that processes one slice for one plane.
void enqueue_task(std::function< void()> task)
Enqueues task to process.

◆ start_processing()

void dunedaq::trgtools::TAFileHandler::start_processing ( )

User interaction for task processing.

Definition at line 213 of file TAFileHandler.cpp.

214{
215 m_main_thread = std::thread(&TAFileHandler::process_tasks, this);
216}
void process_tasks()
Function that processes the whole file.
std::thread m_main_thread
The file handler thread.

◆ wait_to_complete_tasks()

void dunedaq::trgtools::TAFileHandler::wait_to_complete_tasks ( )
private

Waits to complete a task.

Definition at line 229 of file TAFileHandler.cpp.

230{
231 std::unique_lock<std::mutex> lock(m_queue_mutex);
232 m_task_complete_condition.wait(lock, [this]() { return m_active_tasks == 0; });
233}
std::condition_variable m_task_complete_condition

◆ wait_to_complete_work()

void dunedaq::trgtools::TAFileHandler::wait_to_complete_work ( )

Waits for all the tasks to complete.

Definition at line 235 of file TAFileHandler.cpp.

236{
237 // Wait for the main threads to join
238 m_main_thread.join();
239 fmt::print("TAFileHandler_{} work completed\n", m_id);
240
241 // Wait for the tasks to complete
242 if (m_run_parallel) {
244
245 {
246 std::lock_guard<std::mutex> lock(m_queue_mutex);
247 m_stop = true;
248 }
249 m_condition.notify_all();
250
251 fmt::print("m_stop issued\n");
252 for (std::thread& thread : m_thread_pool) {
253 thread.join();
254 }
255 }
256}
std::atomic< bool > m_stop
Bool to indicate to stop the emulation.

◆ worker_thread()

void dunedaq::trgtools::TAFileHandler::worker_thread ( )
private

Creates & runs a worker thread.

Get a task from the queue (with locking)

Definition at line 103 of file TAFileHandler.cpp.

104{
105 while (true) {
107 std::function<void()> task;
108 {
109 std::unique_lock<std::mutex> lock(m_queue_mutex);
110 m_condition.wait(lock, [this]() {return m_stop || !m_task_queue.empty(); });
111
112 if (m_stop && m_task_queue.empty()) {
113 return;
114 }
115
116 task = std::move(m_task_queue.front());
117 m_task_queue.pop();
118 }
119
120 // Run & complete a task
121 task();
122
123 // Notify that task was completed
124 {
125 std::lock_guard<std::mutex> lock(m_queue_mutex);
127 if (m_active_tasks == 0) {
128 m_task_complete_condition.notify_all();
129 }
130 }
131 }
132}

Member Data Documentation

◆ m_active_tasks

std::atomic<size_t> dunedaq::trgtools::TAFileHandler::m_active_tasks
private

Definition at line 160 of file TAFileHandler.hpp.

◆ m_condition

std::condition_variable dunedaq::trgtools::TAFileHandler::m_condition
private

Definition at line 156 of file TAFileHandler.hpp.

◆ m_configuration

nlohmann::json dunedaq::trgtools::TAFileHandler::m_configuration
private

configuration for the TA-makers

Definition at line 121 of file TAFileHandler.hpp.

◆ m_id

uint16_t dunedaq::trgtools::TAFileHandler::m_id
private

Unique ID for this TAFileHandler.

Definition at line 169 of file TAFileHandler.hpp.

◆ m_id_next

uint16_t dunedaq::trgtools::TAFileHandler::m_id_next = 0
staticprivate

Global variable used to get the next ID.

Definition at line 171 of file TAFileHandler.hpp.

◆ m_input_files

std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile> > dunedaq::trgtools::TAFileHandler::m_input_files
private

A pointer to the input file.

Definition at line 118 of file TAFileHandler.hpp.

◆ m_input_paths

std::vector<std::string> dunedaq::trgtools::TAFileHandler::m_input_paths
private

input vector of tpstream input paths

Definition at line 124 of file TAFileHandler.hpp.

◆ m_main_thread

std::thread dunedaq::trgtools::TAFileHandler::m_main_thread
private

The file handler thread.

Definition at line 143 of file TAFileHandler.hpp.

◆ m_queue_mutex

std::mutex dunedaq::trgtools::TAFileHandler::m_queue_mutex
private

Definition at line 159 of file TAFileHandler.hpp.

◆ m_quiet

const bool dunedaq::trgtools::TAFileHandler::m_quiet
private

Quiet down the cout output.

Definition at line 136 of file TAFileHandler.hpp.

◆ m_run_parallel

const bool dunedaq::trgtools::TAFileHandler::m_run_parallel
private

Run the TA makers in parllel.

Definition at line 133 of file TAFileHandler.hpp.

◆ m_savetps_mutex

std::mutex dunedaq::trgtools::TAFileHandler::m_savetps_mutex
private

Mutex for saving the TPs.

Definition at line 154 of file TAFileHandler.hpp.

◆ m_sliceid_range

std::pair<uint64_t, uint64_t> dunedaq::trgtools::TAFileHandler::m_sliceid_range
private

Range of TimeSlice IDs to process.

Definition at line 130 of file TAFileHandler.hpp.

◆ m_stop

std::atomic<bool> dunedaq::trgtools::TAFileHandler::m_stop {false}
private

Bool to indicate to stop the emulation.

Definition at line 146 of file TAFileHandler.hpp.

146{false};

◆ m_ta_emulators

std::map<daqdataformats::SourceID, std::unique_ptr<trgtools::EmulateTAUnit> > dunedaq::trgtools::TAFileHandler::m_ta_emulators
private

Map of SourceID : Emulator unit (TAMaker)

Definition at line 127 of file TAFileHandler.hpp.

◆ m_ta_fragments

std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment> > > dunedaq::trgtools::TAFileHandler::m_ta_fragments
private

Output vector of TA fragments.

Definition at line 166 of file TAFileHandler.hpp.

◆ m_tas

std::map<uint64_t, std::vector<triggeralgs::TriggerActivity> > dunedaq::trgtools::TAFileHandler::m_tas
private

Output vector of TAs.

Definition at line 163 of file TAFileHandler.hpp.

◆ m_task_complete_condition

std::condition_variable dunedaq::trgtools::TAFileHandler::m_task_complete_condition
private

Definition at line 157 of file TAFileHandler.hpp.

◆ m_task_queue

std::queue<std::function<void()> > dunedaq::trgtools::TAFileHandler::m_task_queue
private

Definition at line 158 of file TAFileHandler.hpp.

◆ m_thread_pool

std::vector<std::thread> dunedaq::trgtools::TAFileHandler::m_thread_pool
private

Definition at line 155 of file TAFileHandler.hpp.

◆ SIZE_TP

const size_t dunedaq::trgtools::TAFileHandler::SIZE_TP = sizeof(trgdataformats::TriggerPrimitive)
staticprivate

Size of the TPS.

Definition at line 173 of file TAFileHandler.hpp.


The documentation for this class was generated from the following files: