Line data Source code
1 : /**
2 : * @file DFApplication.cpp
3 : *
4 : * Implementation of DFApplication's generate_modules dal method
5 : *
6 : * This is part of the DUNE DAQ Software Suite, copyright 2023.
7 : * Licensing/copyright details are in the COPYING file that you should have
8 : * received with this code.
9 : */
10 :
11 : #include "ConfigObjectFactory.hpp"
12 : #include "appmodel/ConfigurationHelper.hpp"
13 : #include "appmodel/DFApplication.hpp"
14 : #include "appmodel/DataStoreConf.hpp"
15 : #include "appmodel/DataWriterConf.hpp"
16 : #include "appmodel/DataWriterModule.hpp"
17 : #include "appmodel/FilenameParams.hpp"
18 : #include "appmodel/NetworkConnectionDescriptor.hpp"
19 : #include "appmodel/NetworkConnectionRule.hpp"
20 : #include "appmodel/QueueConnectionRule.hpp"
21 : #include "appmodel/QueueDescriptor.hpp"
22 : #include "appmodel/SourceIDConf.hpp"
23 : #include "appmodel/TRBConf.hpp"
24 : #include "appmodel/TRBModule.hpp"
25 : #include "appmodel/appmodelIssues.hpp"
26 :
27 : #include "confmodel/Connection.hpp"
28 : #include "confmodel/NetworkConnection.hpp"
29 :
30 : #include "logging/Logging.hpp"
31 : #include "oks/kernel.hpp"
32 :
33 : #include <fmt/core.h>
34 : #include <string>
35 : #include <vector>
36 :
37 : namespace dunedaq {
38 : namespace appmodel {
39 :
40 :
41 : static inline void
42 0 : fill_sourceid_object(const ConfigObjectFactory& obj_fac,
43 : const conffwk::ConfigObject* netConn,
44 : const std::string& uid,
45 : const std::vector<uint32_t>& stream_source_ids,
46 : const std::vector<const SourceIDConf*>& tp_source_ids,
47 : conffwk::ConfigObject& sidNetObj,
48 : std::vector<std::shared_ptr<conffwk::ConfigObject>> sidObjs)
49 : {
50 0 : sidNetObj.set_obj("netconn", netConn);
51 :
52 0 : std::vector<const conffwk::ConfigObject*> source_id_objs;
53 :
54 0 : for (auto& source_id : stream_source_ids) {
55 0 : std::string streamSidUid(uid + "SourceIDConf" + std::to_string(source_id));
56 0 : auto stream_sid_obj = std::make_shared<conffwk::ConfigObject>(obj_fac.create("SourceIDConf", streamSidUid));
57 0 : stream_sid_obj->set_by_val<uint32_t>("sid", source_id);
58 0 : stream_sid_obj->set_by_val<std::string>("subsystem", "Detector_Readout");
59 0 : sidObjs.push_back(stream_sid_obj);
60 0 : source_id_objs.push_back(sidObjs.back().get());
61 0 : }
62 :
63 0 : for (auto tp_sid : tp_source_ids) {
64 0 : sidObjs.push_back(std::make_shared<conffwk::ConfigObject>(tp_sid->config_object()));
65 0 : source_id_objs.push_back(sidObjs.back().get());
66 : }
67 : /*
68 : std::string trgSidUid(roapp->UID() + "TRGSourceIDConf" + std::to_string(roapp->get_tp_source_id()));
69 : auto trig_sid_obj = std::make_shared<conffwk::ConfigObject>(obj_fac.create("SourceIDConf", trgSidUid));
70 : trig_sid_obj->set_by_val<uint32_t>("sid", roapp->get_tp_source_id());
71 : trig_sid_obj->set_by_val<std::string>("subsystem", "Trigger");
72 : source_id_objs.push_back(sidObjs.back().get());
73 : */
74 :
75 0 : sidNetObj.set_objs("source_ids", source_id_objs);
76 0 : }
77 :
78 :
79 : inline void
80 0 : fill_replay_sourceid_object(const ConfigObjectFactory& obj_fac,
81 : const std::string& uid,
82 : const std::vector<const SourceIDConf*>& tp_source_ids,
83 : std::vector<conffwk::ConfigObject>* netConn,
84 : std::vector<conffwk::ConfigObject>* sidNetObj,
85 : const NetworkConnectionDescriptor* descriptor,
86 : std::vector<std::shared_ptr<conffwk::ConfigObject>> sidObjs)
87 : {
88 0 : std::vector<const conffwk::ConfigObject*> source_id_objs;
89 :
90 0 : for (auto tp_sid : tp_source_ids) {
91 : // get name extension
92 0 : std::string name = tp_sid->UID();
93 0 : size_t pos = name.find_last_of('-');
94 0 : std::string ext;
95 0 : if (pos != std::string::npos) {
96 0 : ext = name.substr(pos);
97 : }
98 :
99 : // set Network connections
100 0 : std::string dreqNetUid(uid + ext);
101 0 : netConn->emplace_back(
102 0 : obj_fac.create_net_obj(descriptor, dreqNetUid));
103 0 : netConn->back().set_by_val<std::string>("data_type", descriptor->get_data_type());
104 0 : netConn->back().set_by_val<std::string>("connection_type", descriptor->get_connection_type());
105 0 : auto serviceObj = descriptor->get_associated_service()->config_object();
106 0 : netConn->back().set_obj("associated_service", &serviceObj);
107 :
108 : // set SourceID to Network connections
109 0 : std::string sidToNetUid(uid + ext + "-sids");
110 0 : sidNetObj->emplace_back(
111 0 : obj_fac.create("SourceIDToNetworkConnection", sidToNetUid));
112 0 : sidNetObj->back().set_obj("netconn", &netConn->back());
113 :
114 : // set SourceID objs
115 0 : sidObjs.push_back(std::make_shared<conffwk::ConfigObject>(tp_sid->config_object()));
116 0 : sidNetObj->back().set_objs("source_ids", { sidObjs.back().get() });
117 0 : }
118 0 : }
119 :
120 :
121 :
122 : void
123 0 : DFApplication::generate_modules(
124 : std::shared_ptr<appmodel::ConfigurationHelper> helper) const {
125 :
126 0 : ConfigObjectFactory obj_fac(this);
127 :
128 0 : std::vector<const confmodel::DaqModule*> modules;
129 :
130 : // Containers for module specific config objects for output/input
131 : // Prepare TRB output objects
132 0 : std::vector<const conffwk::ConfigObject*> trbInputObjs;
133 0 : std::vector<const conffwk::ConfigObject*> trbOutputObjs;
134 0 : std::vector<const conffwk::ConfigObject*> trbSidNetObjs;
135 :
136 : // -- First, we process expected Queue and Network connections and create their objects.
137 :
138 : // Process the queue rules looking for the TriggerRecord queue between TRB and DataWriterModule
139 0 : const QueueDescriptor* trQDesc = nullptr;
140 0 : for (auto rule : get_queue_rules()) {
141 0 : auto destination_class = rule->get_destination_class();
142 0 : if (destination_class == "DataWriterModule") {
143 0 : trQDesc = rule->get_descriptor();
144 : }
145 0 : }
146 0 : if (trQDesc == nullptr) { // BadConf if no descriptor between TRB and DataWriterModule
147 0 : throw(BadConf(ERS_HERE, "Could not find queue descriptor rule for TriggerRecords!"));
148 : }
149 : // Create queue connection config object
150 0 : auto trQueueObj = obj_fac.create_queue_obj(trQDesc, UID());
151 :
152 : // Place trigger record queue object into vector of output objs of TRB module
153 0 : trbOutputObjs.push_back(&trQueueObj);
154 :
155 : // Process the network rules looking for the Fragments and TriggerDecision inputs for TRB
156 0 : const NetworkConnectionDescriptor* fragNetDesc = nullptr;
157 0 : const NetworkConnectionDescriptor* trigdecNetDesc = nullptr;
158 0 : const NetworkConnectionDescriptor* tokenNetDesc = nullptr;
159 0 : const NetworkConnectionDescriptor* trmonReqNetDesc = nullptr;
160 0 : const NetworkConnectionDescriptor* trmonTRNetDesc = nullptr;
161 0 : for (auto rule : get_network_rules()) {
162 0 : auto descriptor = rule->get_descriptor();
163 0 : auto data_type = descriptor->get_data_type();
164 0 : if (data_type == "Fragment") {
165 0 : fragNetDesc = rule->get_descriptor();
166 0 : } else if (data_type == "TriggerDecision") {
167 0 : trigdecNetDesc = rule->get_descriptor();
168 0 : } else if (data_type == "TriggerDecisionToken") {
169 0 : tokenNetDesc = rule->get_descriptor();
170 0 : } else if (data_type == "TRMonRequest") {
171 0 : trmonReqNetDesc = rule->get_descriptor();
172 0 : } else if (data_type == "TriggerRecord") {
173 0 : trmonTRNetDesc = rule->get_descriptor();
174 : }
175 0 : }
176 0 : if (fragNetDesc == nullptr) { // BadConf if no descriptor for Fragments into TRB
177 0 : throw(BadConf(ERS_HERE, "Could not find network descriptor rule for input Fragments!"));
178 : }
179 0 : if (trigdecNetDesc == nullptr) { // BadCond if no descriptor for TriggerDecisions into TRB
180 0 : throw(BadConf(ERS_HERE, "Could not find network descriptor rule for input TriggerDecisions!"));
181 : }
182 0 : if (tokenNetDesc == nullptr) { // BadCond if no descriptor for Tokens out of DataWriterModule
183 0 : throw(BadConf(ERS_HERE, "Could not find network descriptor rule for output TriggerDecisionTokens!"));
184 : }
185 0 : if (get_source_id() == nullptr) {
186 0 : throw(BadConf(ERS_HERE, "Could not retrieve SourceIDConf"));
187 : }
188 : // Create network connection config object
189 0 : auto fragNetObj = obj_fac.create_net_obj(fragNetDesc, UID());
190 0 : auto trigdecNetObj = obj_fac.create_net_obj(trigdecNetDesc, UID());
191 0 : auto tokenNetObj = obj_fac.create_net_obj(tokenNetDesc, "");
192 0 : conffwk::ConfigObject trmonReqNetObj;
193 0 : conffwk::ConfigObject trmonTRNetObj;
194 0 : if (trmonReqNetDesc != nullptr) {
195 0 : trmonReqNetObj = obj_fac.create_net_obj(trmonReqNetDesc, UID());
196 : }
197 0 : if (trmonTRNetDesc != nullptr) {
198 0 : trmonTRNetObj = obj_fac.create_net_obj(trmonTRNetDesc, "");
199 : }
200 :
201 : // Process special Network rules!
202 : // Looking for DataRequest rules from ReadoutAppplications in current Session
203 0 : std::vector<conffwk::ConfigObject> dreqNetObjs;
204 0 : std::vector<conffwk::ConfigObject> sidNetObjs;
205 0 : std::vector<std::shared_ptr<conffwk::ConfigObject>> sidObjs;
206 0 : std::set<std::string> processed_apps;
207 0 : for (auto uid: helper->get_app_uids("DFApplication")) {
208 0 : processed_apps.insert(uid);
209 0 : }
210 :
211 0 : auto stream_src_ids = helper->get_stream_source_ids();
212 0 : auto tp_src_ids = helper->get_tp_source_ids();
213 0 : for (auto [uid, descriptor]:
214 0 : helper->get_netdescriptors("DataRequest", "ReadoutApplication")) {
215 0 : dreqNetObjs.emplace_back(obj_fac.create_net_obj(descriptor, uid));
216 :
217 0 : std::string sidToNetUid(descriptor->get_uid_base() + uid + "-sids");
218 0 : sidNetObjs.emplace_back(obj_fac.create("SourceIDToNetworkConnection", sidToNetUid));
219 :
220 0 : fill_sourceid_object(obj_fac,
221 0 : &dreqNetObjs.back(),
222 : uid,
223 0 : stream_src_ids.at(uid),
224 0 : tp_src_ids.at(uid),
225 0 : sidNetObjs.back(),
226 : sidObjs);
227 0 : processed_apps.insert(uid);
228 0 : }
229 :
230 0 : for (auto [uid, descriptor]:
231 0 : helper->get_netdescriptors("DataRequest", "TPReplayApplication")) {
232 0 : fill_replay_sourceid_object(obj_fac,
233 : uid,
234 0 : tp_src_ids.at(uid),
235 : &dreqNetObjs,
236 : &sidNetObjs,
237 : descriptor,
238 : sidObjs);
239 0 : processed_apps.insert(uid);
240 0 : }
241 :
242 :
243 :
244 :
245 0 : for (auto [uid, descriptor]:
246 0 : helper->get_netdescriptors("DataRequest", "FakeDataApplication")) {
247 0 : dreqNetObjs.emplace_back(obj_fac.create_net_obj(descriptor, uid));
248 :
249 0 : std::string sidToNetUid(descriptor->get_uid_base() + uid + "-sids");
250 0 : sidNetObjs.emplace_back(obj_fac.create("SourceIDToNetworkConnection", sidToNetUid));
251 :
252 0 : fill_sourceid_object(obj_fac,
253 0 : &dreqNetObjs.back(),
254 : uid,
255 0 : stream_src_ids.at(uid),
256 : {}, // No tp src_ids for FakeDataApplication
257 0 : sidNetObjs.back(),
258 : sidObjs);
259 0 : processed_apps.insert(uid);
260 0 : }
261 :
262 : // now we treat the CTB which has 2 connections related to source IDs
263 0 : const auto ctb_type = "CTBApplication";
264 0 : for (auto [uid, descriptor]: helper->get_netdescriptors("DataRequest", ctb_type)) {
265 :
266 0 : if (processed_apps.contains(uid)) {
267 0 : continue;
268 : }
269 :
270 0 : for ( const auto & [uid, rel_sources] :
271 0 : helper->get_all_app_source_ids(ctb_type) ) {
272 0 : for ( auto [rel, id] : rel_sources ) {
273 0 : std::string local_uid = uid;
274 0 : local_uid += rel.find("LLT")!=std::string::npos ? "_LLT" : "_HLT";
275 :
276 0 : dreqNetObjs.emplace_back(obj_fac.create_net_obj(descriptor, local_uid));
277 0 : sidObjs.push_back(std::make_shared<conffwk::ConfigObject>(id->config_object()));
278 :
279 0 : std::string sidToNetUid(descriptor->get_uid_base() + local_uid);
280 0 : sidNetObjs.emplace_back(obj_fac.create("SourceIDToNetworkConnection", sidToNetUid));
281 0 : sidNetObjs.back().set_objs("source_ids", {sidObjs.back().get()});
282 0 : sidNetObjs.back().set_obj("netconn", &dreqNetObjs.back());
283 :
284 0 : } // loop on relational sources
285 :
286 0 : processed_apps.insert(uid);
287 0 : } // loop over CTB apps
288 0 : } // loop over descriptors for the CTB apps
289 :
290 0 : auto app_sources = helper->get_app_source_ids();
291 : // Now look at all Smart apps that are not Readout, FakeData or DF
292 0 : for (auto [uid, descriptor]: helper->get_netdescriptors("DataRequest")) {
293 :
294 :
295 0 : if (processed_apps.contains(uid)) {
296 0 : continue;
297 : }
298 0 : if (app_sources.contains(uid)) {
299 0 : dreqNetObjs.emplace_back(obj_fac.create_net_obj(descriptor, uid));
300 :
301 0 : sidObjs.push_back(std::make_shared<conffwk::ConfigObject>(
302 0 : app_sources.at(uid)->config_object()));
303 :
304 0 : std::string sidToNetUid(descriptor->get_uid_base() + uid + "-sids");
305 0 : sidNetObjs.emplace_back(obj_fac.create("SourceIDToNetworkConnection", sidToNetUid));
306 0 : sidNetObjs.back().set_objs("source_ids", {sidObjs.back().get()});
307 0 : sidNetObjs.back().set_obj("netconn", &dreqNetObjs.back());
308 :
309 0 : processed_apps.insert(uid);
310 0 : }
311 0 : }
312 :
313 :
314 : // Get pointers to objects here, after vector has been filled so they don't move on us
315 0 : for (auto& obj : dreqNetObjs) {
316 0 : trbOutputObjs.push_back(&obj);
317 : }
318 0 : for (auto& obj : sidNetObjs) {
319 0 : trbSidNetObjs.push_back(&obj);
320 : }
321 :
322 : // -- Second, we create the Module objects and assign their configs, with the precreated
323 : // -- connection config objects above.
324 :
325 : // Get TRB Config Object
326 0 : auto trbConf = get_trb();
327 0 : if (trbConf == nullptr) {
328 : throw(BadConf(ERS_HERE, "No DataWriterModule or TRB configuration given"));
329 : }
330 0 : auto trbConfObj = trbConf->config_object();
331 0 : trbConfObj.set_by_val<uint32_t>("source_id", get_source_id()->get_sid());
332 0 : trbInputObjs = { &trigdecNetObj, &fragNetObj };
333 0 : if (trmonReqNetDesc != nullptr) {
334 0 : trbInputObjs.push_back(&trmonReqNetObj);
335 : }
336 0 : if (trmonTRNetDesc != nullptr) {
337 0 : trbOutputObjs.push_back(&trmonTRNetObj);
338 : }
339 : // Prepare TRB Module Object and assign its Config Object.
340 0 : std::string trbUid(UID() + "-trb");
341 0 : conffwk::ConfigObject trbObj = obj_fac.create("TRBModule", trbUid);
342 0 : trbObj.set_obj("configuration", &trbConfObj);
343 0 : trbObj.set_objs("inputs", trbInputObjs);
344 0 : trbObj.set_objs("outputs", trbOutputObjs);
345 0 : trbObj.set_obj("trigger_record_output", &trQueueObj);
346 0 : trbObj.set_objs("request_connections", trbSidNetObjs);
347 : // Push TRB Module Object from confdb
348 0 : modules.push_back(obj_fac.get_dal<TRBModule>(trbUid));
349 :
350 : // Get DataWriterModule Config Object (only one for now, maybe more later?)
351 0 : auto dwrConfs = get_data_writers();
352 0 : if (dwrConfs.size() == 0) {
353 0 : throw(BadConf(ERS_HERE, "No DataWriterModule or TRB configuration given"));
354 : }
355 0 : uint dw_idx = 0;
356 0 : for (auto dwrConf : dwrConfs) {
357 : // auto fnParamsObj = dwrConf->get_data_store_params()->get_filename_params()->config_object();
358 : // fnParamsObj.set_by_val<std::string>("writer_identifier", fmt::format("{}_datawriter-{}", UID(), dw_idx));
359 0 : auto dwrConfObj = dwrConf->config_object();
360 :
361 : // Prepare DataWriterModule Module Object and assign its Config Object.
362 0 : std::string dwrUid(fmt::format("{}-dw-{}", UID(), dw_idx));
363 0 : conffwk::ConfigObject dwrObj = obj_fac.create("DataWriterModule", dwrUid);
364 0 : dwrObj.set_by_val("writer_identifier", fmt::format("{}_dw_{}", UID(), dw_idx));
365 0 : dwrObj.set_obj("configuration", &dwrConfObj);
366 0 : dwrObj.set_objs("inputs", { &trQueueObj });
367 0 : dwrObj.set_objs("outputs", { &tokenNetObj });
368 : // Push DataWriterModule Module Object from confdb
369 0 : modules.push_back(obj_fac.get_dal<DataWriterModule>(dwrUid));
370 0 : ++dw_idx;
371 0 : }
372 :
373 0 : obj_fac.update_modules(modules);
374 0 : }
375 :
376 : } // namespace appmodel
377 : } // namespace dunedaq
|