# DUNE-DAQ: DUNE Trigger and Data Acquisition software
# testapp_noreadout_two_process.py
# (Doxygen page chrome removed from this extracted listing.)
# testapp_noreadout_two_process.py

# This python configuration produces *two* json configuration files
# that together form a MiniDAQApp with the same functionality as
# MiniDAQApp v1, but in two processes. One process contains the
# TriggerDecisionEmulator, while the other process contains everything
# else. The network communication is done with the QueueToNetwork and
# NetworkToQueue modules from the nwqueueadapters package.
#
# As with testapp_noreadout_confgen.py
# in this directory, no modules from the readout package are used: the
# fragments are provided by the FakeDataProdModule module from dfmodules


15# Set moo schema search path
16from dunedaq.env import get_moo_model_path
17import moo.io
18moo.io.default_load_path = get_moo_model_path()
19
20# Load configuration types
21import moo.otypes
22moo.otypes.load_types('rcif/cmd.jsonnet')
23moo.otypes.load_types('appfwk/cmd.jsonnet')
24moo.otypes.load_types('appfwk/app.jsonnet')
25
26moo.otypes.load_types('trigemu/triggerdecisionemulator.jsonnet')
27moo.otypes.load_types('trigemu/faketimesyncsource.jsonnet')
28moo.otypes.load_types('dfmodules/requestgenerator.jsonnet')
29moo.otypes.load_types('dfmodules/fragmentreceiver.jsonnet')
30moo.otypes.load_types('dfmodules/datawriter.jsonnet')
31moo.otypes.load_types('dfmodules/hdf5datastore.jsonnet')
32moo.otypes.load_types('dfmodules/fakedataprod.jsonnet')
33moo.otypes.load_types('nwqueueadapters/queuetonetwork.jsonnet')
34moo.otypes.load_types('nwqueueadapters/networktoqueue.jsonnet')
35moo.otypes.load_types('serialization/networkobjectreceiver.jsonnet')
36moo.otypes.load_types('serialization/networkobjectsender.jsonnet')
37
38# Import new types
39import dunedaq.cmdlib.cmd as basecmd # AddressedCmd,
40import dunedaq.rcif.cmd as rccmd # AddressedCmd,
41import dunedaq.appfwk.cmd as cmd # AddressedCmd,
42import dunedaq.appfwk.app as app # AddressedCmd,
43import dunedaq.trigemu.triggerdecisionemulator as tde
44import dunedaq.trigemu.faketimesyncsource as ftss
45import dunedaq.dfmodules.requestgenerator as rqg
46import dunedaq.dfmodules.fragmentreceiver as ffr
47import dunedaq.dfmodules.datawriter as dw
48import dunedaq.dfmodules.hdf5datastore as hdf5ds
49import dunedaq.dfmodules.fakedataprod as fdp
50import dunedaq.nwqueueadapters.networktoqueue as ntoq
51import dunedaq.nwqueueadapters.queuetonetwork as qton
52import dunedaq.serialization.networkobjectreceiver as nor
53import dunedaq.serialization.networkobjectsender as nos
54
55from appfwk.utils import mcmd, mrccmd, mspec
56
57import json
58import math
59from pprint import pprint
60# Time to wait on pop()
61QUEUE_POP_WAIT_MS=100;
62# local clock speed Hz
63CLOCK_SPEED_HZ = 62500000;
64
65
67 network_endpoints,
68 NUMBER_OF_DATA_PRODUCERS=2,
69 DATA_RATE_SLOWDOWN_FACTOR = 1,
70 RUN_NUMBER = 333,
71 TRIGGER_RATE_HZ = 1.0,
72 DATA_FILE="./frames.bin",
73 OUTPUT_PATH=".",
74 DISABLE_OUTPUT=False,
75 TOKEN_COUNT=10
76 ):
77 """Generate the json configuration for the readout and DF process"""
78
79 trigger_interval_ticks = math.floor((1/TRIGGER_RATE_HZ) * CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR)
80
81 # Define modules and queues
82 queue_bare_specs = [
83 app.QueueSpec(inst="time_sync_to_netq", kind='FollyMPMCQueue', capacity=100),
84 app.QueueSpec(inst="token_to_netq", kind='FollySPSCQueue', capacity=20),
85 app.QueueSpec(inst="trigger_decision_from_netq", kind='FollySPSCQueue', capacity=20),
86 app.QueueSpec(inst="trigger_decision_copy_for_bookkeeping", kind='FollySPSCQueue', capacity=20),
87 app.QueueSpec(inst="trigger_record_q", kind='FollySPSCQueue', capacity=20),
88 app.QueueSpec(inst="data_fragments_q", kind='FollyMPMCQueue', capacity=100),
89 ] + [
90 app.QueueSpec(inst=f"data_requests_{idx}", kind='FollySPSCQueue', capacity=20)
91 for idx in range(NUMBER_OF_DATA_PRODUCERS)
92 ]
93
94
95 # Only needed to reproduce the same order as when using jsonnet
96 queue_specs = app.QueueSpecs(sorted(queue_bare_specs, key=lambda x: x.inst))
97
98
99 mod_specs = [
100 mspec("ntoq_trigdec", "NetworkToQueue", [
101 app.QueueInfo(name="output", inst="trigger_decision_from_netq", dir="output")
102 ]),
103
104 mspec("qton_token", "QueueToNetwork", [
105 app.QueueInfo(name="input", inst="token_to_netq", dir="input")
106 ]),
107
108 mspec("qton_timesync", "QueueToNetwork", [
109 app.QueueInfo(name="input", inst="time_sync_to_netq", dir="input")
110 ]),
111
112 mspec("rqg", "RequestGenerator", [
113 app.QueueInfo(name="trigger_decision_input_queue", inst="trigger_decision_from_netq", dir="input"),
114 app.QueueInfo(name="trigger_decision_for_event_building", inst="trigger_decision_copy_for_bookkeeping", dir="output"),
115 ] + [
116 app.QueueInfo(name=f"data_request_{idx}_output_queue", inst=f"data_requests_{idx}", dir="output")
117 for idx in range(NUMBER_OF_DATA_PRODUCERS)
118 ]),
119
120 mspec("ffr", "FragmentReceiver", [
121 app.QueueInfo(name="trigger_decision_input_queue", inst="trigger_decision_copy_for_bookkeeping", dir="input"),
122 app.QueueInfo(name="trigger_record_output_queue", inst="trigger_record_q", dir="output"),
123 app.QueueInfo(name="data_fragment_input_queue", inst="data_fragments_q", dir="input"),
124 ]),
125
126 mspec("datawriter", "DataWriterModule", [
127 app.QueueInfo(name="trigger_record_input_queue", inst="trigger_record_q", dir="input"),
128 app.QueueInfo(name="token_output_queue", inst="token_to_netq", dir="output"),
129 ]),
130
131 mspec("fake_timesync_source", "FakeTimeSyncSource", [
132 app.QueueInfo(name="time_sync_sink", inst="time_sync_to_netq", dir="output"),
133 ]),
134
135 ] + [
136
137 mspec(f"fakedataprod_{idx}", "FakeDataProdModule", [
138 app.QueueInfo(name="data_request_input_queue", inst=f"data_requests_{idx}", dir="input"),
139 app.QueueInfo(name="data_fragment_output_queue", inst="data_fragments_q", dir="output"),
140 ]) for idx in range(NUMBER_OF_DATA_PRODUCERS)
141 ]
142
143 init_specs = app.Init(queues=queue_specs, modules=mod_specs)
144
145 initcmd = rccmd.RCCommand(
146 id=basecmd.CmdId("init"),
147 entry_state="NONE",
148 exit_state="INITIAL",
149 data=init_specs
150 )
151
152 confcmd = mrccmd("conf", "INITIAL", "CONFIGURED",[
153 ("ntoq_trigdec", ntoq.Conf(msg_type="dunedaq::dfmessages::TriggerDecision",
154 msg_module_name="TriggerDecisionNQ",
155 receiver_config=nor.Conf(ipm_plugin_type="ZmqReceiver",
156 address=network_endpoints["trigdec"])
157 )
158 ),
159
160 ("qton_token", qton.Conf(msg_type="dunedaq::dfmessages::TriggerDecisionToken",
161 msg_module_name="TriggerDecisionTokenNQ",
162 sender_config=nos.Conf(ipm_plugin_type="ZmqSender",
163 address=network_endpoints["triginh"],
164 stype="msgpack")
165 )
166 ),
167
168 ("qton_timesync", qton.Conf(msg_type="dunedaq::dfmessages::TimeSync",
169 msg_module_name="TimeSyncNQ",
170 sender_config=nos.Conf(ipm_plugin_type="ZmqSender",
171 address=network_endpoints["timesync"],
172 stype="msgpack")
173 )
174 ),
175
176 ("rqg", rqg.ConfParams(
177 map=rqg.mapgeoidqueue([
178 rqg.geoidinst(apa=0, link=idx, queueinstance=f"data_requests_{idx}") for idx in range(NUMBER_OF_DATA_PRODUCERS)
179 ])
180 )),
181 ("ffr", ffr.ConfParams(
182 general_queue_timeout=QUEUE_POP_WAIT_MS
183 )),
184 ("datawriter", dw.ConfParams(
185 initial_token_count=TOKEN_COUNT,
186 data_store_parameters=hdf5ds.ConfParams(
187 name="data_store",
188 # type = "HDF5DataStore", # default
189 directory_path = OUTPUT_PATH, # default
190 # mode = "all-per-file", # default
191 max_file_size_bytes = 1073741834,
192 disable_unique_filename_suffix = False,
193 filename_parameters = hdf5ds.HDF5DataStoreFileNameParams(
194 overall_prefix = "fake_minidaqapp",
195 # digits_for_run_number = 6, #default
196 file_index_prefix = "file"
197 ),
198 file_layout_parameters = hdf5ds.HDF5DataStoreFileLayoutParams(
199 trigger_record_name_prefix= "TriggerRecord",
200 digits_for_trigger_number = 5,
201 )
202 )
203 )),
204 ("fake_timesync_source", ftss.ConfParams(
205 sync_interval_ticks = (CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR),
206 clock_frequency_hz = (CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR),
207 )),
208 ] + [
209 (f"fakedataprod_{idx}", fdp.ConfParams(
210 temporarily_hacked_link_number = idx
211 )) for idx in range(NUMBER_OF_DATA_PRODUCERS)
212 ])
213
214 startpars = rccmd.StartParams(run=RUN_NUMBER, disable_data_storage=DISABLE_OUTPUT)
215 startcmd = mrccmd("start", "CONFIGURED", "RUNNING", [
216 ("ntoq_trigdec", startpars),
217 ("qton_token", startpars),
218 ("qton_timesync", startpars),
219 ("datawriter", startpars),
220 ("ffr", startpars),
221 ("fakedataprod_.*", startpars),
222 ("rqg", startpars),
223 ("fake_timesync_source", startpars),
224 ])
225
226 stopcmd = mrccmd("stop", "RUNNING", "CONFIGURED", [
227 ("ntoq_trigdec", None),
228 ("qton_timesync", None),
229 ("qton_token", None),
230 ("fake_timesync_source", None),
231 ("rqg", None),
232 ("fakedataprod_.*", None),
233 ("ffr", None),
234 ("datawriter", None),
235 ])
236
237 scrapcmd = mcmd("scrap", [
238 ("", None)
239 ])
240
241 # Create a list of commands
242 cmd_seq = [initcmd, confcmd, startcmd, stopcmd, scrapcmd]
243
244 # Print them as json (to be improved/moved out)
245 jstr = json.dumps([c.pod() for c in cmd_seq], indent=4, sort_keys=True)
246 return jstr
247
248#===============================================================================
250 network_endpoints,
251 NUMBER_OF_DATA_PRODUCERS=2,
252 DATA_RATE_SLOWDOWN_FACTOR = 1,
253 RUN_NUMBER = 333,
254 TRIGGER_RATE_HZ = 1.0,
255 DATA_FILE="./frames.bin",
256 OUTPUT_PATH=".",
257 ):
258 """Generate the json config for the TriggerDecisionEmulator process"""
259
260 trigger_interval_ticks = math.floor((1/TRIGGER_RATE_HZ) * CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR)
261
262 # Define modules and queues
263 queue_bare_specs = [
264 app.QueueSpec(inst="time_sync_from_netq", kind='FollySPSCQueue', capacity=100),
265 app.QueueSpec(inst="token_from_netq", kind='FollySPSCQueue', capacity=20),
266 app.QueueSpec(inst="trigger_decision_to_netq", kind='FollySPSCQueue', capacity=20),
267 ]
268
269 # Only needed to reproduce the same order as when using jsonnet
270 queue_specs = app.QueueSpecs(sorted(queue_bare_specs, key=lambda x: x.inst))
271
272
273 mod_specs = [
274 mspec("qton_trigdec", "QueueToNetwork", [
275 app.QueueInfo(name="input", inst="trigger_decision_to_netq", dir="input")
276 ]),
277
278 mspec("ntoq_token", "NetworkToQueue", [
279 app.QueueInfo(name="output", inst="token_from_netq", dir="output")
280 ]),
281
282 mspec("ntoq_timesync", "NetworkToQueue", [
283 app.QueueInfo(name="output", inst="time_sync_from_netq", dir="output")
284 ]),
285
286 mspec("tde", "TriggerDecisionEmulator", [
287 app.QueueInfo(name="time_sync_source", inst="time_sync_from_netq", dir="input"),
288 app.QueueInfo(name="token_source", inst="token_from_netq", dir="input"),
289 app.QueueInfo(name="trigger_decision_sink", inst="trigger_decision_to_netq", dir="output"),
290 ]),
291 ]
292
293 init_specs = app.Init(queues=queue_specs, modules=mod_specs)
294
295 initcmd = rccmd.RCCommand(
296 id=basecmd.CmdId("init"),
297 entry_state="NONE",
298 exit_state="INITIAL",
299 data=init_specs
300 )
301
302 confcmd = mrccmd("conf", "INITIAL", "CONFIGURED",[
303 ("qton_trigdec", qton.Conf(msg_type="dunedaq::dfmessages::TriggerDecision",
304 msg_module_name="TriggerDecisionNQ",
305 sender_config=nos.Conf(ipm_plugin_type="ZmqSender",
306 address=network_endpoints["trigdec"],
307 stype="msgpack")
308 )
309 ),
310
311 ("ntoq_token", ntoq.Conf(msg_type="dunedaq::dfmessages::TriggerDecisionToken",
312 msg_module_name="TriggerDecisionTokenNQ",
313 receiver_config=nor.Conf(ipm_plugin_type="ZmqReceiver",
314 address=network_endpoints["triginh"])
315 )
316 ),
317
318 ("ntoq_timesync", ntoq.Conf(msg_type="dunedaq::dfmessages::TimeSync",
319 msg_module_name="TimeSyncNQ",
320 receiver_config=nor.Conf(ipm_plugin_type="ZmqReceiver",
321 address=network_endpoints["timesync"])
322 )
323 ),
324
325 ("tde", tde.ConfParams(
326 links=[idx for idx in range(NUMBER_OF_DATA_PRODUCERS)],
327 min_links_in_request=NUMBER_OF_DATA_PRODUCERS,
328 max_links_in_request=NUMBER_OF_DATA_PRODUCERS,
329 min_readout_window_ticks=1200,
330 max_readout_window_ticks=1200,
331 trigger_window_offset=1000,
332 # The delay is set to put the trigger well within the latency buff
333 trigger_delay_ticks=math.floor( 2* CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR),
334 # We divide the trigger interval by
335 # DATA_RATE_SLOWDOWN_FACTOR so the triggers are still
336 # emitted per (wall-clock) second, rather than being
337 # spaced out further
338 trigger_interval_ticks=trigger_interval_ticks,
339 clock_frequency_hz=CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR
340 )),
341 ])
342
343 startpars = rccmd.StartParams(run=RUN_NUMBER, disable_data_storage=False)
344 startcmd = mrccmd("start", "CONFIGURED", "RUNNING", [
345 ("qton_trigdec", startpars),
346 ("ntoq_token", startpars),
347 ("ntoq_timesync", startpars),
348 ("tde", startpars),
349 ])
350
351 stopcmd = mrccmd("stop", "RUNNING", "CONFIGURED", [
352 ("qton_trigdec", None),
353 ("ntoq_timesync", None),
354 ("ntoq_token", None),
355 ("tde", None),
356 ])
357
358 pausecmd = mrccmd("pause", "RUNNING", "RUNNING", [
359 ("", None)
360 ])
361
362 resumecmd = mrccmd("resume", "RUNNING", "RUNNING", [
363 ("tde", tde.ResumeParams(
364 trigger_interval_ticks=trigger_interval_ticks
365 ))
366 ])
367
368 scrapcmd = mcmd("scrap", [
369 ("", None)
370 ])
371
372 # Create a list of commands
373 cmd_seq = [initcmd, confcmd, startcmd, stopcmd, pausecmd, resumecmd, scrapcmd]
374
375 # Print them as json (to be improved/moved out)
376 jstr = json.dumps([c.pod() for c in cmd_seq], indent=4, sort_keys=True)
377 return jstr
378
379if __name__ == '__main__':
380 # Add -h as default help option
381 CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
382
383 import click
384
385 @click.command(context_settings=CONTEXT_SETTINGS)
386 @click.option('-n', '--number-of-data-producers', default=2)
387 @click.option('-s', '--data-rate-slowdown-factor', default=1)
388 @click.option('-r', '--run-number', default=333)
389 @click.option('-t', '--trigger-rate-hz', default=1.0)
390 @click.option('-d', '--data-file', type=click.Path(), default='./frames.bin')
391 @click.option('-o', '--output-path', type=click.Path(), default='.')
392 @click.option('--disable-data-storage', is_flag=True)
393 @click.option('-c', '--token-count', default=10)
394 @click.option('--host-ip-df', default='127.0.0.1')
395 @click.option('--host-ip-trigemu', default='127.0.0.1')
396 @click.argument('json_file_base', type=click.Path(), default='testapp-noreadout-two-process')
397 def cli(number_of_data_producers, data_rate_slowdown_factor, run_number, trigger_rate_hz, data_file, output_path, disable_data_storage, token_count, host_ip_df, host_ip_trigemu, json_file_base):
398 """
399 JSON_FILE: Input raw data file.
400 JSON_FILE: Output json configuration file.
401 """
402
403 json_file_trigemu=json_file_base+"-trigemu.json"
404 json_file_df=json_file_base+"-df.json"
405
406 network_endpoints={
407 "trigdec" : f"tcp://{host_ip_trigemu}:12345",
408 "triginh" : f"tcp://{host_ip_df}:12346",
409 "timesync": f"tcp://{host_ip_df}:12347"
410 }
411
412 with open(json_file_trigemu, 'w') as f:
413 f.write(generate_trigemu(
414 network_endpoints,
415 NUMBER_OF_DATA_PRODUCERS = number_of_data_producers,
416 DATA_RATE_SLOWDOWN_FACTOR = data_rate_slowdown_factor,
417 RUN_NUMBER = run_number,
418 TRIGGER_RATE_HZ = trigger_rate_hz,
419 DATA_FILE = data_file,
420 OUTPUT_PATH = output_path
421 ))
422
423 with open(json_file_df, 'w') as f:
424 f.write(generate_df(
425 network_endpoints,
426 NUMBER_OF_DATA_PRODUCERS = number_of_data_producers,
427 DATA_RATE_SLOWDOWN_FACTOR = data_rate_slowdown_factor,
428 RUN_NUMBER = run_number,
429 TRIGGER_RATE_HZ = trigger_rate_hz,
430 DATA_FILE = data_file,
431 OUTPUT_PATH = output_path,
432 DISABLE_OUTPUT = disable_data_storage,
433 TOKEN_COUNT = token_count
434 ))
435
436 cli()
437
# (Removed: Doxygen "Referenced by" artifacts -- stray top-level call
# signatures for cli(), generate_df() and generate_trigemu() that are not
# part of the source file and would raise NameError at import time.)