DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
simple_transform_test_gen Namespace Reference

Functions

 get_file_info (filename)
 
 generate_transform_objs (oksfile, files, trigger_mode)
 

Variables

 GeoID = namedtuple('GeoID', ['det_id', 'crate_id', 'slot_id', 'stream_id'], defaults=[0,0,0,0])
 

Function Documentation

◆ generate_transform_objs()

simple_transform_test_gen.generate_transform_objs ( oksfile,
files,
trigger_mode )

Definition at line 74 of file simple_transform_test_gen.py.

74def generate_transform_objs(oksfile, files, trigger_mode):
75
76 if os.path.exists(oksfile):
77 print(f"Target OKS database {oksfile} exists, aborting")
78 exit(1)
79
80 schemafiles = [
81 "schema/confmodel/dunedaq.schema.xml",
82 "schema/appmodel/application.schema.xml",
83 "schema/appmodel/fdmodules.schema.xml",
84 "schema/appmodel/trigger.schema.xml",
85 ]
86 dal = conffwk.dal.module("generated", schemafiles)
87 db = conffwk.Configuration("oksconflibs")
88 db.create_db(oksfile, schemafiles)
89
90 file_infos = {}
91 files_by_geoid = {}
92 for file in files:
93 print(f"Reading file info for {file}")
94 this_info = get_file_info(file)
95 if this_info["geo_id"] in files_by_geoid.keys():
96 files_by_geoid[this_info["geo_id"]].append(file)
97 else:
98 files_by_geoid[this_info["geo_id"]] = [file]
99 file_infos[file] = this_info
100
101 groups = []
102 streams = []
103 senders = []
104 source_id = 0
105 print(f"New nic adding nic with id nic-{0}")
106 nic_dal = dal.NetworkInterface(f"snb-nic-{0}")
107 db.update_dal(nic_dal)
108
109 for geoid,geoid_files in files_by_geoid.items():
110 print (f"Generating {geoid=}")
111 first_file = geoid_files[0]
112 first_file_info = file_infos[first_file]
113
114 geo_dal = dal.GeoId(
115 f"geioId-{source_id}",
116 detector_id=geoid.det_id,
117 crate_id=geoid.crate_id,
118 slot_id=geoid.slot_id,
119 stream_id=geoid.stream_id,
120 )
121 db.update_dal(geo_dal)
122 stream = dal.DetectorStream(
123 f"stream-{source_id}",
124 source_id=source_id,
125 geo_id=geo_dal,
126 )
127 db.update_dal(stream)
128 streams.append(stream)
129 db.commit()
130
131 sender_dal = dal.FakeDataSender(
132 f"sender-{source_id}",
133 streams=[stream],
134 uses=nic_dal
135 )
136 db.update_dal(sender_dal)
137 senders.append(sender_dal)
138 db.commit()
139
140 rec_dal = dal.FileReaderReceiver(f"dataRec-{source_id}", uses=nic_dal)
141 db.update_dal(rec_dal)
142 detconn_dal = dal.NetworkDetectorToDaqConnection(
143 f"det-conn-{source_id}",
144 net_receiver=rec_dal,
145 net_senders=senders
146 )
147 db.update_dal(detconn_dal)
148 groups.append(detconn_dal)
149
150 file_source_dal = dal.SNBFileSourceParameters(f"snb-files-det-conn-{source_id}", data_files=geoid_files, input_buffer_size=5777280,
151 file_compression_algorithm="None",)
152 db.update_dal(file_source_dal)
153
154 senders = []
155 source_id = source_id + 1
156
157
158 triggers=[]
159
160 if trigger_mode == "per-file": # Make a trigger matching the contents of each file
161
162 for file,file_info in file_infos.items():
163 first = file_info["first"]
164 last = file_info["last"]
165 fttc_dal = dal.FixedTimeTCConf(f"ft-trig-{first}", timestamp_start=first, timestamp_end=last)
166 match = False
167 for trig in triggers:
168 if trig.timestamp_start == first:
169 match = True
170 break;
171
172 if not match:
173 db.update_dal(fttc_dal)
174 triggers.append(fttc_dal)
175 elif trigger_mode == "aligned-chunks": # Make a "start", "body" and "end" trigger
176 earliest_start = -1
177 latest_start = -1
178 earliest_end = -1
179 latest_end = -1
180
181 for file,file_info in file_infos.items():
182 first = file_info["first"]
183 last = file_info["last"]
184 if earliest_start == -1 or first < earliest_start:
185 earliest_start = first
186 if latest_start == -1 or first > latest_start:
187 latest_start = first
188 if earliest_end == -1 or last < earliest_end:
189 earliest_end = last
190 if latest_end == -1 or last > latest_end:
191 latest_end = last
192
193 if earliest_end < latest_start:
194 print(f"Warning: Latest window start time is after earliest end time! Only creating \"start\" and \"end\" triggers!")
195 earliest_end = latest_start
196
197 start_fttc_dal = dal.FixedTimeTCConf(f"ft-trig-{earliest_start}", timestamp_start=earliest_start, timestamp_end=latest_start)
198 db.update_dal(start_fttc_dal)
199 triggers.append(start_fttc_dal)
200 if earliest_end > latest_start:
201 body_fttc_dal = dal.FixedTimeTCConf(f"ft-trig-{latest_start}", timestamp_start=latest_start, timestamp_end=earliest_end)
202 db.update_dal(body_fttc_dal)
203 triggers.append(body_fttc_dal)
204 end_fttc_dal = dal.FixedTimeTCConf(f"ft-trig-{earliest_end}", timestamp_start=earliest_end, timestamp_end=latest_end)
205 db.update_dal(end_fttc_dal)
206 triggers.append(end_fttc_dal)
207 else: # One big trigger
208 earliest_start = -1
209 latest_end = -1
210
211 for file,file_info in file_infos.items():
212 first = file_info["first"]
213 last = file_info["last"]
214 if earliest_start == -1 or first < earliest_start:
215 earliest_start = first
216 if latest_end == -1 or last > latest_end:
217 latest_end = last
218 fttc_dal = dal.FixedTimeTCConf(f"ft-trig-{earliest_start}", timestamp_start=earliest_start, timestamp_end=latest_end)
219 db.update_dal(fttc_dal)
220 triggers.append(fttc_dal)
221
222 ftmc_dal = dal.FixedTimeTCMakerModuleConf(f'ft-trig-conf', template_for="FixedTimeTCMakerModule", wait_time_ms=1000, triggers=triggers, tc_type_name="kSupernova")
223 db.update_dal(ftmc_dal)
224
225 db.commit()
module(name, schema, other_dals=[], backend='oksconflibs', db=None)
Definition dal.py:673

