57 def __init__(self, filename: str, verbosity: int = 0, batch_mode: bool =
False) ->
None:
59 Loads a given HDF5 file.
62 filename (str): HDF5 file to open.
63 verbosity (int): Verbose level. 0: Only errors. 1: Warnings. 2: All.
67 super().
__init__(filename, verbosity, batch_mode)
96 Read from the given data fragment path.
98 Returns a np.ndarray of the first TA that was read and appends all TAs in the fragment to :self.ta_data:.
102 print(f
"INFO: Reading from the path\n{fragment_path}")
104 fragment = self.
_h5_file.get_frag(fragment_path)
105 fragment_data_size = fragment.get_data_size()
107 if fragment_data_size == 0:
113 +
"WARNING: Empty fragment. Returning empty array."
117 return np.array([], dtype=self.
ta_dt)
121 while byte_idx < fragment_data_size:
123 print(f
"INFO: Fragment Index: {ta_idx}.")
125 print(f
"INFO: Byte Index / Frag Size: {byte_idx} / {fragment_data_size}")
128 ta_datum = trgdataformats.TriggerActivity(fragment.get_data(byte_idx))
129 np_ta_datum = np.array([(
130 ta_datum.data.adc_integral,
131 ta_datum.data.adc_peak,
132 ta_datum.data.algorithm,
133 ta_datum.data.channel_end,
134 ta_datum.data.channel_peak,
135 ta_datum.data.channel_start,
136 np.uint16(ta_datum.data.detid),
138 ta_datum.data.time_activity,
139 ta_datum.data.time_end,
140 ta_datum.data.time_peak,
141 ta_datum.data.time_start,
143 np.uint16(ta_datum.data.version))],
148 byte_idx += ta_datum.sizeof()
150 print(f
"INFO: Upcoming byte index: {byte_idx}")
153 np_tp_data = np.zeros(np_ta_datum[
'num_tps'], dtype=self.
tp_dt)
154 for tp_idx, tp
in enumerate(ta_datum):
155 np_tp_data[tp_idx] = np.array([(
161 tp.samples_over_threshold,
166 self.
tp_data.append(np_tp_data)
169 print(
"INFO: Finished reading.")