# fake_hsi_app_confgen.py
# DUNE-DAQ: DUNE Trigger and Data Acquisition software
# Set moo schema search path
from dunedaq.env import get_moo_model_path
import moo.io

moo.io.default_load_path = get_moo_model_path()

# Load configuration types
# NOTE(review): these calls presumably have to run before the
# `dunedaq.*` imports below (the schemas appear to define the modules that
# are imported afterwards), so keep the statement order as is -- confirm
# against the moo documentation before regrouping the imports.
import moo.otypes
moo.otypes.load_types('appfwk/cmd.jsonnet')
moo.otypes.load_types('appfwk/app.jsonnet')
moo.otypes.load_types('cmdlib/cmd.jsonnet')
moo.otypes.load_types('rcif/cmd.jsonnet')
moo.otypes.load_types('timinglibs/fakehsieventgenerator.jsonnet')
moo.otypes.load_types('nwqueueadapters/queuetonetwork.jsonnet')
moo.otypes.load_types('nwqueueadapters/networktoqueue.jsonnet')
moo.otypes.load_types('nwqueueadapters/networkobjectreceiver.jsonnet')
moo.otypes.load_types('nwqueueadapters/networkobjectsender.jsonnet')
moo.otypes.load_types('networkmanager/nwmgr.jsonnet')

# Import new types
import dunedaq.appfwk.cmd as cmd # AddressedCmd,
import dunedaq.appfwk.app as app # Queue spec
import dunedaq.cmdlib.cmd as cmdlib # Command
import dunedaq.rcif.cmd as rcif # rcif
import dunedaq.networkmanager.nwmgr as nwmgr

import dunedaq.timinglibs.fakehsieventgenerator as fhsig
import dunedaq.nwqueueadapters.networktoqueue as ntoq
import dunedaq.nwqueueadapters.queuetonetwork as qton
import dunedaq.nwqueueadapters.networkobjectreceiver as nor
import dunedaq.nwqueueadapters.networkobjectsender as nos

from appfwk.utils import mcmd, mspec, mrccmd

import json
import math

