DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
testapp_noreadout_two_process Namespace Reference

Functions

 generate_df (network_endpoints, NUMBER_OF_DATA_PRODUCERS=2, DATA_RATE_SLOWDOWN_FACTOR=1, RUN_NUMBER=333, TRIGGER_RATE_HZ=1.0, DATA_FILE="./frames.bin", OUTPUT_PATH=".", DISABLE_OUTPUT=False, TOKEN_COUNT=10)
 
 generate_trigemu (network_endpoints, NUMBER_OF_DATA_PRODUCERS=2, DATA_RATE_SLOWDOWN_FACTOR=1, RUN_NUMBER=333, TRIGGER_RATE_HZ=1.0, DATA_FILE="./frames.bin", OUTPUT_PATH=".")
 
 cli (number_of_data_producers, data_rate_slowdown_factor, run_number, trigger_rate_hz, data_file, output_path, disable_data_storage, token_count, host_ip_df, host_ip_trigemu, json_file_base)
 

Variables

 default_load_path
 
int QUEUE_POP_WAIT_MS = 100;
 
int CLOCK_SPEED_HZ = 62500000;
 
 CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
 

Function Documentation

◆ cli()

testapp_noreadout_two_process.cli ( number_of_data_producers,
data_rate_slowdown_factor,
run_number,
trigger_rate_hz,
data_file,
output_path,
disable_data_storage,
token_count,
host_ip_df,
host_ip_trigemu,
json_file_base )
  DATA_FILE: Input raw data file.
  JSON_FILE_BASE: Base name for the output json configuration files.

Definition at line 397 of file testapp_noreadout_two_process.py.

def cli(number_of_data_producers, data_rate_slowdown_factor, run_number, trigger_rate_hz, data_file, output_path, disable_data_storage, token_count, host_ip_df, host_ip_trigemu, json_file_base):
    """
    JSON_FILE: Input raw data file.
    JSON_FILE: Output json configuration file.
    """
    # NOTE(review): both docstring lines above say "JSON_FILE" — the first
    # presumably describes DATA_FILE; kept verbatim because click surfaces
    # this text as command help. TODO confirm against the click decorators.

    # Derive one configuration file per process from the common base name.
    json_file_trigemu = json_file_base + "-trigemu.json"
    json_file_df = json_file_base + "-df.json"

    # Network endpoints tying the two processes together: the trigger
    # emulator host serves trigger decisions; the DF host serves the
    # inhibit/token and timesync channels.
    network_endpoints = {
        "trigdec" : f"tcp://{host_ip_trigemu}:12345",
        "triginh" : f"tcp://{host_ip_df}:12346",
        "timesync": f"tcp://{host_ip_df}:12347"
    }

    # Write the trigger-emulator process configuration.
    with open(json_file_trigemu, 'w') as f:
        f.write(generate_trigemu(
            network_endpoints,
            NUMBER_OF_DATA_PRODUCERS = number_of_data_producers,
            DATA_RATE_SLOWDOWN_FACTOR = data_rate_slowdown_factor,
            RUN_NUMBER = run_number,
            TRIGGER_RATE_HZ = trigger_rate_hz,
            DATA_FILE = data_file,
            OUTPUT_PATH = output_path
        ))

    # Write the readout/dataflow process configuration.
    with open(json_file_df, 'w') as f:
        f.write(generate_df(
            network_endpoints,
            NUMBER_OF_DATA_PRODUCERS = number_of_data_producers,
            DATA_RATE_SLOWDOWN_FACTOR = data_rate_slowdown_factor,
            RUN_NUMBER = run_number,
            TRIGGER_RATE_HZ = trigger_rate_hz,
            DATA_FILE = data_file,
            OUTPUT_PATH = output_path,
            DISABLE_OUTPUT = disable_data_storage,
            TOKEN_COUNT = token_count
        ))

◆ generate_df()

testapp_noreadout_two_process.generate_df ( network_endpoints,
NUMBER_OF_DATA_PRODUCERS = 2,
DATA_RATE_SLOWDOWN_FACTOR = 1,
RUN_NUMBER = 333,
TRIGGER_RATE_HZ = 1.0,
DATA_FILE = "./frames.bin",
OUTPUT_PATH = ".",
DISABLE_OUTPUT = False,
TOKEN_COUNT = 10 )
Generate the json configuration for the readout and DF process

Definition at line 66 of file testapp_noreadout_two_process.py.

