68 NUMBER_OF_DATA_PRODUCERS=2,
69 DATA_RATE_SLOWDOWN_FACTOR = 1,
71 TRIGGER_RATE_HZ = 1.0,
72 DATA_FILE="./frames.bin",
77 """Generate the json configuration for the readout and DF process"""
79 trigger_interval_ticks = math.floor((1/TRIGGER_RATE_HZ) * CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR)
83 app.QueueSpec(inst=
"time_sync_to_netq", kind=
'FollyMPMCQueue', capacity=100),
84 app.QueueSpec(inst=
"token_to_netq", kind=
'FollySPSCQueue', capacity=20),
85 app.QueueSpec(inst=
"trigger_decision_from_netq", kind=
'FollySPSCQueue', capacity=20),
86 app.QueueSpec(inst=
"trigger_decision_copy_for_bookkeeping", kind=
'FollySPSCQueue', capacity=20),
87 app.QueueSpec(inst=
"trigger_record_q", kind=
'FollySPSCQueue', capacity=20),
88 app.QueueSpec(inst=
"data_fragments_q", kind=
'FollyMPMCQueue', capacity=100),
90 app.QueueSpec(inst=f
"data_requests_{idx}", kind=
'FollySPSCQueue', capacity=20)
91 for idx
in range(NUMBER_OF_DATA_PRODUCERS)
96 queue_specs = app.QueueSpecs(sorted(queue_bare_specs, key=
lambda x: x.inst))
100 mspec(
"ntoq_trigdec",
"NetworkToQueue", [
101 app.QueueInfo(name=
"output", inst=
"trigger_decision_from_netq", dir=
"output")
104 mspec(
"qton_token",
"QueueToNetwork", [
105 app.QueueInfo(name=
"input", inst=
"token_to_netq", dir=
"input")
108 mspec(
"qton_timesync",
"QueueToNetwork", [
109 app.QueueInfo(name=
"input", inst=
"time_sync_to_netq", dir=
"input")
112 mspec(
"rqg",
"RequestGenerator", [
113 app.QueueInfo(name=
"trigger_decision_input_queue", inst=
"trigger_decision_from_netq", dir=
"input"),
114 app.QueueInfo(name=
"trigger_decision_for_event_building", inst=
"trigger_decision_copy_for_bookkeeping", dir=
"output"),
116 app.QueueInfo(name=f
"data_request_{idx}_output_queue", inst=f
"data_requests_{idx}", dir=
"output")
117 for idx
in range(NUMBER_OF_DATA_PRODUCERS)
120 mspec(
"ffr",
"FragmentReceiver", [
121 app.QueueInfo(name=
"trigger_decision_input_queue", inst=
"trigger_decision_copy_for_bookkeeping", dir=
"input"),
122 app.QueueInfo(name=
"trigger_record_output_queue", inst=
"trigger_record_q", dir=
"output"),
123 app.QueueInfo(name=
"data_fragment_input_queue", inst=
"data_fragments_q", dir=
"input"),
126 mspec(
"datawriter",
"DataWriterModule", [
127 app.QueueInfo(name=
"trigger_record_input_queue", inst=
"trigger_record_q", dir=
"input"),
128 app.QueueInfo(name=
"token_output_queue", inst=
"token_to_netq", dir=
"output"),
131 mspec(
"fake_timesync_source",
"FakeTimeSyncSource", [
132 app.QueueInfo(name=
"time_sync_sink", inst=
"time_sync_to_netq", dir=
"output"),
137 mspec(f
"fakedataprod_{idx}",
"FakeDataProdModule", [
138 app.QueueInfo(name=
"data_request_input_queue", inst=f
"data_requests_{idx}", dir=
"input"),
139 app.QueueInfo(name=
"data_fragment_output_queue", inst=
"data_fragments_q", dir=
"output"),
140 ])
for idx
in range(NUMBER_OF_DATA_PRODUCERS)
143 init_specs = app.Init(queues=queue_specs, modules=mod_specs)
145 initcmd = rccmd.RCCommand(
146 id=basecmd.CmdId(
"init"),
148 exit_state=
"INITIAL",
152 confcmd = mrccmd(
"conf",
"INITIAL",
"CONFIGURED",[
153 (
"ntoq_trigdec", ntoq.Conf(msg_type=
"dunedaq::dfmessages::TriggerDecision",
154 msg_module_name=
"TriggerDecisionNQ",
155 receiver_config=nor.Conf(ipm_plugin_type=
"ZmqReceiver",
156 address=network_endpoints[
"trigdec"])
160 (
"qton_token", qton.Conf(msg_type=
"dunedaq::dfmessages::TriggerDecisionToken",
161 msg_module_name=
"TriggerDecisionTokenNQ",
162 sender_config=nos.Conf(ipm_plugin_type=
"ZmqSender",
163 address=network_endpoints[
"triginh"],
168 (
"qton_timesync", qton.Conf(msg_type=
"dunedaq::dfmessages::TimeSync",
169 msg_module_name=
"TimeSyncNQ",
170 sender_config=nos.Conf(ipm_plugin_type=
"ZmqSender",
171 address=network_endpoints[
"timesync"],
176 (
"rqg", rqg.ConfParams(
177 map=rqg.mapgeoidqueue([
178 rqg.geoidinst(apa=0, link=idx, queueinstance=f
"data_requests_{idx}")
for idx
in range(NUMBER_OF_DATA_PRODUCERS)
181 (
"ffr", ffr.ConfParams(
182 general_queue_timeout=QUEUE_POP_WAIT_MS
184 (
"datawriter", dw.ConfParams(
185 initial_token_count=TOKEN_COUNT,
186 data_store_parameters=hdf5ds.ConfParams(
189 directory_path = OUTPUT_PATH,
191 max_file_size_bytes = 1073741834,
192 disable_unique_filename_suffix =
False,
193 filename_parameters = hdf5ds.HDF5DataStoreFileNameParams(
194 overall_prefix =
"fake_minidaqapp",
196 file_index_prefix =
"file"
198 file_layout_parameters = hdf5ds.HDF5DataStoreFileLayoutParams(
199 trigger_record_name_prefix=
"TriggerRecord",
200 digits_for_trigger_number = 5,
204 (
"fake_timesync_source", ftss.ConfParams(
205 sync_interval_ticks = (CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR),
206 clock_frequency_hz = (CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR),
209 (f
"fakedataprod_{idx}", fdp.ConfParams(
210 temporarily_hacked_link_number = idx
211 ))
for idx
in range(NUMBER_OF_DATA_PRODUCERS)
214 startpars = rccmd.StartParams(run=RUN_NUMBER, disable_data_storage=DISABLE_OUTPUT)
215 startcmd = mrccmd(
"start",
"CONFIGURED",
"RUNNING", [
216 (
"ntoq_trigdec", startpars),
217 (
"qton_token", startpars),
218 (
"qton_timesync", startpars),
219 (
"datawriter", startpars),
221 (
"fakedataprod_.*", startpars),
223 (
"fake_timesync_source", startpars),
226 stopcmd = mrccmd(
"stop",
"RUNNING",
"CONFIGURED", [
227 (
"ntoq_trigdec",
None),
228 (
"qton_timesync",
None),
229 (
"qton_token",
None),
230 (
"fake_timesync_source",
None),
232 (
"fakedataprod_.*",
None),
234 (
"datawriter",
None),
237 scrapcmd = mcmd(
"scrap", [
242 cmd_seq = [initcmd, confcmd, startcmd, stopcmd, scrapcmd]
245 jstr = json.dumps([c.pod()
for c
in cmd_seq], indent=4, sort_keys=
True)
251 NUMBER_OF_DATA_PRODUCERS=2,
252 DATA_RATE_SLOWDOWN_FACTOR = 1,
254 TRIGGER_RATE_HZ = 1.0,
255 DATA_FILE="./frames.bin",
258 """Generate the json config for the TriggerDecisionEmulator process"""
260 trigger_interval_ticks = math.floor((1/TRIGGER_RATE_HZ) * CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR)
264 app.QueueSpec(inst=
"time_sync_from_netq", kind=
'FollySPSCQueue', capacity=100),
265 app.QueueSpec(inst=
"token_from_netq", kind=
'FollySPSCQueue', capacity=20),
266 app.QueueSpec(inst=
"trigger_decision_to_netq", kind=
'FollySPSCQueue', capacity=20),
270 queue_specs = app.QueueSpecs(sorted(queue_bare_specs, key=
lambda x: x.inst))
274 mspec(
"qton_trigdec",
"QueueToNetwork", [
275 app.QueueInfo(name=
"input", inst=
"trigger_decision_to_netq", dir=
"input")
278 mspec(
"ntoq_token",
"NetworkToQueue", [
279 app.QueueInfo(name=
"output", inst=
"token_from_netq", dir=
"output")
282 mspec(
"ntoq_timesync",
"NetworkToQueue", [
283 app.QueueInfo(name=
"output", inst=
"time_sync_from_netq", dir=
"output")
286 mspec(
"tde",
"TriggerDecisionEmulator", [
287 app.QueueInfo(name=
"time_sync_source", inst=
"time_sync_from_netq", dir=
"input"),
288 app.QueueInfo(name=
"token_source", inst=
"token_from_netq", dir=
"input"),
289 app.QueueInfo(name=
"trigger_decision_sink", inst=
"trigger_decision_to_netq", dir=
"output"),
293 init_specs = app.Init(queues=queue_specs, modules=mod_specs)
295 initcmd = rccmd.RCCommand(
296 id=basecmd.CmdId(
"init"),
298 exit_state=
"INITIAL",
302 confcmd = mrccmd(
"conf",
"INITIAL",
"CONFIGURED",[
303 (
"qton_trigdec", qton.Conf(msg_type=
"dunedaq::dfmessages::TriggerDecision",
304 msg_module_name=
"TriggerDecisionNQ",
305 sender_config=nos.Conf(ipm_plugin_type=
"ZmqSender",
306 address=network_endpoints[
"trigdec"],
311 (
"ntoq_token", ntoq.Conf(msg_type=
"dunedaq::dfmessages::TriggerDecisionToken",
312 msg_module_name=
"TriggerDecisionTokenNQ",
313 receiver_config=nor.Conf(ipm_plugin_type=
"ZmqReceiver",
314 address=network_endpoints[
"triginh"])
318 (
"ntoq_timesync", ntoq.Conf(msg_type=
"dunedaq::dfmessages::TimeSync",
319 msg_module_name=
"TimeSyncNQ",
320 receiver_config=nor.Conf(ipm_plugin_type=
"ZmqReceiver",
321 address=network_endpoints[
"timesync"])
325 (
"tde", tde.ConfParams(
326 links=[idx
for idx
in range(NUMBER_OF_DATA_PRODUCERS)],
327 min_links_in_request=NUMBER_OF_DATA_PRODUCERS,
328 max_links_in_request=NUMBER_OF_DATA_PRODUCERS,
329 min_readout_window_ticks=1200,
330 max_readout_window_ticks=1200,
331 trigger_window_offset=1000,
333 trigger_delay_ticks=math.floor( 2* CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR),
338 trigger_interval_ticks=trigger_interval_ticks,
339 clock_frequency_hz=CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR
343 startpars = rccmd.StartParams(run=RUN_NUMBER, disable_data_storage=
False)
344 startcmd = mrccmd(
"start",
"CONFIGURED",
"RUNNING", [
345 (
"qton_trigdec", startpars),
346 (
"ntoq_token", startpars),
347 (
"ntoq_timesync", startpars),
351 stopcmd = mrccmd(
"stop",
"RUNNING",
"CONFIGURED", [
352 (
"qton_trigdec",
None),
353 (
"ntoq_timesync",
None),
354 (
"ntoq_token",
None),
358 pausecmd = mrccmd(
"pause",
"RUNNING",
"RUNNING", [
362 resumecmd = mrccmd(
"resume",
"RUNNING",
"RUNNING", [
363 (
"tde", tde.ResumeParams(
364 trigger_interval_ticks=trigger_interval_ticks
368 scrapcmd = mcmd(
"scrap", [
373 cmd_seq = [initcmd, confcmd, startcmd, stopcmd, pausecmd, resumecmd, scrapcmd]
376 jstr = json.dumps([c.pod()
for c
in cmd_seq], indent=4, sort_keys=
True)
397 def cli(number_of_data_producers, data_rate_slowdown_factor, run_number, trigger_rate_hz, data_file, output_path, disable_data_storage, token_count, host_ip_df, host_ip_trigemu, json_file_base):
399 JSON_FILE: Input raw data file.
400 JSON_FILE: Output json configuration file.
403 json_file_trigemu=json_file_base+
"-trigemu.json"
404 json_file_df=json_file_base+
"-df.json"
407 "trigdec" : f
"tcp://{host_ip_trigemu}:12345",
408 "triginh" : f
"tcp://{host_ip_df}:12346",
409 "timesync": f
"tcp://{host_ip_df}:12347"
412 with open(json_file_trigemu,
'w')
as f:
415 NUMBER_OF_DATA_PRODUCERS = number_of_data_producers,
416 DATA_RATE_SLOWDOWN_FACTOR = data_rate_slowdown_factor,
417 RUN_NUMBER = run_number,
418 TRIGGER_RATE_HZ = trigger_rate_hz,
419 DATA_FILE = data_file,
420 OUTPUT_PATH = output_path
423 with open(json_file_df,
'w')
as f:
426 NUMBER_OF_DATA_PRODUCERS = number_of_data_producers,
427 DATA_RATE_SLOWDOWN_FACTOR = data_rate_slowdown_factor,
428 RUN_NUMBER = run_number,
429 TRIGGER_RATE_HZ = trigger_rate_hz,
430 DATA_FILE = data_file,
431 OUTPUT_PATH = output_path,
432 DISABLE_OUTPUT = disable_data_storage,
433 TOKEN_COUNT = token_count
cli(number_of_data_producers, data_rate_slowdown_factor, run_number, trigger_rate_hz, data_file, output_path, disable_data_storage, token_count, host_ip_df, host_ip_trigemu, json_file_base)
generate_df(network_endpoints, NUMBER_OF_DATA_PRODUCERS=2, DATA_RATE_SLOWDOWN_FACTOR=1, RUN_NUMBER=333, TRIGGER_RATE_HZ=1.0, DATA_FILE="./frames.bin", OUTPUT_PATH=".", DISABLE_OUTPUT=False, TOKEN_COUNT=10)