DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
ReadoutApplication.cpp
Go to the documentation of this file.
1
20#include "confmodel/Session.hpp"
21
25
28
31
33#include "confmodel/GeoId.hpp"
36#include "confmodel/Service.hpp"
37
43
53
54
55
57
58#include "logging/Logging.hpp"
59#include <fmt/core.h>
60
61#include <string>
62#include <vector>
63
64// using namespace dunedaq;
65// using namespace dunedaq::appmodel;
66
67namespace dunedaq {
68namespace appmodel {
69
70//-----------------------------------------------------------------------------
71
72std::vector<const confmodel::Resource*>
76
// Generates the DAQ module configuration objects for this readout application
// and hands them to the ConfigObjectFactory (obj_fac.update_modules):
//   - one data reader per enabled DetectorToDaqConnection,
//   - one TP handler per entry of get_tp_source_ids(),
//   - one data link handler (DLH) per enabled DetectorStream,
//   - one FragmentAggregatorModule.
// Throws BadConf when a required configuration object, queue descriptor or
// network descriptor is not found, or when the connection/receiver class does
// not match the configured reader class.
// NOTE(review): the listing line carrying the function name and parameter list
// (between 77 and 79) is not present in this extract; the body uses a `session`
// pointer that presumably comes from it - confirm against the repository.
77void
79{
80
81 TLOG_DEBUG(6) << "Generating modules for application " << this->UID();
82
83 ConfigObjectFactory obj_fac(this);
84 // conffwk::Configuration& confdb = this->configuration();
85
86 //
87 // Extract basic configuration objects
88 //
89
90 // Data reader
91 auto reader_conf = get_data_reader();
92 if (reader_conf == 0) {
93 throw(BadConf(ERS_HERE, "No DataReaderModule configuration given"));
94 }
95 std::string reader_class = reader_conf->get_template_for();
96
97 // Link handler
// NOTE(review): dlh_conf is dereferenced below without a null check, unlike
// reader_conf, tph_conf and aggregator_conf - confirm the schema guarantees it is set.
98 auto dlh_conf = get_link_handler();
99 // What is template for?
100 auto dlh_class = dlh_conf->get_template_for();
101
102 auto tph_conf = get_tp_handler();
103 if (tph_conf==nullptr && get_tp_generation_enabled()) {
104 throw(BadConf(ERS_HERE, "TP generation is enabled but there is no TP data handler configuration"));
105 }
106
// tph_class stays empty unless TP generation is enabled and a TP handler configuration exists
107 std::string tph_class = "";
108 if (tph_conf != nullptr && get_tp_generation_enabled()) {
109 tph_class = tph_conf->get_template_for();
110 }
111
112 //
113 // Process the queue rules looking for inputs to our DL/TP handler modules
114 //
115 const QueueDescriptor* dlh_input_qdesc = nullptr;
116 const QueueDescriptor* dlh_reqinput_qdesc = nullptr;
117 const QueueDescriptor* tp_input_qdesc = nullptr;
118 // const QueueDescriptor* tpReqInputQDesc = nullptr;
119 const QueueDescriptor* fa_output_qdesc = nullptr;
120
// If several rules match the same slot, the descriptor of the last matching rule is kept
121 for (auto rule : get_queue_rules()) {
122 auto destination_class = rule->get_destination_class();
123 auto data_type = rule->get_descriptor()->get_data_type();
124 // Why datahandler here? It is the base class for several DataHandler types (e.g. FDDataHandlerModule, SNBDataHandlerModule)
125 if (destination_class == "DataHandlerModule" || destination_class == dlh_class || destination_class == tph_class) {
126 if (data_type == "DataRequest") {
127 dlh_reqinput_qdesc = rule->get_descriptor();
128 } else if ((data_type == "TriggerPrimitive" || data_type == "TriggerPrimitiveVector") && get_tp_generation_enabled()) {
129 tp_input_qdesc = rule->get_descriptor();
130 } else {
131 dlh_input_qdesc = rule->get_descriptor();
132 }
133 } else if (destination_class == "FragmentAggregatorModule") {
134 fa_output_qdesc = rule->get_descriptor();
135 }
136 }
137
138 if (dlh_input_qdesc == nullptr) {
139 throw(BadConf(ERS_HERE, "No data link handler input queue descriptor given"));
140 }
141 if (dlh_reqinput_qdesc == nullptr) {
142 throw(BadConf(ERS_HERE, "No data link handler request input queue descriptor given"));
143 }
144
145 //
146 // Process the network rules looking for the Fragment Aggregator and TP handler data request inputs
147 //
148 const NetworkConnectionDescriptor* fa_net_desc = nullptr;
149 const NetworkConnectionDescriptor* tp_net_desc = nullptr;
150 const NetworkConnectionDescriptor* ta_net_desc = nullptr;
151 const NetworkConnectionDescriptor* ts_net_desc = nullptr;
152 for (auto rule : get_network_rules()) {
153 auto endpoint_class = rule->get_endpoint_class();
154 auto data_type = rule->get_descriptor()->get_data_type();
155
// The endpoint class is tested first; the data type is only consulted for other endpoints
156 if (endpoint_class == "FragmentAggregatorModule") {
157 fa_net_desc = rule->get_descriptor();
158 } else if (data_type == "TPSet") {
159 tp_net_desc = rule->get_descriptor();
160 } else if (data_type == "TriggerActivity") {
161 ta_net_desc = rule->get_descriptor();
162 } else if (data_type == "TimeSync") {
163 ts_net_desc = rule->get_descriptor();
164 }
165 }
166
167 if (fa_net_desc == nullptr) {
168 throw(BadConf(ERS_HERE, "No Fragment Aggregator network descriptor given"));
169 }
170 if (ts_net_desc == nullptr && dlh_conf->get_generate_timesync()) {
171 throw(BadConf(ERS_HERE, "No Time Sync network descriptor given but time sync generation is enabled"));
172 }
173
174 // Create here the Queue on which all data fragments are forwarded to the fragment aggregator
175 // and a container for the queues of data request to TP handler and DLH
176 if (fa_output_qdesc == nullptr) {
177 throw(BadConf(ERS_HERE, "No fragment output queue descriptor given"));
178 }
179 std::vector<const confmodel::Connection*> req_queues;
180 conffwk::ConfigObject frag_queue_obj = obj_fac.create_queue_obj(fa_output_qdesc);
181
182 //
183 // Scan Detector 2 DAQ connections to extract sender, receiver and stream information
184 //
185
186 std::vector<const confmodel::DaqModule*> modules;
187
188 // Loop over the detector to daq connections and generate one data reader per connection
189 // and the corresponding datalink handlers
190
191 // Collect all streams
192 std::vector<const confmodel::DetectorStream*> all_enabled_det_streams;
193 std::map<uint32_t, const confmodel::Connection*> data_queues_by_sid;
194
195 // std::vector<const conffwk::ConfigObject*> d2d_conn_objs;
// Running index of the processed (enabled) connections; used in the data reader UID
196 uint16_t conn_idx = 0;
197
198 for (auto d2d_conn : get_detector_connections()) {
199 if (d2d_conn->is_disabled(*session)) {
200 TLOG_DEBUG(7) << "Ignoring disabled DetectorToDaqConnection " << d2d_conn->UID();
201 continue;
202 }
203
204 TLOG_DEBUG(6) << "Processing DetectorToDaqConnection " << d2d_conn->UID();
205
206 // Are these tests necessary? Schema does not allow 0 cardinality
207 // for these relationships!! TODO
208 if (d2d_conn->senders().empty()) {
209 throw(BadConf(ERS_HERE, "DetectorToDaqConnection does not contain senders"));
210 }
211 if (d2d_conn->receiver() == nullptr) {
212 throw(BadConf(ERS_HERE, "DetectorToDaqConnection does not contain a receiver"));
213 }
214
215 // Find senders and receiver
216 auto det_senders = d2d_conn->senders();
217 auto det_receiver = d2d_conn->receiver();
218
219 std::vector<const confmodel::DetectorStream*> enabled_det_streams;
220 // Loop over streams
221 for (auto stream : d2d_conn->streams()) {
222
223 // Are we sure?
224 if (stream->is_disabled(*session)) {
225 TLOG_DEBUG(7) << "Ignoring disabled DetectorStream " << stream->UID();
226 continue;
227 }
228
// Enabled streams are tracked both globally (for the DLH loop below) and per connection
229 all_enabled_det_streams.push_back(stream);
230 enabled_det_streams.push_back(stream);
231 }
232
233
234 // Here I want to resolve the type of connection (network, felix, or?)
235 // Rules of engagement: if the receiver interface is network or felix, the receivers should be castable to the counterpart
// Any other reader_class value passes through without a connection/receiver check
236 if (reader_class == "DPDKReaderModule" || reader_class == "SocketReaderModule") {
237 if (!d2d_conn->castable("NetworkDetectorToDaqConnection")) {
238 throw(BadConf(ERS_HERE, fmt::format("{} requires NetworkDetectorToDaqConnection, found {} of class {}", reader_class, d2d_conn->UID(), d2d_conn->class_name())));
239 }
240 if ((reader_class == "DPDKReaderModule" && !det_receiver->cast<appmodel::DPDKReceiver>()) ||
241 (reader_class == "SocketReaderModule" && !det_receiver->cast<appmodel::SocketReceiver>())) {
242 throw(BadConf(ERS_HERE, fmt::format("{} requires NWDetDataReceiver, found {} of class {}", reader_class, det_receiver->UID(), det_receiver->class_name())));
243 }
244 }
245 else if (reader_class == "FelixReaderModule") {
246 if (!d2d_conn->castable("FelixDetectorToDaqConnection")) {
247 throw(BadConf(ERS_HERE, fmt::format("{} requires FelixDetectorToDaqConnection, found {} of class {}", reader_class, d2d_conn->UID(), d2d_conn->class_name())));
248 }
249 if (!det_receiver->cast<appmodel::FelixDataReceiver>()) {
250 throw(BadConf(ERS_HERE, fmt::format("FelixReaderModule requires FelixDataReceiver, found {} of class {}", det_receiver->UID(), det_receiver->class_name())));
251 }
252 }
253 // }
254
255 //-----------------------------------------------------------------
256 //
257 // Create DataReaderModule object
258 //
259
260 //
261 // Instantiate DataReaderModule of the configured reader class (reader_class)
262 //
263
264 // Create the Data reader object
265
266 std::string reader_uid(fmt::format("datareader-{}-{}", this->UID(), std::to_string(conn_idx++)));
267 TLOG_DEBUG(6) << fmt::format("creating OKS configuration object for Data reader class {} with id {}", reader_class, reader_uid);
268 auto reader_obj = obj_fac.create(reader_class, reader_uid);
269
270 // Populate configuration and interfaces (leave output queues for later)
271 reader_obj.set_obj("configuration", &reader_conf->config_object());
272 reader_obj.set_objs("connections", {&d2d_conn->config_object()});
273
274 // Create the raw data queues
275 std::vector<const conffwk::ConfigObject*> data_queue_objs;
276 // keep a map for convenience
277
278 // Create data queues
// One queue per enabled stream of this connection, also indexed by source id for the DLH loop
279 for (auto ds : enabled_det_streams) {
280 conffwk::ConfigObject queue_obj = obj_fac.create_queue_sid_obj(dlh_input_qdesc, ds);
281 const auto* data_queue = obj_fac.get_dal<confmodel::Connection>(queue_obj.UID());
282 data_queue_objs.push_back(&data_queue->config_object());
283 data_queues_by_sid[ds->get_source_id()] = data_queue;
284 }
285
286 reader_obj.set_objs("outputs", data_queue_objs);
287
288 modules.push_back(obj_fac.get_dal<confmodel::DaqModule>(reader_obj.UID()));
289 }
290
291
292 //-----------------------------------------------------------------
293 //
294 // Prepare the tp handlers and related queues
295 //
296 std::vector<const confmodel::Connection*> tp_queues;
// NOTE(review): listing line 297 is absent from this extract; the block closed at
// listing line 343 has no visible opener here - presumably a check on
// get_tp_generation_enabled(), judging by the error messages below. Confirm.
298 if (tp_input_qdesc == nullptr) {
299 throw(BadConf(ERS_HERE, "TP generation is enabled but no TP input queue descriptor given"));
300 }
301 if (tp_net_desc == nullptr) {
302 throw(BadConf(ERS_HERE, "TP generation is enabled but no TPSet network descriptor given"));
303 }
304 if (ta_net_desc == nullptr) {
305 throw(BadConf(ERS_HERE, "TP generation is enabled but no TriggerActivity network descriptor given"));
306 }
307 // Create TP handler object
308 auto tph_conf_obj = tph_conf->config_object();
309 auto tpsrc_ids = get_tp_source_ids();
310
// One TP handler per configured TP source id
311 for (auto sid : tpsrc_ids) {
312 conffwk::ConfigObject tp_queue_obj;
313 conffwk::ConfigObject tpreq_queue_obj;
314 std::string tp_uid("tphandler-" + std::to_string(sid->get_sid()));
315 auto tph_obj = obj_fac.create(tph_class, tp_uid);
316 tph_obj.set_by_val<uint32_t>("source_id", sid->get_sid());
317 tph_obj.set_by_val<uint32_t>("detector_id", 1); // 1 == kDAQ
318 tph_obj.set_by_val<bool>("post_processing_enabled", get_ta_generation_enabled());
319 tph_obj.set_obj("module_configuration", &tph_conf_obj);
320
321 // Create the TPs aggregator queue (from RawData Handlers to TP handlers)
322 tp_queue_obj = obj_fac.create_queue_sid_obj(tp_input_qdesc, sid->get_sid());
323 tp_queue_obj.set_by_val<uint32_t>("recv_timeout_ms", 50);
324 tp_queue_obj.set_by_val<uint32_t>("send_timeout_ms", 1);
325
326 tp_queues.push_back(obj_fac.get_dal<confmodel::Connection>(tp_queue_obj.UID()));
327 // Create tp data requests queue from Fragment Aggregator
328 tpreq_queue_obj = obj_fac.create_queue_sid_obj(dlh_reqinput_qdesc, sid->get_sid());
329 req_queues.push_back(obj_fac.get_dal<confmodel::Connection>(tpreq_queue_obj.UID()));
330
331 // Create the tp(set) publishing service
332 conffwk::ConfigObject tp_net_obj = obj_fac.create_net_obj(tp_net_desc, tp_uid);
333
334 // Create the ta(set) publishing service
335 conffwk::ConfigObject ta_net_obj = obj_fac.create_net_obj(ta_net_desc, tp_uid);
336
337 // Register queues with tp handler
338 tph_obj.set_objs("inputs", { &tp_queue_obj, &tpreq_queue_obj });
339 tph_obj.set_objs("outputs", { &tp_net_obj, &ta_net_obj, &frag_queue_obj });
340
341 modules.push_back(obj_fac.get_dal<confmodel::DaqModule>(tph_obj.UID()));
342 }
343 }
344
345 // Add output queues of tps
346 std::vector<const conffwk::ConfigObject*> tp_queue_objs;
347 for (auto q : tp_queues) {
348 tp_queue_objs.push_back(&q->config_object());
349 }
350
351 //-----------------------------------------------------------------
352 //
353 // Create datalink handlers
354 //
355 // Recover the emulation flag
356 auto emulation_mode = reader_conf->get_emulation_mode();
// One data link handler per enabled detector stream, across all connections
357 for (auto ds : all_enabled_det_streams) {
358
359 uint32_t sid = ds->get_source_id();
360 TLOG_DEBUG(6) << fmt::format("Processing stream {}, id {}, det id {}", ds->UID(), ds->get_source_id(), ds->get_geo_id()->get_detector_id());
361 std::string uid(fmt::format("DLH-{}", sid));
362 TLOG_DEBUG(6) << fmt::format("creating OKS configuration object for Data Link Handler class {}, if {}", dlh_class, sid);
363 auto dlh_obj = obj_fac.create(dlh_class, uid);
364 dlh_obj.set_by_val<uint32_t>("source_id", sid);
365 dlh_obj.set_by_val<uint32_t>("detector_id", ds->get_geo_id()->get_detector_id());
366 dlh_obj.set_by_val<bool>("post_processing_enabled", get_tp_generation_enabled());
367 dlh_obj.set_by_val<bool>("emulation_mode", emulation_mode);
368 dlh_obj.set_obj("geo_id", &ds->get_geo_id()->config_object());
369 dlh_obj.set_obj("module_configuration", &dlh_conf->config_object());
370
371 std::vector<const conffwk::ConfigObject*> dlh_ins, dlh_outs;
372
373 // Add datalink-handler queue to the inputs
374 dlh_ins.push_back(&data_queues_by_sid.at(sid)->config_object());
375
376 // Create request queue
377 conffwk::ConfigObject req_queue_obj = obj_fac.create_queue_sid_obj(dlh_reqinput_qdesc, ds);
378
379
380 // Add the requests queue dal pointer to the outputs of the FragmentAggregatorModule
381 req_queues.push_back(obj_fac.get_dal<confmodel::Connection>(req_queue_obj.UID()));
382 dlh_ins.push_back(&req_queue_obj);
383 dlh_outs.push_back(&frag_queue_obj);
384
385
386 // Time Sync network connection
387 if (dlh_conf->get_generate_timesync()) {
388 // Add timestamp endpoint
// NOTE(review): ts_net_obj is local to this if-block, yet its address is stored in
// dlh_outs and read by set_objs("outputs", ...) after the block has closed - this
// looks like a dangling pointer; the object should be declared in the loop scope.
389 conffwk::ConfigObject ts_net_obj = obj_fac.create_net_obj(ts_net_desc, std::to_string(sid));
390 dlh_outs.push_back(&ts_net_obj);
391 }
392
// Every data link handler gets all TP queues collected in tp_queue_objs as outputs
393 for (auto tpq : tp_queue_objs) {
394 dlh_outs.push_back(tpq);
395 }
396 dlh_obj.set_objs("inputs", dlh_ins);
397 dlh_obj.set_objs("outputs", dlh_outs);
398
399 modules.push_back(obj_fac.get_dal<confmodel::DaqModule>(dlh_obj.UID()));
400 }
401
402 // Finally create Fragment Aggregator
403 auto aggregator_conf = get_fragment_aggregator();
404 if (aggregator_conf == 0) {
405 throw(BadConf(ERS_HERE, "No FragmentAggregatorModule configuration given"));
406 }
407 std::string faUid("fragmentaggregator-" + UID());
408 // conffwk::ConfigObject frag_aggr;
409 TLOG_DEBUG(7) << "creating OKS configuration object for Fragment Aggregator class ";
410 auto frag_aggr = obj_fac.create("FragmentAggregatorModule", faUid);
411 conffwk::ConfigObject fa_net_obj = obj_fac.create_net_obj(fa_net_desc);
412
413 // Process special Network rules!
414 // Looking for Fragment rules from DFApplications in current Session
415 auto sessionApps = session->enabled_applications();
// Objects are stored by value here; their addresses are taken only after the vector is complete
416 std::vector<conffwk::ConfigObject> fragOutObjs;
417 for (auto app : sessionApps) {
418 auto dfapp = app->cast<appmodel::DFApplication>();
419 if (dfapp == nullptr)
420 continue;
421
422 auto dfNRules = dfapp->get_network_rules();
423 for (auto rule : dfNRules) {
424 auto descriptor = rule->get_descriptor();
425 auto data_type = descriptor->get_data_type();
426 if (data_type == "Fragment") {
427 std::string dreqNetUid(descriptor->get_uid_base() + dfapp->UID());
428 // conffwk::ConfigObject frag_conn;
429 // confdb.create(dbfile, "NetworkConnection", dreqNetUid, frag_conn);
430 auto frag_conn = obj_fac.create("NetworkConnection", dreqNetUid);
431
432 frag_conn.set_by_val<std::string>("data_type", descriptor->get_data_type());
433
434 // Override capacity, set to 2x expected number of Fragments
435 frag_conn.set_by_val<int>("capacity", all_enabled_det_streams.size() * 2);
436 frag_conn.set_by_val<std::string>("connection_type", descriptor->get_connection_type());
437
438 auto serviceObj = descriptor->get_associated_service()->config_object();
439 frag_conn.set_obj("associated_service", &serviceObj);
440 fragOutObjs.push_back(frag_conn);
441 } // If network rule has Fragment type of data
442 } // Loop over Apps network rules
443 } // loop over Session specific Apps
444
445 // Add output queues of data requests and Fragments
446 std::vector<const conffwk::ConfigObject*> fa_output_objs;
447 for (auto& fNet : fragOutObjs) {
448 fa_output_objs.push_back(&fNet);
449 }
450
451 for (auto& q : req_queues) {
452 fa_output_objs.push_back(&q->config_object());
453 }
454
455 frag_aggr.set_obj("configuration", &aggregator_conf->config_object());
456 frag_aggr.set_objs("inputs", { &fa_net_obj, &frag_queue_obj });
457 frag_aggr.set_objs("outputs", fa_output_objs);
458 modules.push_back(obj_fac.get_dal<confmodel::DaqModule>(frag_aggr.UID()));
459
// Hand the complete list of generated modules to the factory
460 obj_fac.update_modules(modules);
461}
462
463} // namespace appmodel
464} // namespace dunedaq
#define ERS_HERE
void update_modules(const std::vector< const confmodel::DaqModule * > &modules)
conffwk::ConfigObject create_queue_sid_obj(const QueueDescriptor *qdesc, uint32_t src_id) const
conffwk::ConfigObject create_net_obj(const NetworkConnectionDescriptor *ndesc, std::string uid) const
Helper function that gets a network connection config.
const T * get_dal(std::string uid) const
conffwk::ConfigObject create_queue_obj(const QueueDescriptor *qdesc, std::string uid="") const
conffwk::ConfigObject create(const std::string &class_name, const std::string &id) const
const dunedaq::appmodel::DataHandlerConf * get_tp_handler() const
Get "tp_handler" relationship value.
const std::vector< const dunedaq::confmodel::DetectorToDaqConnection * > & get_detector_connections() const
Get "detector_connections" relationship value. The list of detector channels to be read out by this r...
bool get_ta_generation_enabled() const
Get "ta_generation_enabled" attribute value.
const dunedaq::appmodel::FragmentAggregatorConf * get_fragment_aggregator() const
Get "fragment_aggregator" relationship value.
const std::vector< const dunedaq::appmodel::SourceIDConf * > & get_tp_source_ids() const
Get "tp_source_ids" relationship value.
const dunedaq::appmodel::DataHandlerConf * get_link_handler() const
Get "link_handler" relationship value.
const dunedaq::appmodel::DataReaderConf * get_data_reader() const
Get "data_reader" relationship value.
virtual std::vector< const Resource * > contained_resources() const override
bool get_tp_generation_enabled() const
Get "tp_generation_enabled" attribute value.
void generate_modules(const confmodel::Session *) const override
const std::vector< const dunedaq::appmodel::NetworkConnectionRule * > & get_network_rules() const
Get "network_rules" relationship value.
const std::vector< const dunedaq::appmodel::QueueConnectionRule * > & get_queue_rules() const
Get "queue_rules" relationship value.
void set_by_val(const std::string &name, T value)
Set attribute value.
void set_objs(const std::string &name, const std::vector< const ConfigObject * > &o, bool skip_non_null_check=false)
Set relationship multi-value.
const std::string & UID() const noexcept
Return object identity.
void set_obj(const std::string &name, const ConfigObject *o, bool skip_non_null_check=false)
Set relationship single-value.
const std::string & UID() const noexcept
conffwk entry point
std::vector< const dunedaq::confmodel::Resource * > to_resources(const std::vector< T * > &vector_of_children)
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
The DUNE-DAQ namespace.