76 ):
77 """Generate the json configuration for the readout and DF process"""
78
79 trigger_interval_ticks = math.floor((1/TRIGGER_RATE_HZ) * CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR)
80
81 # Define modules and queues
82 queue_bare_specs = [
83 app.QueueSpec(inst="time_sync_to_netq", kind='FollyMPMCQueue', capacity=100),
84 app.QueueSpec(inst="token_to_netq", kind='FollySPSCQueue', capacity=20),
85 app.QueueSpec(inst="trigger_decision_from_netq", kind='FollySPSCQueue', capacity=20),
86 app.QueueSpec(inst="trigger_decision_copy_for_bookkeeping", kind='FollySPSCQueue', capacity=20),
87 app.QueueSpec(inst="trigger_record_q", kind='FollySPSCQueue', capacity=20),
88 app.QueueSpec(inst="data_fragments_q", kind='FollyMPMCQueue', capacity=100),
89 ] + [
90 app.QueueSpec(inst=f"data_requests_{idx}", kind='FollySPSCQueue', capacity=20)
91 for idx in range(NUMBER_OF_DATA_PRODUCERS)
92 ]
93
94
95 # Only needed to reproduce the same order as when using jsonnet
96 queue_specs = app.QueueSpecs(sorted(queue_bare_specs, key=lambda x: x.inst))
97
98
99 mod_specs = [
100 mspec("ntoq_trigdec", "NetworkToQueue", [
101 app.QueueInfo(name="output", inst="trigger_decision_from_netq", dir="output")
102 ]),
103
104 mspec("qton_token", "QueueToNetwork", [
105 app.QueueInfo(name="input", inst="token_to_netq", dir="input")
106 ]),
107
108 mspec("qton_timesync", "QueueToNetwork", [
109 app.QueueInfo(name="input", inst="time_sync_to_netq", dir="input")
110 ]),
111
112 mspec("rqg", "RequestGenerator", [
113 app.QueueInfo(name="trigger_decision_input_queue", inst="trigger_decision_from_netq", dir="input"),
114 app.QueueInfo(name="trigger_decision_for_event_building", inst="trigger_decision_copy_for_bookkeeping", dir="output"),
115 ] + [
116 app.QueueInfo(name=f"data_request_{idx}_output_queue", inst=f"data_requests_{idx}", dir="output")
117 for idx in range(NUMBER_OF_DATA_PRODUCERS)
118 ]),
119
120 mspec("ffr", "FragmentReceiver", [
121 app.QueueInfo(name="trigger_decision_input_queue", inst="trigger_decision_copy_for_bookkeeping", dir="input"),
122 app.QueueInfo(name="trigger_record_output_queue", inst="trigger_record_q", dir="output"),
123 app.QueueInfo(name="data_fragment_input_queue", inst="data_fragments_q", dir="input"),
124 ]),
125
126 mspec("datawriter", "DataWriterModule", [
127 app.QueueInfo(name="trigger_record_input_queue", inst="trigger_record_q", dir="input"),
128 app.QueueInfo(name="token_output_queue", inst="token_to_netq", dir="output"),
129 ]),
130
131 mspec("fake_timesync_source", "FakeTimeSyncSource", [
132 app.QueueInfo(name="time_sync_sink", inst="time_sync_to_netq", dir="output"),
133 ]),
134
135 ] + [
136
137 mspec(f"fakedataprod_{idx}", "FakeDataProdModule", [
138 app.QueueInfo(name="data_request_input_queue", inst=f"data_requests_{idx}", dir="input"),
139 app.QueueInfo(name="data_fragment_output_queue", inst="data_fragments_q", dir="output"),
140 ]) for idx in range(NUMBER_OF_DATA_PRODUCERS)
141 ]
142
143 init_specs = app.Init(queues=queue_specs, modules=mod_specs)
144
145 initcmd = rccmd.RCCommand(
146 id=basecmd.CmdId("init"),
147 entry_state="NONE",
148 exit_state="INITIAL",
149 data=init_specs
150 )
151
152 confcmd = mrccmd("conf", "INITIAL", "CONFIGURED",[
153 ("ntoq_trigdec", ntoq.Conf(msg_type="dunedaq::dfmessages::TriggerDecision",
154 msg_module_name="TriggerDecisionNQ",
155 receiver_config=nor.Conf(ipm_plugin_type="ZmqReceiver",
156 address=network_endpoints["trigdec"])
157 )
158 ),
159
160 ("qton_token", qton.Conf(msg_type="dunedaq::dfmessages::TriggerDecisionToken",
161 msg_module_name="TriggerDecisionTokenNQ",
162 sender_config=nos.Conf(ipm_plugin_type="ZmqSender",
163 address=network_endpoints["triginh"],
164 stype="msgpack")
165 )
166 ),
167
168 ("qton_timesync", qton.Conf(msg_type="dunedaq::dfmessages::TimeSync",
169 msg_module_name="TimeSyncNQ",
170 sender_config=nos.Conf(ipm_plugin_type="ZmqSender",
171 address=network_endpoints["timesync"],
172 stype="msgpack")
173 )
174 ),
175
176 ("rqg", rqg.ConfParams(
177 map=rqg.mapgeoidqueue([
178 rqg.geoidinst(apa=0, link=idx, queueinstance=f"data_requests_{idx}") for idx in range(NUMBER_OF_DATA_PRODUCERS)
179 ])
180 )),
181 ("ffr", ffr.ConfParams(
182 general_queue_timeout=QUEUE_POP_WAIT_MS
183 )),
184 ("datawriter", dw.ConfParams(
185 initial_token_count=TOKEN_COUNT,
186 data_store_parameters=hdf5ds.ConfParams(
187 name="data_store",
188 # type = "HDF5DataStore", # default
189 directory_path = OUTPUT_PATH, # default
190 # mode = "all-per-file", # default
191 max_file_size_bytes = 1073741834,
192 disable_unique_filename_suffix = False,
193 filename_parameters = hdf5ds.HDF5DataStoreFileNameParams(
194 overall_prefix = "fake_minidaqapp",
195 # digits_for_run_number = 6, #default
196 file_index_prefix = "file"
197 ),
198 file_layout_parameters = hdf5ds.HDF5DataStoreFileLayoutParams(
199 trigger_record_name_prefix= "TriggerRecord",
200 digits_for_trigger_number = 5,
201 )
202 )
203 )),
204 ("fake_timesync_source", ftss.ConfParams(
205 sync_interval_ticks = (CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR),
206 clock_frequency_hz = (CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR),
207 )),
208 ] + [
209 (f"fakedataprod_{idx}", fdp.ConfParams(
210 temporarily_hacked_link_number = idx
211 )) for idx in range(NUMBER_OF_DATA_PRODUCERS)
212 ])
213
214 startpars = rccmd.StartParams(run=RUN_NUMBER, disable_data_storage=DISABLE_OUTPUT)
215 startcmd = mrccmd("start", "CONFIGURED", "RUNNING", [
216 ("ntoq_trigdec", startpars),
217 ("qton_token", startpars),
218 ("qton_timesync", startpars),
219 ("datawriter", startpars),
220 ("ffr", startpars),
221 ("fakedataprod_.*", startpars),
222 ("rqg", startpars),
223 ("fake_timesync_source", startpars),
224 ])
225
226 stopcmd = mrccmd("stop", "RUNNING", "CONFIGURED", [
227 ("ntoq_trigdec", None),
228 ("qton_timesync", None),
229 ("qton_token", None),
230 ("fake_timesync_source", None),
231 ("rqg", None),
232 ("fakedataprod_.*", None),
233 ("ffr", None),
234 ("datawriter", None),
235 ])
236
237 scrapcmd = mcmd("scrap", [
238 ("", None)
239 ])
240
241 # Create a list of commands
242 cmd_seq = [initcmd, confcmd, startcmd, stopcmd, scrapcmd]
243
244 # Print them as json (to be improved/moved out)
245 jstr = json.dumps([c.pod() for c in cmd_seq], indent=4, sort_keys=True)
246 return jstr
247
248#===============================================================================

