DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
daphnemodulesapp_gen.py
Go to the documentation of this file.
1# This module facilitates the generation of daphnemodules DAQModules within daphnemodules apps
2
3
4# Set moo schema search path
5from dunedaq.env import get_moo_model_path
6import moo.io
7moo.io.default_load_path = get_moo_model_path()
8
9# Load configuration types
10import moo.otypes
11moo.otypes.load_types("daphnemodules/DaphneV2ControllerModule.jsonnet")
12
13import dunedaq.daphnemodules.DaphneV2ControllerModule as DaphneV2ControllerModule
14
15from daqconf.core.app import App, ModuleGraph
16from daqconf.core.daqmodule import DAQModule
17
18import json
19
# NOTE(review): presumably the shared prefix of the DAPHNE board IP
# addresses -- not referenced anywhere in the visible part of this file.
20ip_base = "10.73.137."
# Number of AFE records built by get_daphnemodules_app (ids 0 .. n_afe - 1).
21n_afe = 5
# Number of channel records built by get_daphnemodules_app
# (ids 0 .. n_channels - 1).
22n_channels = 40
23
def unpack(j: dict, block: str) -> dict:
    """Flatten one block of id/value records into an ``{id: value}`` dict.

    Args:
        j: Mapping from block names to lists of records, where each record
            has an ``'id'`` and a ``'value'`` entry. May be empty or ``None``.
        block: Name of the block to read from ``j``.

    Returns:
        A dict mapping every record's ``'id'`` to its ``'value'``. The dict
        is empty when ``j`` is falsy or when ``block`` is absent (or null)
        in ``j``. If an id occurs more than once, the last record wins.

    Raises:
        KeyError: If a record lacks the ``'id'`` or ``'value'`` entry.
    """
    if not j:
        return {}

    # A missing (or null) block simply means "no overrides of this kind";
    # treat it like an empty list instead of failing with KeyError/TypeError.
    entries = j.get(block) or ()
    return {entry['id']: entry['value'] for entry in entries}
33
def to_adc(j: dict) -> DaphneV2ControllerModule.ADCConf:
    """Build an ``ADCConf`` record from a plain dictionary.

    The ``'resolution'``, ``'output_format'`` and ``'SB_first'`` entries of
    ``j`` are forwarded unchanged as the keyword arguments of the same name.
    A missing entry raises ``KeyError``.
    """
    field_names = ('resolution', 'output_format', 'SB_first')
    adc_kwargs = {name: j[name] for name in field_names}
    return DaphneV2ControllerModule.ADCConf(**adc_kwargs)
40
41
def to_pga(j: dict) -> DaphneV2ControllerModule.PGAConf:
    """Build a ``PGAConf`` record from a plain dictionary.

    Args:
        j: Dictionary with the entries ``'lpf_cut_frequency'``,
            ``'integrator_disable'`` and ``'gain'``. The historical
            misspelling ``'lpf_cut_frequnecy'`` is still accepted when the
            correctly spelled key is absent.

    Returns:
        The ``PGAConf`` record populated from ``j``.

    Raises:
        KeyError: If a required entry is missing from ``j``.
    """
    # Earlier versions only read the misspelled key. Prefer the spelling
    # that matches the PGAConf field, but fall back to the old key so that
    # existing configuration files keep loading.
    if 'lpf_cut_frequency' in j:
        cut_frequency = j['lpf_cut_frequency']
    else:
        cut_frequency = j['lpf_cut_frequnecy']

    return DaphneV2ControllerModule.PGAConf(
        lpf_cut_frequency=cut_frequency,
        integrator_disable=j['integrator_disable'],
        gain=j['gain'],
    )
48
def to_lna(j: dict) -> DaphneV2ControllerModule.LNAConf:
    """Build an ``LNAConf`` record from a plain dictionary.

    Reads the ``'clamp'``, ``'integrator_disable'`` and ``'gain'`` entries
    of ``j`` and passes them on under the same names. A missing entry raises
    ``KeyError``.
    """
    clamp = j['clamp']
    integrator_disable = j['integrator_disable']
    gain = j['gain']
    return DaphneV2ControllerModule.LNAConf(
        clamp=clamp,
        integrator_disable=integrator_disable,
        gain=gain,
    )
