def __init__(self, filename: str, verbosity: int = 0, batch_mode: bool = False) -> None:
    """
    Loads a given HDF5 file.

    Parameters:
        filename (str): HDF5 file to open.
        verbosity (int): Verbose level. 0: Only errors. 1: Warnings. 2: All.
        batch_mode (bool): Passed through unchanged to the parent constructor.
    """
    # Opening the file is delegated entirely to the parent class; all three
    # arguments are forwarded positionally in the order they were received.
    super().__init__(filename, verbosity, batch_mode)
    # NOTE(review): the lines that followed this call are not visible here.
    # Other code appends to `self.tp_data`, so confirm whether per-instance
    # containers are meant to be initialised at this point.
97 Read from the given data fragment path.
99 Returns a np.ndarray of the first TA that was read and appends all TAs in the fragment to :self.ta_data:.
103 print(f
"INFO: Reading from the path\n{fragment_path}")
105 fragment = self.
_h5_file.get_frag(fragment_path)
106 fragment_data_size = fragment.get_data_size()
107 trigger_number = fragment.get_trigger_number()
109 if fragment_data_size == 0:
115 +
"WARNING: Empty fragment. Returning empty array."
119 return np.array([], dtype=self.
ta_dt)
123 while byte_idx < fragment_data_size:
125 print(f
"INFO: Fragment Index: {ta_idx}.")
127 print(f
"INFO: Byte Index / Frag Size: {byte_idx} / {fragment_data_size}")
130 ta_datum = trgdataformats.TriggerActivity(fragment.get_data(byte_idx))
131 np_ta_datum = np.array([(
132 ta_datum.data.adc_integral,
133 ta_datum.data.adc_peak,
134 ta_datum.data.algorithm,
135 ta_datum.data.channel_end,
136 ta_datum.data.channel_peak,
137 ta_datum.data.channel_start,
138 np.uint16(ta_datum.data.detid),
140 ta_datum.data.time_activity,
141 ta_datum.data.time_end,
142 ta_datum.data.time_peak,
143 ta_datum.data.time_start,
145 np.uint16(ta_datum.data.version),
151 byte_idx += ta_datum.sizeof()
153 print(f
"INFO: Upcoming byte index: {byte_idx}")
156 np_tp_data = np.zeros(np_ta_datum[
'num_tps'], dtype=self.
tp_dt)
157 for tp_idx, tp
in enumerate(ta_datum):
158 np_tp_data[tp_idx] = np.array([(
164 tp.samples_over_threshold,
169 self.
tp_data.append(np_tp_data)
172 print(
"INFO: Finished reading.")