◆ generate_trigemu()

testapp_noreadout_two_process.generate_trigemu ( network_endpoints,
NUMBER_OF_DATA_PRODUCERS = 2,
DATA_RATE_SLOWDOWN_FACTOR = 1,
RUN_NUMBER = 333,
TRIGGER_RATE_HZ = 1.0,
DATA_FILE = "./frames.bin",
OUTPUT_PATH = "." )
Generate the json config for the TriggerDecisionEmulator process

Definition at line 249 of file testapp_noreadout_two_process.py.

257 ):
258 """Generate the json config for the TriggerDecisionEmulator process"""
259
260 trigger_interval_ticks = math.floor((1/TRIGGER_RATE_HZ) * CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR)
261
262 # Define modules and queues
263 queue_bare_specs = [
264 app.QueueSpec(inst="time_sync_from_netq", kind='FollySPSCQueue', capacity=100),
265 app.QueueSpec(inst="token_from_netq", kind='FollySPSCQueue', capacity=20),
266 app.QueueSpec(inst="trigger_decision_to_netq", kind='FollySPSCQueue', capacity=20),
267 ]
268
269 # Only needed to reproduce the same order as when using jsonnet
270 queue_specs = app.QueueSpecs(sorted(queue_bare_specs, key=lambda x: x.inst))
271
272
273 mod_specs = [
274 mspec("qton_trigdec", "QueueToNetwork", [
275 app.QueueInfo(name="input", inst="trigger_decision_to_netq", dir="input")
276 ]),
277
278 mspec("ntoq_token", "NetworkToQueue", [
279 app.QueueInfo(name="output", inst="token_from_netq", dir="output")
280 ]),
281
282 mspec("ntoq_timesync", "NetworkToQueue", [
283 app.QueueInfo(name="output", inst="time_sync_from_netq", dir="output")
284 ]),
285
286 mspec("tde", "TriggerDecisionEmulator", [
287 app.QueueInfo(name="time_sync_source", inst="time_sync_from_netq", dir="input"),
288 app.QueueInfo(name="token_source", inst="token_from_netq", dir="input"),
289 app.QueueInfo(name="trigger_decision_sink", inst="trigger_decision_to_netq", dir="output"),
290 ]),
291 ]
292
293 init_specs = app.Init(queues=queue_specs, modules=mod_specs)
294
295 initcmd = rccmd.RCCommand(
296 id=basecmd.CmdId("init"),
297 entry_state="NONE",
298 exit_state="INITIAL",
299 data=init_specs
300 )
301
302 confcmd = mrccmd("conf", "INITIAL", "CONFIGURED",[
303 ("qton_trigdec", qton.Conf(msg_type="dunedaq::dfmessages::TriggerDecision",
304 msg_module_name="TriggerDecisionNQ",
305 sender_config=nos.Conf(ipm_plugin_type="ZmqSender",
306 address=network_endpoints["trigdec"],
307 stype="msgpack")
308 )
309 ),
310
311 ("ntoq_token", ntoq.Conf(msg_type="dunedaq::dfmessages::TriggerDecisionToken",
312 msg_module_name="TriggerDecisionTokenNQ",
313 receiver_config=nor.Conf(ipm_plugin_type="ZmqReceiver",
314 address=network_endpoints["triginh"])
315 )
316 ),
317
318 ("ntoq_timesync", ntoq.Conf(msg_type="dunedaq::dfmessages::TimeSync",
319 msg_module_name="TimeSyncNQ",
320 receiver_config=nor.Conf(ipm_plugin_type="ZmqReceiver",
321 address=network_endpoints["timesync"])
322 )
323 ),
324
325 ("tde", tde.ConfParams(
326 links=[idx for idx in range(NUMBER_OF_DATA_PRODUCERS)],
327 min_links_in_request=NUMBER_OF_DATA_PRODUCERS,
328 max_links_in_request=NUMBER_OF_DATA_PRODUCERS,
329 min_readout_window_ticks=1200,
330 max_readout_window_ticks=1200,
331 trigger_window_offset=1000,
332 # The delay is set to put the trigger well within the latency buff
333 trigger_delay_ticks=math.floor( 2* CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR),
334 # We divide the trigger interval by
335 # DATA_RATE_SLOWDOWN_FACTOR so the triggers are still
336 # emitted per (wall-clock) second, rather than being
337 # spaced out further
338 trigger_interval_ticks=trigger_interval_ticks,
339 clock_frequency_hz=CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR
340 )),
341 ])
342
343 startpars = rccmd.StartParams(run=RUN_NUMBER, disable_data_storage=False)
344 startcmd = mrccmd("start", "CONFIGURED", "RUNNING", [
345 ("qton_trigdec", startpars),
346 ("ntoq_token", startpars),
347 ("ntoq_timesync", startpars),
348 ("tde", startpars),
349 ])
350
351 stopcmd = mrccmd("stop", "RUNNING", "CONFIGURED", [
352 ("qton_trigdec", None),
353 ("ntoq_timesync", None),
354 ("ntoq_token", None),
355 ("tde", None),
356 ])
357
358 pausecmd = mrccmd("pause", "RUNNING", "RUNNING", [
359 ("", None)
360 ])
361
362 resumecmd = mrccmd("resume", "RUNNING", "RUNNING", [
363 ("tde", tde.ResumeParams(
364 trigger_interval_ticks=trigger_interval_ticks
365 ))
366 ])
367
368 scrapcmd = mcmd("scrap", [
369 ("", None)
370 ])
371
372 # Create a list of commands
373 cmd_seq = [initcmd, confcmd, startcmd, stopcmd, pausecmd, resumecmd, scrapcmd]
374
375 # Print them as json (to be improved/moved out)
376 jstr = json.dumps([c.pod() for c in cmd_seq], indent=4, sort_keys=True)
377 return jstr
378

Variable Documentation

◆ CLOCK_SPEED_HZ

int testapp_noreadout_two_process.CLOCK_SPEED_HZ = 62500000;

Definition at line 63 of file testapp_noreadout_two_process.py.

◆ CONTEXT_SETTINGS

testapp_noreadout_two_process.CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])

Definition at line 381 of file testapp_noreadout_two_process.py.

◆ default_load_path

testapp_noreadout_two_process.default_load_path

Definition at line 18 of file testapp_noreadout_two_process.py.

◆ QUEUE_POP_WAIT_MS

int testapp_noreadout_two_process.QUEUE_POP_WAIT_MS = 100;

Definition at line 61 of file testapp_noreadout_two_process.py.