55
56
def get_daphnemodules_app(
        slot: int,
        ip: str,
        timeout_ms: int,
        biasctrl: int,
        afe_gain: int,
        channel_gain: int,
        channel_offset: int,
        adc: DaphneV2ControllerModule.ADCConf,
        pga: DaphneV2ControllerModule.PGAConf,
        lna: DaphneV2ControllerModule.LNAConf,
        details,
        nickname="daphne",
        host="localhost"):
    """
    Here the configuration for an entire daq_application instance using
    DAQModules from daphnemodules is generated.

    The map file will provide details to override whatever comes from the
    inputs.

    Args:
        slot: Value stored in the ``slot`` field of the controller conf.
        ip: Value stored in the ``daphne_address`` field.
        timeout_ms: Value stored in the ``timeout_ms`` field.
        biasctrl: Value stored in the ``biasctrl`` field.
        afe_gain: Default ``v_gain`` for every AFE without an override.
        channel_gain: Default ``gain`` for every channel without an override.
        channel_offset: Default ``offset`` for every channel without an
            override.
        adc: Default ADC configuration for every AFE without an override.
        pga: Default PGA configuration for every AFE without an override.
        lna: Default LNA configuration for every AFE without an override.
        details: Optional mapping with per-AFE / per-channel overrides. The
            keys read here are ``'afes'``, ``'channels'``,
            ``'self_trigger_threshold'`` and ``'full_stream_channels'``;
            each of them may be absent. A falsy value disables all
            overrides.
        nickname: Name given to the generated App.
        host: Host given to the generated App.

    Returns:
        An ``App`` wrapping a single ``DaphneV2ControllerModule`` DAQModule
        named ``"controller"``.
    """
    # Normalise "no details" to an empty mapping so every lookup below can
    # use the same code path as a partially filled map file.
    ext_conf = details or {}

    # unpack() returns an empty dict for a falsy block, so a map file that
    # has no 'afes' section yields no AFE overrides.
    afe_block = ext_conf.get('afes')
    ext_afe_gains = unpack(afe_block, 'v_gains')
    ext_biases = unpack(afe_block, 'v_biases')
    ext_adcs = unpack(afe_block, 'adcs')
    ext_pgas = unpack(afe_block, 'pgas')
    ext_lnas = unpack(afe_block, 'lnas')

    afes = [
        DaphneV2ControllerModule.AFE(
            id=afe,
            v_gain=ext_afe_gains.get(afe, afe_gain),
            v_bias=ext_biases.get(afe, 0),
            adc=to_adc(ext_adcs[afe]) if afe in ext_adcs else adc,
            pga=to_pga(ext_pgas[afe]) if afe in ext_pgas else pga,
            lna=to_lna(ext_lnas[afe]) if afe in ext_lnas else lna,
        )
        for afe in range(n_afe)
    ]

    channel_block = ext_conf.get('channels')
    ext_gains = unpack(channel_block, 'gains')
    ext_offsets = unpack(channel_block, 'offsets')
    ext_trims = unpack(channel_block, 'trims')

    channels = []
    for ch in range(n_channels):
        channel_kwargs = {
            'gain': ext_gains.get(ch, channel_gain),
            'offset': ext_offsets.get(ch, channel_offset),
        }
        # 'trim' is only passed when the map file provides one, so that the
        # record's own default applies otherwise.
        if ch in ext_trims:
            channel_kwargs['trim'] = ext_trims[ch]

        channels.append(DaphneV2ControllerModule.Channel(
            id=ch,
            conf=DaphneV2ControllerModule.ChannelConf(**channel_kwargs),
        ))

    conf = DaphneV2ControllerModule.Conf(
        daphne_address=ip,
        slot=slot,
        timeout_ms=timeout_ms,
        biasctrl=biasctrl,
        afes=afes,
        channels=channels,
        # Same defaults as when no details are given at all.
        self_trigger_threshold=ext_conf.get('self_trigger_threshold', 0),
        full_stream_channels=ext_conf.get('full_stream_channels', []),
    )

    modules = [DAQModule(name="controller",
                         plugin="DaphneV2ControllerModule",
                         conf=conf)]

    mgraph = ModuleGraph(modules)
    daphnemodules_app = App(modulegraph=mgraph, host=host, name=nickname)

    return daphnemodules_app
151
152
get_daphnemodules_app(int slot, str ip, int timeout_ms, int biasctrl, int afe_gain, int channel_gain, int channel_offset, DaphneV2ControllerModule.ADCConf adc, DaphneV2ControllerModule.PGAConf pga, DaphneV2ControllerModule.LNAConf lna, details, nickname="daphne", host="localhost")
DaphneV2ControllerModule.PGAConf to_pga(dict j)
DaphneV2ControllerModule.ADCConf to_adc(dict j)
DaphneV2ControllerModule.LNAConf to_lna(dict j)