◆ get_file_info()

simple_transform_test_gen.get_file_info ( filename)

Definition at line 11 of file simple_transform_test_gen.py.

11def get_file_info(filename):
12
13 if "asset:" in filename:
14 filename = resolve_asset_file(filename)
15
16 with open(filename, 'rb') as ff:
17
18 # Get file type from first DaqEthHeader
19 hdr_size = detdataformats.DAQEthHeader.sizeof()
20 hdr_bin = ff.read(hdr_size)
21 hdr = detdataformats.DAQEthHeader(hdr_bin)
22 detector_id = hdr.det_id
23 ff.seek(0)
24
25 frame_size = 0
26 frame_type = "wibeth"
27 if detector_id in [2, 8, 9]: # PDS
28 frame_size = fddetdataformats.DAPHNEFrame.sizeof()
29 frame_type = "daphne"
30 elif detector_id == 11: # TDE
31 frame_size = fddetdataformats.TDEEthFrame.sizeof()
32 frame_type = "tdeeth"
33 else:
34 frame_size = fddetdataformats.WIBEthFrame.sizeof()
35
36 frame_counter = 0
37 first_timestamp = -1
38 last_timestamp = -1
39 geo_id = GeoID()
40 while True: # Loop over frames in file
41 frame_bin = ff.read(frame_size)
42 if not frame_bin:
43 break
44 frame_counter += 1
45
46 if frame_type == 'wibeth':
47 frame = fddetdataformats.WIBEthFrame(frame_bin)
48 if frame_type == 'tdeeth':
49 frame = fddetdataformats.TDEEthFrame(frame_bin)
50 if frame_type == 'tde16':
51 frame = fddetdataformats.TDE16Frame(frame_bin)
52 if frame_type == 'daphne':
53 frame = fddetdataformats.DAPHNEFrame(frame_bin)
54 if frame_type == 'daphne-stream':
55 frame = fddetdataformats.DAPHNEStreamFrame(frame_bin)
56
57
58 timestamp = frame.get_timestamp()
59 if first_timestamp == -1 or timestamp < first_timestamp:
60 first_timestamp = timestamp
61 if last_timestamp == -1 or timestamp > last_timestamp:
62 last_timestamp = timestamp
63
64 dheader = frame.get_daqheader()
65 stream_id = 0
66 if isinstance(dheader, detdataformats._daq_detdataformats_py.DAQHeader):
67 stream_id = dheader.link_id
68 else:
69 stream_id = dheader.stream_id
70 geo_id = GeoID(det_id=dheader.det_id, crate_id=dheader.crate_id, slot_id=dheader.slot_id, stream_id=stream_id)
71
72 return {"frame_count": frame_counter, "first": first_timestamp, "last": last_timestamp, "geo_id": geo_id}
73

Variable Documentation

◆ GeoID

simple_transform_test_gen.GeoID = namedtuple('GeoID', ['det_id', 'crate_id', 'slot_id', 'stream_id'], defaults=[0,0,0,0])

Definition at line 9 of file simple_transform_test_gen.py.