80 NUMBER_OF_DATA_PRODUCERS=2,
81 DATA_RATE_SLOWDOWN_FACTOR = 1,
83 TRIGGER_RATE_HZ = 1.0,
84 DATA_FILE="./frames.bin",
90 trigger_interval_ticks = math.floor((1/TRIGGER_RATE_HZ) * CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR)
94 app.QueueSpec(inst=
"time_sync_to_netq", kind=
'FollyMPMCQueue', capacity=100),
95 app.QueueSpec(inst=
"time_sync_from_netq", kind=
'FollySPSCQueue', capacity=100),
97 app.QueueSpec(inst=
"token_to_netq", kind=
'FollySPSCQueue', capacity=20),
98 app.QueueSpec(inst=
"token_from_netq", kind=
'FollySPSCQueue', capacity=20),
100 app.QueueSpec(inst=
"trigger_decision_to_netq", kind=
'FollySPSCQueue', capacity=20),
101 app.QueueSpec(inst=
"trigger_decision_from_netq", kind=
'FollySPSCQueue', capacity=20),
103 app.QueueSpec(inst=
"trigger_decision_copy_for_bookkeeping", kind=
'FollySPSCQueue', capacity=20),
104 app.QueueSpec(inst=
"trigger_record_q", kind=
'FollySPSCQueue', capacity=20),
105 app.QueueSpec(inst=
"data_fragments_q", kind=
'FollyMPMCQueue', capacity=100),
107 app.QueueSpec(inst=f
"data_requests_{idx}", kind=
'FollySPSCQueue', capacity=20)
108 for idx
in range(NUMBER_OF_DATA_PRODUCERS)
113 queue_specs = app.QueueSpecs(sorted(queue_bare_specs, key=
lambda x: x.inst))
117 mspec(
"ntoq_trigdec",
"NetworkToQueue", [
118 app.QueueInfo(name=
"output", inst=
"trigger_decision_from_netq", dir=
"output")
121 mspec(
"qton_trigdec",
"QueueToNetwork", [
122 app.QueueInfo(name=
"input", inst=
"trigger_decision_to_netq", dir=
"input")
125 mspec(
"ntoq_token",
"NetworkToQueue", [
126 app.QueueInfo(name=
"output", inst=
"token_from_netq", dir=
"output")
129 mspec(
"qton_token",
"QueueToNetwork", [
130 app.QueueInfo(name=
"input", inst=
"token_to_netq", dir=
"input")
133 mspec(
"ntoq_timesync",
"NetworkToQueue", [
134 app.QueueInfo(name=
"output", inst=
"time_sync_from_netq", dir=
"output")
137 mspec(
"qton_timesync",
"QueueToNetwork", [
138 app.QueueInfo(name=
"input", inst=
"time_sync_to_netq", dir=
"input")
141 mspec(
"tde",
"TriggerDecisionEmulator", [
142 app.QueueInfo(name=
"time_sync_source", inst=
"time_sync_from_netq", dir=
"input"),
143 app.QueueInfo(name=
"token_source", inst=
"token_from_netq", dir=
"input"),
144 app.QueueInfo(name=
"trigger_decision_sink", inst=
"trigger_decision_to_netq", dir=
"output"),
147 mspec(
"rqg",
"RequestGenerator", [
148 app.QueueInfo(name=
"trigger_decision_input_queue", inst=
"trigger_decision_from_netq", dir=
"input"),
149 app.QueueInfo(name=
"trigger_decision_for_event_building", inst=
"trigger_decision_copy_for_bookkeeping", dir=
"output"),
151 app.QueueInfo(name=f
"data_request_{idx}_output_queue", inst=f
"data_requests_{idx}", dir=
"output")
152 for idx
in range(NUMBER_OF_DATA_PRODUCERS)
155 mspec(
"ffr",
"FragmentReceiver", [
156 app.QueueInfo(name=
"trigger_decision_input_queue", inst=
"trigger_decision_copy_for_bookkeeping", dir=
"input"),
157 app.QueueInfo(name=
"trigger_record_output_queue", inst=
"trigger_record_q", dir=
"output"),
158 app.QueueInfo(name=
"data_fragment_input_queue", inst=
"data_fragments_q", dir=
"input"),
161 mspec(
"datawriter",
"DataWriterModule", [
162 app.QueueInfo(name=
"trigger_record_input_queue", inst=
"trigger_record_q", dir=
"input"),
163 app.QueueInfo(name=
"token_output_queue", inst=
"token_to_netq", dir=
"output"),
166 mspec(
"fake_timesync_source",
"FakeTimeSyncSource", [
167 app.QueueInfo(name=
"time_sync_sink", inst=
"time_sync_to_netq", dir=
"output"),
172 mspec(f
"fakedataprod_{idx}",
"FakeDataProdModule", [
173 app.QueueInfo(name=
"data_request_input_queue", inst=f
"data_requests_{idx}", dir=
"input"),
174 app.QueueInfo(name=
"data_fragment_output_queue", inst=
"data_fragments_q", dir=
"output"),
175 ])
for idx
in range(NUMBER_OF_DATA_PRODUCERS)
178 init_specs = app.Init(queues=queue_specs, modules=mod_specs)
180 jstr = json.dumps(init_specs.pod(), indent=4, sort_keys=
True)
183 initcmd = rccmd.RCCommand(
184 id=basecmd.CmdId(
"init"),
186 exit_state=
"INITIAL",
190 confcmd = mrccmd(
"conf",
"INITIAL",
"CONFIGURED",[
191 (
"qton_trigdec", qton.Conf(msg_type=
"dunedaq::dfmessages::TriggerDecision",
192 msg_module_name=
"TriggerDecisionNQ",
193 sender_config=nos.Conf(ipm_plugin_type=
"ZmqSender",
194 address=
"tcp://127.0.0.1:12345",
199 (
"ntoq_trigdec", ntoq.Conf(msg_type=
"dunedaq::dfmessages::TriggerDecision",
200 msg_module_name=
"TriggerDecisionNQ",
201 receiver_config=nor.Conf(ipm_plugin_type=
"ZmqReceiver",
202 address=
"tcp://127.0.0.1:12345")
206 (
"qton_token", qton.Conf(msg_type=
"dunedaq::dfmessages::TriggerDecisionToken",
207 msg_module_name=
"TriggerDecisionTokenNQ",
208 sender_config=nos.Conf(ipm_plugin_type=
"ZmqSender",
209 address=
"tcp://127.0.0.1:12346",
214 (
"ntoq_token", ntoq.Conf(msg_type=
"dunedaq::dfmessages::TriggerDecisionToken",
215 msg_module_name=
"TriggerDecisionTokenNQ",
216 receiver_config=nor.Conf(ipm_plugin_type=
"ZmqReceiver",
217 address=
"tcp://127.0.0.1:12346")
221 (
"qton_timesync", qton.Conf(msg_type=
"dunedaq::dfmessages::TimeSync",
222 msg_module_name=
"TimeSyncNQ",
223 sender_config=nos.Conf(ipm_plugin_type=
"ZmqSender",
224 address=
"tcp://127.0.0.1:12347",
229 (
"ntoq_timesync", ntoq.Conf(msg_type=
"dunedaq::dfmessages::TimeSync",
230 msg_module_name=
"TimeSyncNQ",
231 receiver_config=nor.Conf(ipm_plugin_type=
"ZmqReceiver",
232 address=
"tcp://127.0.0.1:12347")
236 (
"tde", tde.ConfParams(
237 sourceids=[idx
for idx
in range(NUMBER_OF_DATA_PRODUCERS)],
238 min_links_in_request=NUMBER_OF_DATA_PRODUCERS,
239 max_links_in_request=NUMBER_OF_DATA_PRODUCERS,
240 min_readout_window_ticks=1200,
241 max_readout_window_ticks=1200,
242 trigger_window_offset=1000,
244 trigger_delay_ticks=math.floor( 2* CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR),
249 trigger_interval_ticks=trigger_interval_ticks,
250 clock_frequency_hz=CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR
252 (
"rqg", rqg.ConfParams(
253 map=rqg.mapsourceidqueue([
254 rqg.sourceidinst(source_id=idx, queueinstance=f
"data_requests_{idx}")
for idx
in range(NUMBER_OF_DATA_PRODUCERS)
257 (
"ffr", ffr.ConfParams(
258 general_queue_timeout=QUEUE_POP_WAIT_MS
260 (
"datawriter", dw.ConfParams(
261 initial_token_count=TOKEN_COUNT,
262 data_store_parameters=hdf5ds.ConfParams(
265 directory_path = OUTPUT_PATH,
267 max_file_size_bytes = 1073741834,
268 disable_unique_filename_suffix =
False,
269 filename_parameters = hdf5ds.HDF5DataStoreFileNameParams(
270 overall_prefix =
"fake_minidaqapp",
272 file_index_prefix =
"file"
274 file_layout_parameters = hdf5ds.HDF5DataStoreFileLayoutParams(
275 trigger_record_name_prefix=
"TriggerRecord",
276 digits_for_trigger_number = 5,
280 (
"fake_timesync_source", ftss.ConfParams(
281 sync_interval_ticks = (CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR),
282 clock_frequency_hz = (CLOCK_SPEED_HZ/DATA_RATE_SLOWDOWN_FACTOR),
285 (f
"fakedataprod_{idx}", fdp.ConfParams(
286 temporarily_hacked_link_number = idx
287 ))
for idx
in range(NUMBER_OF_DATA_PRODUCERS)
290 jstr = json.dumps(confcmd.pod(), indent=4, sort_keys=
True)
293 startpars = rccmd.StartParams(run=RUN_NUMBER, disable_data_storage=DISABLE_OUTPUT)
294 startcmd = mrccmd(
"start",
"CONFIGURED",
"RUNNING", [
295 (
"ntoq_trigdec", startpars),
296 (
"qton_trigdec", startpars),
297 (
"ntoq_token", startpars),
298 (
"qton_token", startpars),
299 (
"ntoq_timesync", startpars),
300 (
"qton_timesync", startpars),
301 (
"datawriter", startpars),
303 (
"fakedataprod_.*", startpars),
305 (
"fake_timesync_source", startpars),
309 jstr = json.dumps(startcmd.pod(), indent=4, sort_keys=
True)
310 print(
"="*80+
"\nStart\n\n", jstr)
313 stopcmd = mrccmd(
"stop",
"RUNNING",
"CONFIGURED", [
314 (
"ntoq_trigdec",
None),
315 (
"qton_trigdec",
None),
316 (
"ntoq_timesync",
None),
317 (
"qton_timesync",
None),
318 (
"ntoq_token",
None),
319 (
"qton_token",
None),
320 (
"fake_timesync_source",
None),
323 (
"fakedataprod_.*",
None),
325 (
"datawriter",
None),
328 jstr = json.dumps(stopcmd.pod(), indent=4, sort_keys=
True)
329 print(
"="*80+
"\nStop\n\n", jstr)
331 pausecmd = mrccmd(
"pause",
"RUNNING",
"RUNNING", [
335 jstr = json.dumps(pausecmd.pod(), indent=4, sort_keys=
True)
336 print(
"="*80+
"\nPause\n\n", jstr)
338 resumecmd = mrccmd(
"resume",
"RUNNING",
"RUNNING", [
339 (
"tde", tde.ResumeParams(
340 trigger_interval_ticks=trigger_interval_ticks
344 jstr = json.dumps(resumecmd.pod(), indent=4, sort_keys=
True)
345 print(
"="*80+
"\nResume\n\n", jstr)
347 scrapcmd = mcmd(
"scrap", [
351 jstr = json.dumps(scrapcmd.pod(), indent=4, sort_keys=
True)
352 print(
"="*80+
"\nScrap\n\n", jstr)
355 cmd_seq = [initcmd, confcmd, startcmd, stopcmd, pausecmd, resumecmd, scrapcmd]
358 jstr = json.dumps([c.pod()
for c
in cmd_seq], indent=4, sort_keys=
True)
377 def cli(number_of_data_producers, data_rate_slowdown_factor, run_number, trigger_rate_hz, data_file, output_path, disable_data_storage,token_count, json_file):
379 DATA_FILE: Input raw data file.
380 JSON_FILE: Output json configuration file.
383 with open(json_file,
'w')
as f:
385 NUMBER_OF_DATA_PRODUCERS = number_of_data_producers,
386 DATA_RATE_SLOWDOWN_FACTOR = data_rate_slowdown_factor,
387 RUN_NUMBER = run_number,
388 TRIGGER_RATE_HZ = trigger_rate_hz,
389 DATA_FILE = data_file,
390 OUTPUT_PATH = output_path,
391 DISABLE_OUTPUT = disable_data_storage,
392 TOKEN_COUNT = token_count
395 print(f
"'{json_file}' generation completed.")