DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
testapp_noreadout_networkqueue_confgen.py
Go to the documentation of this file.
1# testapp_noreadout_networkqueue_confgen.py
2
3# This python configuration provides a MiniDAQApp v1-style
4# single-application configuration, with connections to and from the
5# TriggerDecisionEmulator going via pairs of QueueToNetwork and
6# NetworkToQueue modules. Since everything is in one application, the
7# only purpose this serves is to test the QueueToNetwork and
8# NetworkToQueue functionality. As with testapp_noreadout_confgen.py
9# in this directory, no modules from the readout package are used: the
10# fragments are provided by the FakeDataProdModule module from dfmodules
11
12
13# Set moo schema search path
14from dunedaq.env import get_moo_model_path
15import moo.io
16moo.io.default_load_path = get_moo_model_path()
17
18# Load configuration types
19import moo.otypes
20moo.otypes.load_types('rcif/cmd.jsonnet')
21moo.otypes.load_types('appfwk/cmd.jsonnet')
22moo.otypes.load_types('appfwk/app.jsonnet')
23moo.otypes.load_types('trigemu/triggerdecisionemulator.jsonnet')
24moo.otypes.load_types('trigemu/faketimesyncsource.jsonnet')
25moo.otypes.load_types('dfmodules/requestgenerator.jsonnet')
26moo.otypes.load_types('dfmodules/fragmentreceiver.jsonnet')
27moo.otypes.load_types('dfmodules/datawriter.jsonnet')
28moo.otypes.load_types('dfmodules/hdf5datastore.jsonnet')
29moo.otypes.load_types('dfmodules/fakedataprod.jsonnet')
30moo.otypes.load_types('nwqueueadapters/queuetonetwork.jsonnet')
31moo.otypes.load_types('nwqueueadapters/networktoqueue.jsonnet')
32moo.otypes.load_types('serialization/networkobjectreceiver.jsonnet')
33moo.otypes.load_types('serialization/networkobjectsender.jsonnet')
34
35# Import new types
36import dunedaq.cmdlib.cmd as basecmd # AddressedCmd,
37import dunedaq.rcif.cmd as rccmd # AddressedCmd,
38import dunedaq.appfwk.cmd as cmd # AddressedCmd,
39import dunedaq.appfwk.app as app # AddressedCmd,
40import dunedaq.trigemu.triggerdecisionemulator as tde
41import dunedaq.trigemu.faketimesyncsource as ftss
42import dunedaq.dfmodules.requestgenerator as rqg
43import dunedaq.dfmodules.fragmentreceiver as ffr
44import dunedaq.dfmodules.datawriter as dw
45import dunedaq.dfmodules.hdf5datastore as hdf5ds
46import dunedaq.dfmodules.fakedataprod as fdp
47import dunedaq.nwqueueadapters.networktoqueue as ntoq
48import dunedaq.nwqueueadapters.queuetonetwork as qton
49import dunedaq.serialization.networkobjectreceiver as nor
50import dunedaq.serialization.networkobjectsender as nos
51
52from appfwk.utils import mcmd, mrccmd, mspec
53
54import json
55import math
56from pprint import pprint
57# Time to waait on pop()
58QUEUE_POP_WAIT_MS=100;
59# local clock speed Hz
60CLOCK_SPEED_HZ = 62500000;
61
62# Checklist for replacing a queue with a qton/ntoq pair:
63
64# 1. Delete the queue `qname` from init's list of queues, and add
65# queues "${qname}_to_netq" and "${qname}_from_netq"
66#
67# 2. Add ntoq and qton modules to init's list of modules
68#
69# 3. In init's list of modules, find all references to `qname` and
70# replace them with "${qname}_to_netq" or "${qname}_from_netq",
71# depending on the queue's direction
72#
73# 4. In conf, add configuration for the ntoq and qton modules
74#
75# 5. In start, add the ntoq and qton module
76#
77# 6. In stop, add the ntoq and qton modules
78
80 NUMBER_OF_DATA_PRODUCERS=2,
81 DATA_RATE_SLOWDOWN_FACTOR = 1,
82 RUN_NUMBER = 333,
83 TRIGGER_RATE_HZ = 1.0,
84 DATA_FILE="./frames.bin",
85 OUTPUT_PATH=".",
86 DISABLE_OUTPUT=False,
87 TOKEN_COUNT=10
88 ):
89
90 trigger_interval_ticks = math.floor((1/TRIGGER_RATE_HZ) * CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR)
91
92 # Define modules and queues
93 queue_bare_specs = [
94 app.QueueSpec(inst="time_sync_to_netq", kind='FollyMPMCQueue', capacity=100),
95 app.QueueSpec(inst="time_sync_from_netq", kind='FollySPSCQueue', capacity=100),
96
97 app.QueueSpec(inst="token_to_netq", kind='FollySPSCQueue', capacity=20),
98 app.QueueSpec(inst="token_from_netq", kind='FollySPSCQueue', capacity=20),
99
100 app.QueueSpec(inst="trigger_decision_to_netq", kind='FollySPSCQueue', capacity=20),
101 app.QueueSpec(inst="trigger_decision_from_netq", kind='FollySPSCQueue', capacity=20),
102
103 app.QueueSpec(inst="trigger_decision_copy_for_bookkeeping", kind='FollySPSCQueue', capacity=20),
104 app.QueueSpec(inst="trigger_record_q", kind='FollySPSCQueue', capacity=20),
105 app.QueueSpec(inst="data_fragments_q", kind='FollyMPMCQueue', capacity=100),
106 ] + [
107 app.QueueSpec(inst=f"data_requests_{idx}", kind='FollySPSCQueue', capacity=20)
108 for idx in range(NUMBER_OF_DATA_PRODUCERS)
109 ]
110
111
112 # Only needed to reproduce the same order as when using jsonnet
113 queue_specs = app.QueueSpecs(sorted(queue_bare_specs, key=lambda x: x.inst))
114
115
116 mod_specs = [
117 mspec("ntoq_trigdec", "NetworkToQueue", [
118 app.QueueInfo(name="output", inst="trigger_decision_from_netq", dir="output")
119 ]),
120
121 mspec("qton_trigdec", "QueueToNetwork", [
122 app.QueueInfo(name="input", inst="trigger_decision_to_netq", dir="input")
123 ]),
124
125 mspec("ntoq_token", "NetworkToQueue", [
126 app.QueueInfo(name="output", inst="token_from_netq", dir="output")
127 ]),
128
129 mspec("qton_token", "QueueToNetwork", [
130 app.QueueInfo(name="input", inst="token_to_netq", dir="input")
131 ]),
132
133 mspec("ntoq_timesync", "NetworkToQueue", [
134 app.QueueInfo(name="output", inst="time_sync_from_netq", dir="output")
135 ]),
136
137 mspec("qton_timesync", "QueueToNetwork", [
138 app.QueueInfo(name="input", inst="time_sync_to_netq", dir="input")
139 ]),
140
141 mspec("tde", "TriggerDecisionEmulator", [
142 app.QueueInfo(name="time_sync_source", inst="time_sync_from_netq", dir="input"),
143 app.QueueInfo(name="token_source", inst="token_from_netq", dir="input"),
144 app.QueueInfo(name="trigger_decision_sink", inst="trigger_decision_to_netq", dir="output"),
145 ]),
146
147 mspec("rqg", "RequestGenerator", [
148 app.QueueInfo(name="trigger_decision_input_queue", inst="trigger_decision_from_netq", dir="input"),
149 app.QueueInfo(name="trigger_decision_for_event_building", inst="trigger_decision_copy_for_bookkeeping", dir="output"),
150 ] + [
151 app.QueueInfo(name=f"data_request_{idx}_output_queue", inst=f"data_requests_{idx}", dir="output")
152 for idx in range(NUMBER_OF_DATA_PRODUCERS)
153 ]),
154
155 mspec("ffr", "FragmentReceiver", [
156 app.QueueInfo(name="trigger_decision_input_queue", inst="trigger_decision_copy_for_bookkeeping", dir="input"),
157 app.QueueInfo(name="trigger_record_output_queue", inst="trigger_record_q", dir="output"),
158 app.QueueInfo(name="data_fragment_input_queue", inst="data_fragments_q", dir="input"),
159 ]),
160
161 mspec("datawriter", "DataWriterModule", [
162 app.QueueInfo(name="trigger_record_input_queue", inst="trigger_record_q", dir="input"),
163 app.QueueInfo(name="token_output_queue", inst="token_to_netq", dir="output"),
164 ]),
165
166 mspec("fake_timesync_source", "FakeTimeSyncSource", [
167 app.QueueInfo(name="time_sync_sink", inst="time_sync_to_netq", dir="output"),
168 ]),
169
170 ] + [
171
172 mspec(f"fakedataprod_{idx}", "FakeDataProdModule", [
173 app.QueueInfo(name="data_request_input_queue", inst=f"data_requests_{idx}", dir="input"),
174 app.QueueInfo(name="data_fragment_output_queue", inst="data_fragments_q", dir="output"),
175 ]) for idx in range(NUMBER_OF_DATA_PRODUCERS)
176 ]
177
178 init_specs = app.Init(queues=queue_specs, modules=mod_specs)
179
180 jstr = json.dumps(init_specs.pod(), indent=4, sort_keys=True)
181 print(jstr)
182
183 initcmd = rccmd.RCCommand(
184 id=basecmd.CmdId("init"),
185 entry_state="NONE",
186 exit_state="INITIAL",
187 data=init_specs
188 )
189
190 confcmd = mrccmd("conf", "INITIAL", "CONFIGURED",[
191 ("qton_trigdec", qton.Conf(msg_type="dunedaq::dfmessages::TriggerDecision",
192 msg_module_name="TriggerDecisionNQ",
193 sender_config=nos.Conf(ipm_plugin_type="ZmqSender",
194 address= "tcp://127.0.0.1:12345",
195 stype="msgpack")
196 )
197 ),
198
199 ("ntoq_trigdec", ntoq.Conf(msg_type="dunedaq::dfmessages::TriggerDecision",
200 msg_module_name="TriggerDecisionNQ",
201 receiver_config=nor.Conf(ipm_plugin_type="ZmqReceiver",
202 address= "tcp://127.0.0.1:12345")
203 )
204 ),
205
206 ("qton_token", qton.Conf(msg_type="dunedaq::dfmessages::TriggerDecisionToken",
207 msg_module_name="TriggerDecisionTokenNQ",
208 sender_config=nos.Conf(ipm_plugin_type="ZmqSender",
209 address= "tcp://127.0.0.1:12346",
210 stype="msgpack")
211 )
212 ),
213
214 ("ntoq_token", ntoq.Conf(msg_type="dunedaq::dfmessages::TriggerDecisionToken",
215 msg_module_name="TriggerDecisionTokenNQ",
216 receiver_config=nor.Conf(ipm_plugin_type="ZmqReceiver",
217 address= "tcp://127.0.0.1:12346")
218 )
219 ),
220
221 ("qton_timesync", qton.Conf(msg_type="dunedaq::dfmessages::TimeSync",
222 msg_module_name="TimeSyncNQ",
223 sender_config=nos.Conf(ipm_plugin_type="ZmqSender",
224 address= "tcp://127.0.0.1:12347",
225 stype="msgpack")
226 )
227 ),
228
229 ("ntoq_timesync", ntoq.Conf(msg_type="dunedaq::dfmessages::TimeSync",
230 msg_module_name="TimeSyncNQ",
231 receiver_config=nor.Conf(ipm_plugin_type="ZmqReceiver",
232 address= "tcp://127.0.0.1:12347")
233 )
234 ),
235
236 ("tde", tde.ConfParams(
237 sourceids=[idx for idx in range(NUMBER_OF_DATA_PRODUCERS)],
238 min_links_in_request=NUMBER_OF_DATA_PRODUCERS,
239 max_links_in_request=NUMBER_OF_DATA_PRODUCERS,
240 min_readout_window_ticks=1200,
241 max_readout_window_ticks=1200,
242 trigger_window_offset=1000,
243 # The delay is set to put the trigger well within the latency buff
244 trigger_delay_ticks=math.floor( 2* CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR),
245 # We divide the trigger interval by
246 # DATA_RATE_SLOWDOWN_FACTOR so the triggers are still
247 # emitted per (wall-clock) second, rather than being
248 # spaced out further
249 trigger_interval_ticks=trigger_interval_ticks,
250 clock_frequency_hz=CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR
251 )),
252 ("rqg", rqg.ConfParams(
253 map=rqg.mapsourceidqueue([
254 rqg.sourceidinst(source_id=idx, queueinstance=f"data_requests_{idx}") for idx in range(NUMBER_OF_DATA_PRODUCERS)
255 ])
256 )),
257 ("ffr", ffr.ConfParams(
258 general_queue_timeout=QUEUE_POP_WAIT_MS
259 )),
260 ("datawriter", dw.ConfParams(
261 initial_token_count=TOKEN_COUNT,
262 data_store_parameters=hdf5ds.ConfParams(
263 name="data_store",
264 # type = "HDF5DataStore", # default
265 directory_path = OUTPUT_PATH, # default
266 # mode = "all-per-file", # default
267 max_file_size_bytes = 1073741834,
268 disable_unique_filename_suffix = False,
269 filename_parameters = hdf5ds.HDF5DataStoreFileNameParams(
270 overall_prefix = "fake_minidaqapp",
271 # digits_for_run_number = 6, #default
272 file_index_prefix = "file"
273 ),
274 file_layout_parameters = hdf5ds.HDF5DataStoreFileLayoutParams(
275 trigger_record_name_prefix= "TriggerRecord",
276 digits_for_trigger_number = 5,
277 )
278 )
279 )),
280 ("fake_timesync_source", ftss.ConfParams(
281 sync_interval_ticks = (CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR),
282 clock_frequency_hz = (CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR),
283 )),
284 ] + [
285 (f"fakedataprod_{idx}", fdp.ConfParams(
286 temporarily_hacked_link_number = idx
287 )) for idx in range(NUMBER_OF_DATA_PRODUCERS)
288 ])
289
290 jstr = json.dumps(confcmd.pod(), indent=4, sort_keys=True)
291 print(jstr)
292
293 startpars = rccmd.StartParams(run=RUN_NUMBER, disable_data_storage=DISABLE_OUTPUT)
294 startcmd = mrccmd("start", "CONFIGURED", "RUNNING", [
295 ("ntoq_trigdec", startpars),
296 ("qton_trigdec", startpars),
297 ("ntoq_token", startpars),
298 ("qton_token", startpars),
299 ("ntoq_timesync", startpars),
300 ("qton_timesync", startpars),
301 ("datawriter", startpars),
302 ("ffr", startpars),
303 ("fakedataprod_.*", startpars),
304 ("rqg", startpars),
305 ("fake_timesync_source", startpars),
306 ("tde", startpars),
307 ])
308
309 jstr = json.dumps(startcmd.pod(), indent=4, sort_keys=True)
310 print("="*80+"\nStart\n\n", jstr)
311
312
313 stopcmd = mrccmd("stop", "RUNNING", "CONFIGURED", [
314 ("ntoq_trigdec", None),
315 ("qton_trigdec", None),
316 ("ntoq_timesync", None),
317 ("qton_timesync", None),
318 ("ntoq_token", None),
319 ("qton_token", None),
320 ("fake_timesync_source", None),
321 ("tde", None),
322 ("rqg", None),
323 ("fakedataprod_.*", None),
324 ("ffr", None),
325 ("datawriter", None),
326 ])
327
328 jstr = json.dumps(stopcmd.pod(), indent=4, sort_keys=True)
329 print("="*80+"\nStop\n\n", jstr)
330
331 pausecmd = mrccmd("pause", "RUNNING", "RUNNING", [
332 ("", None)
333 ])
334
335 jstr = json.dumps(pausecmd.pod(), indent=4, sort_keys=True)
336 print("="*80+"\nPause\n\n", jstr)
337
338 resumecmd = mrccmd("resume", "RUNNING", "RUNNING", [
339 ("tde", tde.ResumeParams(
340 trigger_interval_ticks=trigger_interval_ticks
341 ))
342 ])
343
344 jstr = json.dumps(resumecmd.pod(), indent=4, sort_keys=True)
345 print("="*80+"\nResume\n\n", jstr)
346
347 scrapcmd = mcmd("scrap", [
348 ("", None)
349 ])
350
351 jstr = json.dumps(scrapcmd.pod(), indent=4, sort_keys=True)
352 print("="*80+"\nScrap\n\n", jstr)
353
354 # Create a list of commands
355 cmd_seq = [initcmd, confcmd, startcmd, stopcmd, pausecmd, resumecmd, scrapcmd]
356
357 # Print them as json (to be improved/moved out)
358 jstr = json.dumps([c.pod() for c in cmd_seq], indent=4, sort_keys=True)
359 return jstr
360
361if __name__ == '__main__':
362 # Add -h as default help option
363 CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
364
365 import click
366
367 @click.command(context_settings=CONTEXT_SETTINGS)
368 @click.option('-n', '--number-of-data-producers', default=2)
369 @click.option('-s', '--data-rate-slowdown-factor', default=1)
370 @click.option('-r', '--run-number', default=333)
371 @click.option('-t', '--trigger-rate-hz', default=1.0)
372 @click.option('-d', '--data-file', type=click.Path(), default='./frames.bin')
373 @click.option('-o', '--output-path', type=click.Path(), default='.')
374 @click.option('--disable-data-storage', is_flag=True)
375 @click.option('-c', '--token-count', default=10)
376 @click.argument('json_file', type=click.Path(), default='testapp-noreadout-networkqueue.json')
377 def cli(number_of_data_producers, data_rate_slowdown_factor, run_number, trigger_rate_hz, data_file, output_path, disable_data_storage,token_count, json_file):
378 """
379 JSON_FILE: Input raw data file.
380 JSON_FILE: Output json configuration file.
381 """
382
383 with open(json_file, 'w') as f:
384 f.write(generate(
385 NUMBER_OF_DATA_PRODUCERS = number_of_data_producers,
386 DATA_RATE_SLOWDOWN_FACTOR = data_rate_slowdown_factor,
387 RUN_NUMBER = run_number,
388 TRIGGER_RATE_HZ = trigger_rate_hz,
389 DATA_FILE = data_file,
390 OUTPUT_PATH = output_path,
391 DISABLE_OUTPUT = disable_data_storage,
392 TOKEN_COUNT = token_count
393 ))
394
395 print(f"'{json_file}' generation completed.")
396
397 cli()
398
cli(number_of_data_producers, data_rate_slowdown_factor, run_number, trigger_rate_hz, data_file, output_path, disable_data_storage, token_count, json_file)