def generate(
        PARTITION = "hsi_readout_test",
        OUTPUT_PATH=".",
        TRIGGER_RATE_HZ: float=1,
        CLOCK_SPEED_HZ: int = 62500000,
        HSI_TIMESTAMP_OFFSET: int = 0, # Offset for HSIEvent timestamps in units of clock ticks. Positive offset increases timestamp estimate.
        HSI_DEVICE_ID: int = 0,
        MEAN_SIGNAL_MULTIPLICITY: int = 0,
        SIGNAL_EMULATION_MODE: int = 0,
        ENABLED_SIGNALS: int = 0b00000001,
    ):
    """Build the run-control command sequence for the fake HSI application.

    Creates the init, conf, start, stop and scrap commands, prints each of
    them to stdout as JSON, and returns the init/conf/start/stop commands
    serialised as one JSON list.

    Args:
        PARTITION: Prefix of the ".hsievent" network connection name.
        OUTPUT_PATH: Accepted for interface compatibility; not used here.
        TRIGGER_RATE_HZ: Rate used to derive the trigger interval. A value
            that is not greater than zero yields an interval of 0 ticks.
        CLOCK_SPEED_HZ: Clock frequency, passed through to the module
            configuration and used to convert the rate to ticks.
        HSI_TIMESTAMP_OFFSET: Passed through as ``timestamp_offset``.
        HSI_DEVICE_ID: Accepted for interface compatibility; not used here.
        MEAN_SIGNAL_MULTIPLICITY: Passed through as
            ``mean_signal_multiplicity``.
        SIGNAL_EMULATION_MODE: Passed through as ``signal_emulation_mode``.
        ENABLED_SIGNALS: Passed through as ``enabled_signals``.

    Returns:
        A JSON string (indent 4, sorted keys) holding the list
        ``[init, conf, start, stop]``.
    """

    # network connection
    # NOTE(review): nw_specs is built but never handed to app.Init below --
    # confirm whether it should be part of the init command.
    nw_specs = [nwmgr.Connection(name=PARTITION + ".hsievent",topics=[], address="tcp://127.0.0.1:12344")]

    # Define modules and queues
    queue_bare_specs = [
        app.QueueSpec(inst="time_sync_from_netq", kind='FollySPSCQueue', capacity=100),
    ]

    # Only needed to reproduce the same order as when using jsonnet
    queue_specs = app.QueueSpecs(sorted(queue_bare_specs, key=lambda x: x.inst))

    mod_specs = [
        mspec("ntoq_timesync", "NetworkToQueue", [
            app.QueueInfo(name="output", inst="time_sync_from_netq", dir="output")
        ]),

        mspec("fhsig", "FakeHSIEventGeneratorModule", [
            app.QueueInfo(name="time_sync_source", inst="time_sync_from_netq", dir="input"),
        ]),
    ]

    init_specs = app.Init(queues=queue_specs, modules=mod_specs)

    jstr = json.dumps(init_specs.pod(), indent=4, sort_keys=True)
    print(jstr)

    initcmd = rcif.RCCommand(
        id=cmdlib.CmdId("init"),
        entry_state="NONE",
        exit_state="INITIAL",
        data=init_specs
    )

    trigger_interval_ticks = 0
    if TRIGGER_RATE_HZ > 0:
        # Divide once instead of computing (1 / rate) * clock: the reciprocal
        # is rounded before the multiplication, so an exact quotient could
        # land just below the integer and be floored one tick too low.
        trigger_interval_ticks = math.floor(CLOCK_SPEED_HZ / TRIGGER_RATE_HZ)

    mods = [
        ("fhsig", fhsig.Conf(
            clock_frequency=CLOCK_SPEED_HZ,
            trigger_interval_ticks=trigger_interval_ticks,
            timestamp_offset=HSI_TIMESTAMP_OFFSET,
            mean_signal_multiplicity=MEAN_SIGNAL_MULTIPLICITY,
            signal_emulation_mode=SIGNAL_EMULATION_MODE,
            enabled_signals=ENABLED_SIGNALS,
            timesync_topic="Timesync",
            hsievent_connection_name = PARTITION+".hsievent",
        )),
    ]

    confcmd = mrccmd("conf", "INITIAL", "CONFIGURED", mods)

    jstr = json.dumps(confcmd.pod(), indent=4, sort_keys=True)
    print(jstr)

    startpars = rcif.StartParams(run=33, disable_data_storage=False)

    startcmd = mrccmd("start", "CONFIGURED", "RUNNING", [
        ("ntoq_timesync", startpars),
        ("fhsig", startpars)
    ])

    jstr = json.dumps(startcmd.pod(), indent=4, sort_keys=True)
    print("="*80+"\nStart\n\n", jstr)

    stopcmd = mrccmd("stop", "RUNNING", "CONFIGURED", [
        ("ntoq_timesync", None),
        ("fhsig", None)
    ])

    jstr = json.dumps(stopcmd.pod(), indent=4, sort_keys=True)
    print("="*80+"\nStop\n\n", jstr)

    scrapcmd = mcmd("scrap", [
        ("", None)
    ])

    jstr = json.dumps(scrapcmd.pod(), indent=4, sort_keys=True)
    print("="*80+"\nScrap\n\n", jstr)

    # Create a list of commands
    # NOTE(review): scrapcmd is printed above but is not part of the returned
    # sequence -- confirm that leaving it out is intentional.
    cmd_seq = [initcmd, confcmd, startcmd, stopcmd]

    # Print them as json (to be improved/moved out)
    jstr = json.dumps([c.pod() for c in cmd_seq], indent=4, sort_keys=True)
    return jstr

if __name__ == '__main__':
    # Add -h as default help option
    CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])

    import click

    @click.command(context_settings=CONTEXT_SETTINGS)
    @click.option('-p', '--partition', default="fake_hsi_readout_test")
    @click.option('-t', '--trigger-rate-hz', default=1.0, help='Fake HSI only: rate at which fake HSIEvents are sent. 0 - disable HSIEvent generation.')
    @click.option('-o', '--output-path', type=click.Path(), default='.')
    @click.argument('json_file', type=click.Path(), default='fake_hsi_app.json')
    def cli(partition, trigger_rate_hz, output_path, json_file):
        """
          JSON_FILE: Output json configuration file.
        """

        # Build the configuration before opening the file, so that a failure
        # in generate() does not leave a truncated, empty JSON_FILE behind.
        config_json = generate(
            PARTITION=partition,
            TRIGGER_RATE_HZ=trigger_rate_hz,
            OUTPUT_PATH = output_path,
        )

        with open(json_file, 'w') as f:
            f.write(config_json)

        print(f"'{json_file}' generation completed.")

    cli()

# Doxygen cross-reference: cli(partition, trigger_rate_hz, output_path, json_file)