DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
TAFileHandler.hpp
Go to the documentation of this file.
1#ifndef TRGTOOLS_TAFILEHANDLER_HPP_
2#define TRGTOOLS_TAFILEHANDLER_HPP_
3
5
6#include "CLI/App.hpp"
7#include "CLI/Config.hpp"
8#include "CLI/Formatter.hpp"
9
10#include <fmt/core.h>
11#include <fmt/format.h>
12#include <fmt/chrono.h>
13#include <filesystem>
14
19
20namespace dunedaq::trgtools
21{
22
24{
25 public:
38 TAFileHandler(std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>> _input_files,
39 nlohmann::json _config,
40 std::pair<uint64_t, uint64_t> _sliceid_range,
41 bool _run_parallel,
42 bool _quiet);
43
44 ~TAFileHandler() = default;
45
52 std::vector<daqdataformats::SourceID>
54
56 void start_processing();
57
60
66 std::map<uint64_t, std::vector<triggeralgs::TriggerActivity>>
67 get_tas();
68
74 std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>>
75 get_frags();
76
84
85
86 private:
88 void process_tasks();
89
99 uint64_t _rec,
101 std::vector<trgdataformats::TriggerPrimitive>&& _tps);
102
104 void worker_thread();
105
111 void enqueue_task(std::function<void()> task);
112
115
116 private:
118 std::vector<std::shared_ptr<hdf5libs::HDF5RawDataFile>> m_input_files;
119
121 nlohmann::json m_configuration;
122
124 std::vector<std::string> m_input_paths;
125
127 std::map<daqdataformats::SourceID, std::unique_ptr<trgtools::EmulateTAUnit>> m_ta_emulators;
128
130 std::pair<uint64_t, uint64_t> m_sliceid_range;
131
133 const bool m_run_parallel;
134
136 const bool m_quiet;
137
138 /*
139 * Threading objects for the main file handler
140 */
141
143 std::thread m_main_thread;
144
146 std::atomic<bool> m_stop{false};
147
148 /*
149 * Optional threading objects for the tasks
150 * i.e. one thread per TAMaker
151 */
152
154 std::mutex m_savetps_mutex;
155 std::vector<std::thread> m_thread_pool;
156 std::condition_variable m_condition;
157 std::condition_variable m_task_complete_condition;
158 std::queue<std::function<void()>> m_task_queue;
159 std::mutex m_queue_mutex;
160 std::atomic<size_t> m_active_tasks;
161
163 std::map<uint64_t, std::vector<triggeralgs::TriggerActivity>> m_tas;
164
166 std::map<uint64_t, std::vector<std::unique_ptr<daqdataformats::Fragment>>> m_ta_fragments;
167
169 uint16_t m_id;
171 static uint16_t m_id_next;
173 static const size_t SIZE_TP = sizeof(trgdataformats::TriggerPrimitive);
174};
175
176};
177
178#endif
C++ Representation of a DUNE TimeSlice, consisting of a TimeSliceHeader object and a vector of pointe...
Definition TimeSlice.hpp:27
std::map< daqdataformats::SourceID, std::vector< uint64_t > > source_id_geo_id_map_t
TAFileHandler(std::vector< std::shared_ptr< hdf5libs::HDF5RawDataFile > > _input_files, nlohmann::json _config, std::pair< uint64_t, uint64_t > _sliceid_range, bool _run_parallel, bool _quiet)
Constructor; takes the input files, the configuration, the TimeSlice ID range, and the run-parallel and quiet flags.
const bool m_run_parallel
Run the TA makers in parallel.
std::condition_variable m_task_complete_condition
std::vector< std::thread > m_thread_pool
std::vector< std::shared_ptr< hdf5libs::HDF5RawDataFile > > m_input_files
Vector of shared pointers to the input files.
static const size_t SIZE_TP
Size in bytes of a single TriggerPrimitive (TP).
std::map< uint64_t, std::vector< std::unique_ptr< daqdataformats::Fragment > > > m_ta_fragments
Output map of TA fragments (uint64_t key to vector of fragments).
std::map< uint64_t, std::vector< triggeralgs::TriggerActivity > > get_tas()
Retrieves all the TAs, handing them over via std::move.
std::vector< daqdataformats::SourceID > get_valid_sourceids(daqdataformats::TimeSlice &_timeslice)
Get the valid sourceids object from HDF5 file.
std::condition_variable m_condition
static uint16_t m_id_next
Global variable used to get the next ID.
void wait_to_complete_work()
Waits for all the tasks to complete.
void wait_to_complete_tasks()
Waits to complete a task.
nlohmann::json m_configuration
configuration for the TA-makers
const bool m_quiet
Quiet down the cout output.
void process_tasks()
Function that processes the whole file.
std::map< daqdataformats::SourceID, std::unique_ptr< trgtools::EmulateTAUnit > > m_ta_emulators
Map of SourceID : Emulator unit (TAMaker)
void process_task(daqdataformats::SourceID _source_id, uint64_t _rec, daqdataformats::FragmentHeader _header, std::vector< trgdataformats::TriggerPrimitive > &&_tps)
Function that processes one slice for one plane.
std::thread m_main_thread
The file handler thread.
void enqueue_task(std::function< void()> task)
Enqueues task to process.
std::mutex m_savetps_mutex
Mutex for saving the TPs.
std::queue< std::function< void()> > m_task_queue
std::vector< std::string > m_input_paths
Vector of tpstream input file paths.
std::map< uint64_t, std::vector< triggeralgs::TriggerActivity > > m_tas
Output map of TAs (uint64_t key to vector of TAs).
std::atomic< size_t > m_active_tasks
void worker_thread()
Creates & runs a worker thread.
uint16_t m_id
Unique ID for this TAFileHandler.
std::map< uint64_t, std::vector< std::unique_ptr< daqdataformats::Fragment > > > get_frags()
Retrieves all the unique pointers to the TA fragments.
std::pair< uint64_t, uint64_t > m_sliceid_range
Range of TimeSlice IDs to process.
void start_processing()
User interaction for task processing.
hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t get_sourceid_geoid_map()
Get the sourceid to geoid map object.
std::atomic< bool > m_stop
Bool to indicate to stop the emulation.
The header for a DUNE Fragment.
SourceID is a generalized representation of the source of a piece of data in the DAQ....
Definition SourceID.hpp:32
A single energy deposition on a TPC or PDS channel.