DUNE-DAQ
DUNE Trigger and Data Acquisition software
TAReader.py
1"""
2Reader class for TA data.
3"""
4from .HDF5Reader import HDF5Reader
5
6import daqdataformats # noqa: F401 : Not used, but needed to recognize formats.
7import trgdataformats
8
9import numpy as np
10from numpy.typing import NDArray
11
12
14 """
15 Class that reads a given HDF5 data file and can
16 process the TA fragments within.
17
18 Loading fragments appends to :self.ta_data: and :self.tp_data:.
19 NumPy dtypes of :self.ta_data: and :self.tp_data: are available
20 as :TAReader.ta_dt: and :TAReader.tp_dt:.
21
22 TA reading can print information that is relevant about the
23 loading process by specifying the verbose level. 0 for errors
24 only. 1 for warnings. 2 for all information.
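
    Example (illustrative sketch; the file name is a placeholder and
    :_fragment_paths: is assumed to be populated by :HDF5Reader:):
        reader = TAReader("run_data.hdf5", verbosity=1)
        reader.read_fragment(reader._fragment_paths[0])
        print(reader.ta_data['adc_integral'])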
25 """
    # TA data type
    ta_dt = np.dtype([
        ('adc_integral', np.uint64),
        ('adc_peak', np.uint64),
        ('algorithm', trgdataformats.TriggerActivityData.Algorithm),
        ('channel_end', np.int32),
        ('channel_peak', np.int32),
        ('channel_start', np.int32),
        ('detid', np.uint16),
        ('num_tps', np.uint64),  # Greedy
        ('time_activity', np.uint64),
        ('time_end', np.uint64),
        ('time_peak', np.uint64),
        ('time_start', np.uint64),
        ('type', trgdataformats.TriggerActivityData.Type),
        ('version', np.uint16)
    ])

    # TP data type
    tp_dt = np.dtype([
        ('adc_integral', np.uint32),
        ('adc_peak', np.uint16),
        ('channel', np.uint32),
        ('detid', np.uint8),
        ('flag', np.uint8),
        ('samples_over_threshold', np.uint16),
        ('samples_to_peak', np.uint16),
        ('time_start', np.uint64),
        ('version', np.uint8)
    ])

    def __init__(self, filename: str, verbosity: int = 0, batch_mode: bool = False) -> None:
        """
        Loads a given HDF5 file.

        Parameters:
            filename (str): HDF5 file to open.
            verbosity (int): Verbose level. 0: Only errors. 1: Warnings. 2: All.
            batch_mode (bool): Passed through to the HDF5Reader base class.

        Returns nothing.
        """
        super().__init__(filename, verbosity, batch_mode)
        self.ta_data = np.array([], dtype=self.ta_dt)
        self.tp_data = []
        return None

    def __getitem__(self, key: int | str) -> NDArray[ta_dt]:
        return self.ta_data[key]

    def __setitem__(self, key: int | str, value: NDArray[ta_dt]) -> None:
        self.ta_data[key] = value
        return

    def __len__(self) -> int:
        return len(self.ta_data)

    def _filter_fragment_paths(self) -> None:
        """ Filter the fragment paths for TAs. """
        fragment_paths = []

        # TA fragment paths contain their name in the path.
        for path in self._fragment_paths:
            if "Trigger_Activity" in path:
                fragment_paths.append(path)

        self._fragment_paths = fragment_paths
        return None

    def read_fragment(self, fragment_path: str) -> NDArray:
        """
        Read from the given data fragment path.

        Returns a np.ndarray of the last TA that was read from this fragment
        (empty if the fragment is empty) and appends all TAs in the fragment
        to :self.ta_data:.
        """
        if self._verbosity >= 2:
            print("="*60)
            print(f"INFO: Reading from the path\n{fragment_path}")

        fragment = self._h5_file.get_frag(fragment_path)
        fragment_data_size = fragment.get_data_size()

        if fragment_data_size == 0:
            self._num_empty += 1
            if self._verbosity >= 1:
                print(
                    self._BOLD_TEXT
                    + "WARNING: Empty fragment. Returning empty array."
                    + self._END_TEXT_COLOR
                )
                print("="*60)
            return np.array([], dtype=self.ta_dt)

        ta_idx = 0  # Debugging output.
        byte_idx = 0  # Variable TA sizing, must do while loop.
        while byte_idx < fragment_data_size:
            if self._verbosity >= 2:
                print(f"INFO: Fragment Index: {ta_idx}.")
                ta_idx += 1
                print(f"INFO: Byte Index / Frag Size: {byte_idx} / {fragment_data_size}")

            # Read TA data
            ta_datum = trgdataformats.TriggerActivity(fragment.get_data(byte_idx))
            np_ta_datum = np.array([(
                ta_datum.data.adc_integral,
                ta_datum.data.adc_peak,
                ta_datum.data.algorithm,
                ta_datum.data.channel_end,
                ta_datum.data.channel_peak,
                ta_datum.data.channel_start,
                np.uint16(ta_datum.data.detid),
                ta_datum.n_inputs(),
                ta_datum.data.time_activity,
                ta_datum.data.time_end,
                ta_datum.data.time_peak,
                ta_datum.data.time_start,
                ta_datum.data.type,
                np.uint16(ta_datum.data.version))],
                dtype=self.ta_dt)

            self.ta_data = np.hstack((self.ta_data, np_ta_datum))

            byte_idx += ta_datum.sizeof()
            if self._verbosity >= 2:
                print(f"INFO: Upcoming byte index: {byte_idx}")

            # Process TP data
            np_tp_data = np.zeros(np_ta_datum['num_tps'], dtype=self.tp_dt)
            for tp_idx, tp in enumerate(ta_datum):
                np_tp_data[tp_idx] = np.array([(
                    tp.adc_integral,
                    tp.adc_peak,
                    tp.channel,
                    tp.detid,
                    tp.flag,
                    tp.samples_over_threshold,
                    tp.samples_to_peak,
                    tp.time_start,
                    tp.version)],
                    dtype=self.tp_dt)
            self.tp_data.append(np_tp_data)  # Jagged array
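            # Note: :self.tp_data: stays jagged; e.g. self.tp_data[i]['channel']
            # holds the TP channels of the i-th TA read, and lengths differ per TA.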

        if self._verbosity >= 2:
            print("INFO: Finished reading.")
            print("="*60)
        return np_ta_datum

    def clear_data(self) -> None:
        """ Clear any previously loaded TA and TP data. """
        self.ta_data = np.array([], dtype=self.ta_dt)
        self.tp_data = []
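
# Minimal usage sketch over a whole file (illustrative; the file name is a
# placeholder and :_fragment_paths: is assumed to be filled by HDF5Reader):
#
#     reader = TAReader("swtest_run.hdf5", verbosity=0)
#     for path in reader._fragment_paths:
#         reader.read_fragment(path)
#     mean_adc = reader.ta_data['adc_integral'].mean()     # structured column
#     tps_per_ta = [len(tps) for tps in reader.tp_data]    # jagged: one array per TA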