88 ):
89
90 trigger_interval_ticks = math.floor((1/TRIGGER_RATE_HZ) * CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR)
91
92
93 queue_bare_specs = [
94 app.QueueSpec(inst="time_sync_to_netq", kind='FollyMPMCQueue', capacity=100),
95 app.QueueSpec(inst="time_sync_from_netq", kind='FollySPSCQueue', capacity=100),
96
97 app.QueueSpec(inst="token_to_netq", kind='FollySPSCQueue', capacity=20),
98 app.QueueSpec(inst="token_from_netq", kind='FollySPSCQueue', capacity=20),
99
100 app.QueueSpec(inst="trigger_decision_to_netq", kind='FollySPSCQueue', capacity=20),
101 app.QueueSpec(inst="trigger_decision_from_netq", kind='FollySPSCQueue', capacity=20),
102
103 app.QueueSpec(inst="trigger_decision_copy_for_bookkeeping", kind='FollySPSCQueue', capacity=20),
104 app.QueueSpec(inst="trigger_record_q", kind='FollySPSCQueue', capacity=20),
105 app.QueueSpec(inst="data_fragments_q", kind='FollyMPMCQueue', capacity=100),
106 ] + [
107 app.QueueSpec(inst=f"data_requests_{idx}", kind='FollySPSCQueue', capacity=20)
108 for idx in range(NUMBER_OF_DATA_PRODUCERS)
109 ]
110
111
112
113 queue_specs = app.QueueSpecs(sorted(queue_bare_specs, key=lambda x: x.inst))
114
115
116 mod_specs = [
117 mspec("ntoq_trigdec", "NetworkToQueue", [
118 app.QueueInfo(name="output", inst="trigger_decision_from_netq", dir="output")
119 ]),
120
121 mspec("qton_trigdec", "QueueToNetwork", [
122 app.QueueInfo(name="input", inst="trigger_decision_to_netq", dir="input")
123 ]),
124
125 mspec("ntoq_token", "NetworkToQueue", [
126 app.QueueInfo(name="output", inst="token_from_netq", dir="output")
127 ]),
128
129 mspec("qton_token", "QueueToNetwork", [
130 app.QueueInfo(name="input", inst="token_to_netq", dir="input")
131 ]),
132
133 mspec("ntoq_timesync", "NetworkToQueue", [
134 app.QueueInfo(name="output", inst="time_sync_from_netq", dir="output")
135 ]),
136
137 mspec("qton_timesync", "QueueToNetwork", [
138 app.QueueInfo(name="input", inst="time_sync_to_netq", dir="input")
139 ]),
140
141 mspec("tde", "TriggerDecisionEmulator", [
142 app.QueueInfo(name="time_sync_source", inst="time_sync_from_netq", dir="input"),
143 app.QueueInfo(name="token_source", inst="token_from_netq", dir="input"),
144 app.QueueInfo(name="trigger_decision_sink", inst="trigger_decision_to_netq", dir="output"),
145 ]),
146
147 mspec("rqg", "RequestGenerator", [
148 app.QueueInfo(name="trigger_decision_input_queue", inst="trigger_decision_from_netq", dir="input"),
149 app.QueueInfo(name="trigger_decision_for_event_building", inst="trigger_decision_copy_for_bookkeeping", dir="output"),
150 ] + [
151 app.QueueInfo(name=f"data_request_{idx}_output_queue", inst=f"data_requests_{idx}", dir="output")
152 for idx in range(NUMBER_OF_DATA_PRODUCERS)
153 ]),
154
155 mspec("ffr", "FragmentReceiver", [
156 app.QueueInfo(name="trigger_decision_input_queue", inst="trigger_decision_copy_for_bookkeeping", dir="input"),
157 app.QueueInfo(name="trigger_record_output_queue", inst="trigger_record_q", dir="output"),
158 app.QueueInfo(name="data_fragment_input_queue", inst="data_fragments_q", dir="input"),
159 ]),
160
161 mspec("datawriter", "DataWriterModule", [
162 app.QueueInfo(name="trigger_record_input_queue", inst="trigger_record_q", dir="input"),
163 app.QueueInfo(name="token_output_queue", inst="token_to_netq", dir="output"),
164 ]),
165
166 mspec("fake_timesync_source", "FakeTimeSyncSource", [
167 app.QueueInfo(name="time_sync_sink", inst="time_sync_to_netq", dir="output"),
168 ]),
169
170 ] + [
171
172 mspec(f"fakedataprod_{idx}", "FakeDataProdModule", [
173 app.QueueInfo(name="data_request_input_queue", inst=f"data_requests_{idx}", dir="input"),
174 app.QueueInfo(name="data_fragment_output_queue", inst="data_fragments_q", dir="output"),
175 ]) for idx in range(NUMBER_OF_DATA_PRODUCERS)
176 ]
177
178 init_specs = app.Init(queues=queue_specs, modules=mod_specs)
179
180 jstr = json.dumps(init_specs.pod(), indent=4, sort_keys=True)
181 print(jstr)
182
183 initcmd = rccmd.RCCommand(
184 id=basecmd.CmdId("init"),
185 entry_state="NONE",
186 exit_state="INITIAL",
187 data=init_specs
188 )
189
190 confcmd = mrccmd("conf", "INITIAL", "CONFIGURED",[
191 ("qton_trigdec", qton.Conf(msg_type="dunedaq::dfmessages::TriggerDecision",
192 msg_module_name="TriggerDecisionNQ",
193 sender_config=nos.Conf(ipm_plugin_type="ZmqSender",
194 address= "tcp://127.0.0.1:12345",
195 stype="msgpack")
196 )
197 ),
198
199 ("ntoq_trigdec", ntoq.Conf(msg_type="dunedaq::dfmessages::TriggerDecision",
200 msg_module_name="TriggerDecisionNQ",
201 receiver_config=nor.Conf(ipm_plugin_type="ZmqReceiver",
202 address= "tcp://127.0.0.1:12345")
203 )
204 ),
205
206 ("qton_token", qton.Conf(msg_type="dunedaq::dfmessages::TriggerDecisionToken",
207 msg_module_name="TriggerDecisionTokenNQ",
208 sender_config=nos.Conf(ipm_plugin_type="ZmqSender",
209 address= "tcp://127.0.0.1:12346",
210 stype="msgpack")
211 )
212 ),
213
214 ("ntoq_token", ntoq.Conf(msg_type="dunedaq::dfmessages::TriggerDecisionToken",
215 msg_module_name="TriggerDecisionTokenNQ",
216 receiver_config=nor.Conf(ipm_plugin_type="ZmqReceiver",
217 address= "tcp://127.0.0.1:12346")
218 )
219 ),
220
221 ("qton_timesync", qton.Conf(msg_type="dunedaq::dfmessages::TimeSync",
222 msg_module_name="TimeSyncNQ",
223 sender_config=nos.Conf(ipm_plugin_type="ZmqSender",
224 address= "tcp://127.0.0.1:12347",
225 stype="msgpack")
226 )
227 ),
228
229 ("ntoq_timesync", ntoq.Conf(msg_type="dunedaq::dfmessages::TimeSync",
230 msg_module_name="TimeSyncNQ",
231 receiver_config=nor.Conf(ipm_plugin_type="ZmqReceiver",
232 address= "tcp://127.0.0.1:12347")
233 )
234 ),
235
236 ("tde", tde.ConfParams(
237 sourceids=[idx for idx in range(NUMBER_OF_DATA_PRODUCERS)],
238 min_links_in_request=NUMBER_OF_DATA_PRODUCERS,
239 max_links_in_request=NUMBER_OF_DATA_PRODUCERS,
240 min_readout_window_ticks=1200,
241 max_readout_window_ticks=1200,
242 trigger_window_offset=1000,
243
244 trigger_delay_ticks=math.floor( 2* CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR),
245
246
247
248
249 trigger_interval_ticks=trigger_interval_ticks,
250 clock_frequency_hz=CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR
251 )),
252 ("rqg", rqg.ConfParams(
253 map=rqg.mapsourceidqueue([
254 rqg.sourceidinst(source_id=idx, queueinstance=f"data_requests_{idx}") for idx in range(NUMBER_OF_DATA_PRODUCERS)
255 ])
256 )),
257 ("ffr", ffr.ConfParams(
258 general_queue_timeout=QUEUE_POP_WAIT_MS
259 )),
260 ("datawriter", dw.ConfParams(
261 initial_token_count=TOKEN_COUNT,
262 data_store_parameters=hdf5ds.ConfParams(
263 name="data_store",
264
265 directory_path = OUTPUT_PATH,
266
267 max_file_size_bytes = 1073741834,
268 disable_unique_filename_suffix = False,
269 filename_parameters = hdf5ds.HDF5DataStoreFileNameParams(
270 overall_prefix = "fake_minidaqapp",
271
272 file_index_prefix = "file"
273 ),
274 file_layout_parameters = hdf5ds.HDF5DataStoreFileLayoutParams(
275 trigger_record_name_prefix= "TriggerRecord",
276 digits_for_trigger_number = 5,
277 )
278 )
279 )),
280 ("fake_timesync_source", ftss.ConfParams(
281 sync_interval_ticks = (CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR),
282 clock_frequency_hz = (CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR),
283 )),
284 ] + [
285 (f"fakedataprod_{idx}", fdp.ConfParams(
286 temporarily_hacked_link_number = idx
287 )) for idx in range(NUMBER_OF_DATA_PRODUCERS)
288 ])
289
290 jstr = json.dumps(confcmd.pod(), indent=4, sort_keys=True)
291 print(jstr)
292
293 startpars = rccmd.StartParams(run=RUN_NUMBER, disable_data_storage=DISABLE_OUTPUT)
294 startcmd = mrccmd("start", "CONFIGURED", "RUNNING", [
295 ("ntoq_trigdec", startpars),
296 ("qton_trigdec", startpars),
297 ("ntoq_token", startpars),
298 ("qton_token", startpars),
299 ("ntoq_timesync", startpars),
300 ("qton_timesync", startpars),
301 ("datawriter", startpars),
302 ("ffr", startpars),
303 ("fakedataprod_.*", startpars),
304 ("rqg", startpars),
305 ("fake_timesync_source", startpars),
306 ("tde", startpars),
307 ])
308
309 jstr = json.dumps(startcmd.pod(), indent=4, sort_keys=True)
310 print("="*80+"\nStart\n\n", jstr)
311
312
313 stopcmd = mrccmd("stop", "RUNNING", "CONFIGURED", [
314 ("ntoq_trigdec", None),
315 ("qton_trigdec", None),
316 ("ntoq_timesync", None),
317 ("qton_timesync", None),
318 ("ntoq_token", None),
319 ("qton_token", None),
320 ("fake_timesync_source", None),
321 ("tde", None),
322 ("rqg", None),
323 ("fakedataprod_.*", None),
324 ("ffr", None),
325 ("datawriter", None),
326 ])
327
328 jstr = json.dumps(stopcmd.pod(), indent=4, sort_keys=True)
329 print("="*80+"\nStop\n\n", jstr)
330
331 pausecmd = mrccmd("pause", "RUNNING", "RUNNING", [
332 ("", None)
333 ])
334
335 jstr = json.dumps(pausecmd.pod(), indent=4, sort_keys=True)
336 print("="*80+"\nPause\n\n", jstr)
337
338 resumecmd = mrccmd("resume", "RUNNING", "RUNNING", [
339 ("tde", tde.ResumeParams(
340 trigger_interval_ticks=trigger_interval_ticks
341 ))
342 ])
343
344 jstr = json.dumps(resumecmd.pod(), indent=4, sort_keys=True)
345 print("="*80+"\nResume\n\n", jstr)
346
347 scrapcmd = mcmd("scrap", [
348 ("", None)
349 ])
350
351 jstr = json.dumps(scrapcmd.pod(), indent=4, sort_keys=True)
352 print("="*80+"\nScrap\n\n", jstr)
353
354
355 cmd_seq = [initcmd, confcmd, startcmd, stopcmd, pausecmd, resumecmd, scrapcmd]
356
357
358 jstr = json.dumps([c.pod() for c in cmd_seq], indent=4, sort_keys=True)
359 return jstr
360