DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
ReadoutApplication.cpp
Go to the documentation of this file.
1
19#include "confmodel/Session.hpp"
20
24
27
30
33#include "confmodel/GeoId.hpp"
36#include "confmodel/Service.hpp"
37
43
52
53
54
56
57#include "logging/Logging.hpp"
58#include <fmt/core.h>
59
60#include <string>
61#include <vector>
62
63// using namespace dunedaq;
64// using namespace dunedaq::appmodel;
65
66namespace dunedaq {
67namespace appmodel {
68
69//-----------------------------------------------------------------------------
70std::vector<const confmodel::DaqModule*>
72{
73
74 TLOG_DEBUG(6) << "Generating modules for application " << this->UID();
75
76 ConfigObjectFactory obj_fac(this);
77 // conffwk::Configuration& confdb = this->configuration();
78
79 //
80 // Extract basic configuration objects
81 //
82
83 // Data reader
84 auto reader_conf = get_data_reader();
85 if (reader_conf == 0) {
86 throw(BadConf(ERS_HERE, "No DataReaderModule configuration given"));
87 }
88 std::string reader_class = reader_conf->get_template_for();
89
90 // Link handler
91 auto dlh_conf = get_link_handler();
92 // What is template for?
93 auto dlh_class = dlh_conf->get_template_for();
94
95 auto tph_conf = get_tp_handler();
96 if (tph_conf==nullptr && get_tp_generation_enabled()) {
97 throw(BadConf(ERS_HERE, "TP generation is enabled but there is no TP data handler configuration"));
98 }
99
100 std::string tph_class = "";
101 if (tph_conf != nullptr && get_tp_generation_enabled()) {
102 tph_class = tph_conf->get_template_for();
103 }
104
105 //
106 // Process the queue rules looking for inputs to our DL/TP handler modules
107 //
108 const QueueDescriptor* dlh_input_qdesc = nullptr;
109 const QueueDescriptor* dlh_reqinput_qdesc = nullptr;
110 const QueueDescriptor* tp_input_qdesc = nullptr;
111 // const QueueDescriptor* tpReqInputQDesc = nullptr;
112 const QueueDescriptor* fa_output_qdesc = nullptr;
113
114 for (auto rule : get_queue_rules()) {
115 auto destination_class = rule->get_destination_class();
116 auto data_type = rule->get_descriptor()->get_data_type();
117 // Why datahander here?
118 if (destination_class == "DataHandlerModule" || destination_class == dlh_class || destination_class == tph_class) {
119 if (data_type == "DataRequest") {
120 dlh_reqinput_qdesc = rule->get_descriptor();
121 } else if ((data_type == "TriggerPrimitive" || data_type == "TriggerPrimitiveVector") && get_tp_generation_enabled()) {
122 tp_input_qdesc = rule->get_descriptor();
123 } else {
124 dlh_input_qdesc = rule->get_descriptor();
125 }
126 } else if (destination_class == "FragmentAggregatorModule") {
127 fa_output_qdesc = rule->get_descriptor();
128 }
129 }
130
131 //
132 // Process the network rules looking for the Fragment Aggregator and TP handler data request inputs
133 //
134 const NetworkConnectionDescriptor* fa_net_desc = nullptr;
135 const NetworkConnectionDescriptor* tp_net_desc = nullptr;
136 const NetworkConnectionDescriptor* ta_net_desc = nullptr;
137 const NetworkConnectionDescriptor* ts_net_desc = nullptr;
138 for (auto rule : get_network_rules()) {
139 auto endpoint_class = rule->get_endpoint_class();
140 auto data_type = rule->get_descriptor()->get_data_type();
141
142 if (endpoint_class == "FragmentAggregatorModule") {
143 fa_net_desc = rule->get_descriptor();
144 } else if (data_type == "TPSet") {
145 tp_net_desc = rule->get_descriptor();
146 } else if (data_type == "TriggerActivity") {
147 ta_net_desc = rule->get_descriptor();
148 } else if (data_type == "TimeSync") {
149 ts_net_desc = rule->get_descriptor();
150 }
151 }
152
153 // Create here the Queue on which all data fragments are forwarded to the fragment aggregator
154 // and a container for the queues of data request to TP handler and DLH
155 if (fa_output_qdesc == nullptr) {
156 throw(BadConf(ERS_HERE, "No fragment output queue descriptor given"));
157 }
158 std::vector<const confmodel::Connection*> req_queues;
159 conffwk::ConfigObject frag_queue_obj = obj_fac.create_queue_obj(fa_output_qdesc);
160
161 //
162 // Scan Detector 2 DAQ connections to extract sender, receiver and stream information
163 //
164
165 std::vector<const confmodel::DaqModule*> modules;
166
167 // Loop over the detector to daq connections and generate one data reader per connection
168 // and the corresponding datalink handlers
169
170 // Collect all streams
171 std::vector<const confmodel::DetectorStream*> all_enabled_det_streams;
172 std::map<uint32_t, const confmodel::Connection*> data_queues_by_sid;
173
174 // std::vector<const conffwk::ConfigObject*> d2d_conn_objs;
175 uint16_t conn_idx = 0;
176
177 for (auto d2d_conn_res : get_contains()) {
178
179 // Are we sure?
180 if (d2d_conn_res->disabled(*session)) {
181 TLOG_DEBUG(7) << "Ignoring disabled DetectorToDaqConnection " << d2d_conn_res->UID();
182 continue;
183 }
184
185 // d2d_conn_objs.push_back(&d2d_conn_res->config_object());
186
187 TLOG_DEBUG(6) << "Processing DetectorToDaqConnection " << d2d_conn_res->UID();
188 // get the readout groups and the interfaces and streams therein; 1 readout group corresponds to 1 data reader module
189 auto d2d_conn = d2d_conn_res->cast<confmodel::DetectorToDaqConnection>();
190
191 if (!d2d_conn) {
192 throw(BadConf(ERS_HERE, "ReadoutApplication contains something other than DetectorToDaqConnection"));
193 }
194
195 if (d2d_conn->get_contains().empty()) {
196 throw(BadConf(ERS_HERE, "DetectorToDaqConnection does not contain sebders or receivers"));
197 }
198
199 // Loop over detector 2 daq connections to find senders and receivers
200 auto det_senders = d2d_conn->get_senders();
201 auto det_receiver = d2d_conn->get_receiver();
202
203 std::vector<const confmodel::DetectorStream*> enabled_det_streams;
204 // Loop over the streams of this connection and keep the enabled ones
205 for (auto stream : d2d_conn->get_streams()) {
206
207 // Are we sure?
208 if (stream->disabled(*session)) {
209 TLOG_DEBUG(7) << "Ignoring disabled DetectorStream " << stream->UID();
210 continue;
211 }
212
213 // Record the enabled stream, both globally and for this connection
214 all_enabled_det_streams.push_back(stream);
215 enabled_det_streams.push_back(stream);
216 }
217
218
219
220 // Here I want to resolve the type of connection (network, felix, or?)
221 // Rules of engagement: if the receiver interface is network or felix, the receivers should be castable to the counterpart
222 if (reader_class == "DPDKReaderModule" || reader_class == "SocketReaderModule") {
223 if ((reader_class == "DPDKReaderModule" && !det_receiver->cast<appmodel::DPDKReceiver>()) ||
224 (reader_class == "SocketReaderModule" && !det_receiver->cast<appmodel::SocketReceiver>())) {
225 throw(BadConf(ERS_HERE, fmt::format("{} requires NWDetDataReceiver, found {} of class {}", reader_class, det_receiver->UID(), det_receiver->class_name())));
226 }
227
228 bool all_nw_senders = true;
229 for (auto s : det_senders) {
230 all_nw_senders &= (s->cast<appmodel::NWDetDataSender>() != nullptr);
231 }
232
233 // Ensure that all senders are compatible with receiver
234 if (!all_nw_senders) {
235 throw(BadConf(ERS_HERE, "Non-network DetDataSener found with NWreceiver"));
236 }
237 }
238 else if (reader_class == "FelixReaderModule") {
239 if (!det_receiver->cast<appmodel::FelixDataReceiver>()) {
240 throw(BadConf(ERS_HERE, fmt::format("FelixReaderModule requires FelixDataReceiver, found {} of class {}", det_receiver->UID(), det_receiver->class_name())));
241 }
242
243 bool all_flx_senders = true;
244 for (auto s : det_senders) {
245 all_flx_senders &= (s->cast<appmodel::FelixDataSender>() != nullptr);
246 }
247
248 // Ensure that all senders are compatible with receiver
249 if (!all_flx_senders) {
250 throw(BadConf(ERS_HERE, "Non-felix DetDataSener found with FelixDataReceiver"));
251 }
252 }
253 // }
254
255 //-----------------------------------------------------------------
256 //
257 // Create DataReaderModule object
258 //
259
260 //
261 // Instantiate a DataReaderModule of the configured reader class (reader_class)
262 //
263
264 // Create the Data reader object
265
266 std::string reader_uid(fmt::format("datareader-{}-{}", this->UID(), std::to_string(conn_idx++)));
267 TLOG_DEBUG(6) << fmt::format("creating OKS configuration object for Data reader class {} with id {}", reader_class, reader_uid);
268 auto reader_obj = obj_fac.create(reader_class, reader_uid);
269
270 // Populate configuration and interfaces (leave output queues for later)
271 reader_obj.set_obj("configuration", &reader_conf->config_object());
272 reader_obj.set_objs("connections", {&d2d_conn_res->config_object()});
273
274 // Create the raw data queues
275 std::vector<const conffwk::ConfigObject*> data_queue_objs;
276 // keep a map for convenience
277
278 // Create data queues
279 for (auto ds : enabled_det_streams) {
280 conffwk::ConfigObject queue_obj = obj_fac.create_queue_sid_obj(dlh_input_qdesc, ds);
281 const auto* data_queue = obj_fac.get_dal<confmodel::Connection>(queue_obj.UID());
282 data_queue_objs.push_back(&data_queue->config_object());
283 data_queues_by_sid[ds->get_source_id()] = data_queue;
284 }
285
286 reader_obj.set_objs("outputs", data_queue_objs);
287
288 modules.push_back(obj_fac.get_dal<confmodel::DaqModule>(reader_obj.UID()));
289
290 }
291
292
293 //-----------------------------------------------------------------
294 //
295 // Prepare the tp handlers and related queues
296 //
297 std::vector<const confmodel::Connection*> tp_queues;
299
300 // Create TP handler object
301 auto tph_conf_obj = tph_conf->config_object();
302 auto tpsrc_ids = get_tp_source_ids();
303
304 for (auto sid : tpsrc_ids) {
305 conffwk::ConfigObject tp_queue_obj;
306 conffwk::ConfigObject tpreq_queue_obj;
307 std::string tp_uid("tphandler-" + std::to_string(sid->get_sid()));
308 auto tph_obj = obj_fac.create(tph_class, tp_uid);
309 tph_obj.set_by_val<uint32_t>("source_id", sid->get_sid());
310 tph_obj.set_by_val<uint32_t>("detector_id", 1); // 1 == kDAQ
311 tph_obj.set_by_val<bool>("post_processing_enabled", get_ta_generation_enabled());
312 tph_obj.set_obj("module_configuration", &tph_conf_obj);
313
314 // Create the TPs aggregator queue (from RawData Handlers to TP handlers)
315 tp_queue_obj = obj_fac.create_queue_sid_obj(tp_input_qdesc, sid->get_sid());
316 tp_queue_obj.set_by_val<uint32_t>("recv_timeout_ms", 50);
317 tp_queue_obj.set_by_val<uint32_t>("send_timeout_ms", 1);
318
319 tp_queues.push_back(obj_fac.get_dal<confmodel::Connection>(tp_queue_obj.UID()));
320 // Create tp data requests queue from Fragment Aggregator
321 tpreq_queue_obj = obj_fac.create_queue_sid_obj(dlh_reqinput_qdesc, sid->get_sid());
322 req_queues.push_back(obj_fac.get_dal<confmodel::Connection>(tpreq_queue_obj.UID()));
323
324 // Create the tp(set) publishing service
325 conffwk::ConfigObject tp_net_obj = obj_fac.create_net_obj(tp_net_desc, tp_uid);
326
327 // Create the ta(set) publishing service
328 conffwk::ConfigObject ta_net_obj = obj_fac.create_net_obj(ta_net_desc, tp_uid);
329
329 // Register queues with tp handler
331 tph_obj.set_objs("inputs", { &tp_queue_obj, &tpreq_queue_obj });
332 tph_obj.set_objs("outputs", { &tp_net_obj, &ta_net_obj, &frag_queue_obj });
333 modules.push_back(obj_fac.get_dal<confmodel::DaqModule>(tph_obj.UID()));
334 }
335 }
336
337 // Add output queues of tps
338 std::vector<const conffwk::ConfigObject*> tp_queue_objs;
339 for (auto q : tp_queues) {
340 tp_queue_objs.push_back(&q->config_object());
341 }
342
343 //-----------------------------------------------------------------
344 //
345 // Create datalink handlers
346 //
347 // Recover the emulation flag
348 auto emulation_mode = reader_conf->get_emulation_mode();
349 for (auto ds : all_enabled_det_streams) {
350
351 uint32_t sid = ds->get_source_id();
352 TLOG_DEBUG(6) << fmt::format("Processing stream {}, id {}, det id {}", ds->UID(), ds->get_source_id(), ds->get_geo_id()->get_detector_id());
353 std::string uid(fmt::format("DLH-{}", sid));
354 TLOG_DEBUG(6) << fmt::format("creating OKS configuration object for Data Link Handler class {}, if {}", dlh_class, sid);
355 auto dlh_obj = obj_fac.create(dlh_class, uid);
356 dlh_obj.set_by_val<uint32_t>("source_id", sid);
357 dlh_obj.set_by_val<uint32_t>("detector_id", ds->get_geo_id()->get_detector_id());
358 dlh_obj.set_by_val<bool>("post_processing_enabled", get_tp_generation_enabled());
359 dlh_obj.set_by_val<bool>("emulation_mode", emulation_mode);
360 dlh_obj.set_obj("geo_id", &ds->get_geo_id()->config_object());
361 dlh_obj.set_obj("module_configuration", &dlh_conf->config_object());
362
363 std::vector<const conffwk::ConfigObject*> dlh_ins, dlh_outs;
364
365 // Add datalink-handler queue to the inputs
366 dlh_ins.push_back(&data_queues_by_sid.at(sid)->config_object());
367
368 // Create request queue
369 conffwk::ConfigObject req_queue_obj = obj_fac.create_queue_sid_obj(dlh_reqinput_qdesc, ds);
370
371
372 // Add the requests queue dal pointer to the outputs of the FragmentAggregatorModule
373 req_queues.push_back(obj_fac.get_dal<confmodel::Connection>(req_queue_obj.UID()));
374 dlh_ins.push_back(&req_queue_obj);
375 dlh_outs.push_back(&frag_queue_obj);
376
377
378 // Time Sync network connection
379 if (dlh_conf->get_generate_timesync()) {
380 // Add timestamp endpoint
381 conffwk::ConfigObject ts_net_obj = obj_fac.create_net_obj(ts_net_desc, std::to_string(sid));
382 dlh_outs.push_back(&ts_net_obj);
383 }
384
385 for (auto tpq : tp_queue_objs) {
386 dlh_outs.push_back(tpq);
387 }
388 dlh_obj.set_objs("inputs", dlh_ins);
389 dlh_obj.set_objs("outputs", dlh_outs);
390
391 modules.push_back(obj_fac.get_dal<confmodel::DaqModule>(dlh_obj.UID()));
392 }
393
394
395 // Finally create Fragment Aggregator
396 std::string faUid("fragmentaggregator-" + UID());
397 // conffwk::ConfigObject frag_aggr;
398 TLOG_DEBUG(7) << "creating OKS configuration object for Fragment Aggregator class ";
399 auto frag_aggr = obj_fac.create("FragmentAggregatorModule", faUid);
400 conffwk::ConfigObject fa_net_obj = obj_fac.create_net_obj(fa_net_desc);
401
402 // Process special Network rules!
403 // Looking for Fragment rules from DFApplications in current Session
404 auto sessionApps = session->get_enabled_applications();
405 std::vector<conffwk::ConfigObject> fragOutObjs;
406 for (auto app : sessionApps) {
407 auto dfapp = app->cast<appmodel::DFApplication>();
408 if (dfapp == nullptr)
409 continue;
410
411 auto dfNRules = dfapp->get_network_rules();
412 for (auto rule : dfNRules) {
413 auto descriptor = rule->get_descriptor();
414 auto data_type = descriptor->get_data_type();
415 if (data_type == "Fragment") {
416 std::string dreqNetUid(descriptor->get_uid_base() + dfapp->UID());
417 // conffwk::ConfigObject frag_conn;
418 // confdb.create(dbfile, "NetworkConnection", dreqNetUid, frag_conn);
419 auto frag_conn = obj_fac.create("NetworkConnection", dreqNetUid);
420
421 frag_conn.set_by_val<std::string>("data_type", descriptor->get_data_type());
422 frag_conn.set_by_val<std::string>("connection_type", descriptor->get_connection_type());
423
424 auto serviceObj = descriptor->get_associated_service()->config_object();
425 frag_conn.set_obj("associated_service", &serviceObj);
426 fragOutObjs.push_back(frag_conn);
427 } // If network rule has Fragment type of data
428 } // Loop over Apps network rules
429 } // loop over Session specific Apps
430
431 // Add output queues of data requests and Fragments
432 std::vector<const conffwk::ConfigObject*> fa_output_objs;
433 for (auto& fNet : fragOutObjs) {
434 fa_output_objs.push_back(&fNet);
435 }
436
437 for (auto& q : req_queues) {
438 fa_output_objs.push_back(&q->config_object());
439 }
440
441 frag_aggr.set_objs("inputs", { &fa_net_obj, &frag_queue_obj });
442 frag_aggr.set_objs("outputs", fa_output_objs);
443
444 modules.push_back(obj_fac.get_dal<confmodel::DaqModule>(frag_aggr.UID()));
445
446 return modules;
447}
448
449} // namespace appmodel
450} // namespace dunedaq
#define ERS_HERE
conffwk::ConfigObject create_queue_sid_obj(const QueueDescriptor *qdesc, uint32_t src_id) const
conffwk::ConfigObject create_net_obj(const NetworkConnectionDescriptor *ndesc, std::string uid) const
Helper function that gets a network connection config.
const T * get_dal(std::string uid) const
conffwk::ConfigObject create_queue_obj(const QueueDescriptor *qdesc, std::string uid="") const
conffwk::ConfigObject create(const std::string &class_name, const std::string &id) const
const dunedaq::appmodel::DataHandlerConf * get_tp_handler() const
Get "tp_handler" relationship value.
bool get_ta_generation_enabled() const
Get "ta_generation_enabled" attribute value.
const std::vector< const dunedaq::appmodel::SourceIDConf * > & get_tp_source_ids() const
Get "tp_source_ids" relationship value.
const dunedaq::appmodel::DataHandlerConf * get_link_handler() const
Get "link_handler" relationship value.
virtual std::vector< const dunedaq::confmodel::DaqModule * > generate_modules(const confmodel::Session *) const override
const dunedaq::appmodel::DataReaderConf * get_data_reader() const
Get "data_reader" relationship value.
bool get_tp_generation_enabled() const
Get "tp_generation_enabled" attribute value.
const std::vector< const dunedaq::appmodel::NetworkConnectionRule * > & get_network_rules() const
Get "network_rules" relationship value.
const std::vector< const dunedaq::appmodel::QueueConnectionRule * > & get_queue_rules() const
Get "queue_rules" relationship value.
void set_by_val(const std::string &name, T value)
Set attribute value.
void set_objs(const std::string &name, const std::vector< const ConfigObject * > &o, bool skip_non_null_check=false)
Set relationship multi-value.
const std::string & UID() const noexcept
Return object identity.
void set_obj(const std::string &name, const ConfigObject *o, bool skip_non_null_check=false)
Set relationship single-value.
const std::string & UID() const noexcept
std::vector< const confmodel::DetDataSender * > get_senders() const
const std::vector< const dunedaq::confmodel::ResourceBase * > & get_contains() const
Get "contains" relationship value. A resource set is a container of resources to easily implement gro...
conffwk entry point
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
Including Qt Headers.