74std::vector<const confmodel::Resource*>
82 TLOG_DEBUG(6) <<
"Generating modules for application " << this->
UID();
93 if (reader_conf == 0) {
94 throw(BadConf(
ERS_HERE,
"No DataReaderModule configuration given"));
96 std::string reader_class = reader_conf->get_template_for();
101 auto dlh_class = dlh_conf->get_template_for();
105 throw(BadConf(
ERS_HERE,
"TP generation is enabled but there is no TP data handler configuration"));
108 std::string tph_class =
"";
110 tph_class = tph_conf->get_template_for();
116 const QueueDescriptor* dlh_reqinput_qdesc =
nullptr;
117 const QueueDescriptor* tp_input_qdesc =
nullptr;
119 const QueueDescriptor* fa_output_qdesc =
nullptr;
122 auto destination_class = rule->get_destination_class();
123 auto data_type = rule->get_descriptor()->get_data_type();
125 if (destination_class ==
"DataHandlerModule" || destination_class == dlh_class || destination_class == tph_class) {
126 if (data_type ==
"DataRequest") {
127 dlh_reqinput_qdesc = rule->get_descriptor();
129 tp_input_qdesc = rule->get_descriptor();
131 }
else if (destination_class ==
"FragmentAggregatorModule") {
132 fa_output_qdesc = rule->get_descriptor();
136 if (dlh_reqinput_qdesc ==
nullptr) {
137 throw(BadConf(
ERS_HERE,
"No data link handler request input queue descriptor given"));
143 const NetworkConnectionDescriptor* fa_net_desc =
nullptr;
144 const NetworkConnectionDescriptor* tp_net_desc =
nullptr;
145 const NetworkConnectionDescriptor* ta_net_desc =
nullptr;
146 const NetworkConnectionDescriptor* ts_net_desc =
nullptr;
148 auto endpoint_class = rule->get_endpoint_class();
149 auto data_type = rule->get_descriptor()->get_data_type();
151 if (endpoint_class ==
"FragmentAggregatorModule") {
152 fa_net_desc = rule->get_descriptor();
153 }
else if (data_type ==
"TPSet") {
154 tp_net_desc = rule->get_descriptor();
155 }
else if (data_type ==
"TriggerActivity") {
156 ta_net_desc = rule->get_descriptor();
157 }
else if (data_type ==
"TimeSync") {
158 ts_net_desc = rule->get_descriptor();
162 if (fa_net_desc ==
nullptr) {
163 throw(BadConf(
ERS_HERE,
"No Fragment Aggregator network descriptor given"));
165 if (ts_net_desc ==
nullptr && dlh_conf->get_generate_timesync()) {
166 throw(BadConf(
ERS_HERE,
"No Time Sync network descriptor given but time sync generation is enabled"));
174 if (raw_data_callback_desc ==
nullptr) {
175 throw(BadConf(
ERS_HERE,
"No Raw Data Callback descriptor given"));
180 if (fa_output_qdesc ==
nullptr) {
181 throw(BadConf(
ERS_HERE,
"No fragment output queue descriptor given"));
183 std::vector<const confmodel::Connection*> req_queues;
190 std::vector<const confmodel::DaqModule*> modules;
196 std::vector<const confmodel::DetectorStream*> all_enabled_det_streams;
197 std::map<uint32_t, const appmodel::DataMoveCallbackConf*> callback_confs_by_sid;
200 uint16_t conn_idx = 0;
203 if (helper->is_disabled(d2d_conn)) {
204 TLOG_DEBUG(7) <<
"Ignoring disabled DetectorToDaqConnection " << d2d_conn->UID();
208 TLOG_DEBUG(6) <<
"Processing DetectorToDaqConnection " << d2d_conn->UID();
212 if (d2d_conn->senders().empty()) {
213 throw(BadConf(
ERS_HERE,
"DetectorToDaqConnection does not contain senders"));
215 if (d2d_conn->receiver() ==
nullptr) {
216 throw(BadConf(
ERS_HERE,
"DetectorToDaqConnection does not contain a receiver"));
220 auto det_senders = d2d_conn->senders();
221 auto det_receiver = d2d_conn->receiver();
223 std::vector<const confmodel::DetectorStream*> enabled_det_streams;
225 for (
auto stream : d2d_conn->streams()) {
228 if (helper->is_disabled(stream)) {
229 TLOG_DEBUG(7) <<
"Ignoring disabled DetectorStream " << stream->UID();
233 all_enabled_det_streams.push_back(stream);
234 enabled_det_streams.push_back(stream);
240 if (reader_class ==
"DPDKReaderModule" || reader_class ==
"SocketReaderModule") {
241 if (!d2d_conn->castable(
"NetworkDetectorToDaqConnection")) {
242 throw(BadConf(
ERS_HERE, fmt::format(
"{} requires NetworkDetectorToDaqConnection, found {} of class {}", reader_class, d2d_conn->UID(), d2d_conn->class_name())));
244 if ((reader_class ==
"DPDKReaderModule" && !det_receiver->cast<appmodel::DPDKReceiver>()) ||
245 (reader_class ==
"SocketReaderModule" && !det_receiver->cast<appmodel::SocketReceiver>())) {
246 throw(BadConf(
ERS_HERE, fmt::format(
"{} requires NWDetDataReceiver, found {} of class {}", reader_class, det_receiver->UID(), det_receiver->class_name())));
249 else if (reader_class ==
"FelixReaderModule") {
250 if (!d2d_conn->castable(
"FelixDetectorToDaqConnection")) {
251 throw(BadConf(
ERS_HERE, fmt::format(
"{} requires FelixDetectorToDaqConnection, found {} of class {}", reader_class, d2d_conn->UID(), d2d_conn->class_name())));
253 if (!det_receiver->cast<appmodel::FelixDataReceiver>()) {
254 throw(BadConf(
ERS_HERE, fmt::format(
"FelixReaderModule requires FelixDataReceiver, found {} of class {}", det_receiver->UID(), det_receiver->class_name())));
270 std::string reader_uid(fmt::format(
"datareader-{}-{}", this->
UID(), std::to_string(conn_idx++)));
271 TLOG_DEBUG(6) << fmt::format(
"creating OKS configuration object for Data reader class {} with id {}", reader_class, reader_uid);
272 auto reader_obj = obj_fac.create(reader_class, reader_uid);
275 reader_obj.set_obj(
"configuration", &reader_conf->config_object());
276 reader_obj.set_objs(
"connections", {&d2d_conn->config_object()});
279 std::vector<const conffwk::ConfigObject*> raw_data_callback_objs;
282 for (
auto ds : enabled_det_streams) {
283 conffwk::ConfigObject callback_obj = obj_fac.create_callback_sid_obj(raw_data_callback_desc, ds->get_source_id());
284 const auto* callback_conf = obj_fac.get_dal<DataMoveCallbackConf>(callback_obj.UID());
285 raw_data_callback_objs.push_back(&callback_conf->config_object());
286 callback_confs_by_sid[ds->get_source_id()] = callback_conf;
289 reader_obj.set_objs(
"raw_data_callbacks", raw_data_callback_objs);
291 modules.push_back(obj_fac.get_dal<confmodel::DaqModule>(reader_obj.UID()));
299 std::vector<const confmodel::Connection*> tp_queues;
301 if (tp_input_qdesc ==
nullptr) {
302 throw(BadConf(
ERS_HERE,
"TP generation is enabled but no TP input queue descriptor given"));
304 if (tp_net_desc ==
nullptr) {
305 throw(BadConf(
ERS_HERE,
"TP generation is enabled but no TPSet network descriptor given"));
307 if (ta_net_desc ==
nullptr) {
308 throw(BadConf(
ERS_HERE,
"TP generation is enabled but no TriggerActivity network descriptor given"));
311 auto tph_conf_obj = tph_conf->config_object();
314 for (
auto sid : tpsrc_ids) {
317 std::string tp_uid(
"tphandler-" + std::to_string(sid->get_sid()));
318 auto tph_obj = obj_fac.create(tph_class, tp_uid);
319 tph_obj.set_by_val<uint32_t>(
"source_id", sid->get_sid());
320 tph_obj.set_by_val<uint32_t>(
"detector_id", 1);
322 tph_obj.set_obj(
"module_configuration", &tph_conf_obj);
325 tp_queue_obj = obj_fac.create_queue_sid_obj(tp_input_qdesc, sid->get_sid());
326 tp_queue_obj.set_by_val<uint32_t>(
"recv_timeout_ms", 50);
327 tp_queue_obj.set_by_val<uint32_t>(
"send_timeout_ms", 1);
329 tp_queues.push_back(obj_fac.get_dal<confmodel::Connection>(tp_queue_obj.UID()));
331 tpreq_queue_obj = obj_fac.create_queue_sid_obj(dlh_reqinput_qdesc, sid->get_sid());
332 req_queues.push_back(obj_fac.get_dal<confmodel::Connection>(tpreq_queue_obj.UID()));
341 tph_obj.set_objs(
"inputs", { &tp_queue_obj, &tpreq_queue_obj });
342 tph_obj.set_objs(
"outputs", { &tp_net_obj, &ta_net_obj, &frag_queue_obj });
344 modules.push_back(obj_fac.get_dal<confmodel::DaqModule>(tph_obj.UID()));
349 std::vector<const conffwk::ConfigObject*> tp_queue_objs;
350 for (
auto q : tp_queues) {
351 tp_queue_objs.push_back(&q->config_object());
359 auto emulation_mode = reader_conf->get_emulation_mode();
360 for (
auto ds : all_enabled_det_streams) {
362 uint32_t sid = ds->get_source_id();
363 TLOG_DEBUG(6) << fmt::format(
"Processing stream {}, id {}, det id {}", ds->UID(), ds->get_source_id(), ds->get_geo_id()->get_detector_id());
364 std::string uid(fmt::format(
"DLH-{}", sid));
365 TLOG_DEBUG(6) << fmt::format(
"creating OKS configuration object for Data Link Handler class {}, if {}", dlh_class, sid);
366 auto dlh_obj = obj_fac.create(dlh_class, uid);
367 dlh_obj.set_by_val<uint32_t>(
"source_id", sid);
368 dlh_obj.set_by_val<uint32_t>(
"detector_id", ds->get_geo_id()->get_detector_id());
370 dlh_obj.set_by_val<
bool>(
"emulation_mode", emulation_mode);
371 dlh_obj.set_obj(
"geo_id", &ds->get_geo_id()->config_object());
372 dlh_obj.set_obj(
"module_configuration", &dlh_conf->config_object());
373 dlh_obj.set_obj(
"raw_data_callback", &callback_confs_by_sid[sid]->
config_object());
375 std::vector<const conffwk::ConfigObject*> dlh_ins, dlh_outs;
382 req_queues.push_back(obj_fac.get_dal<confmodel::Connection>(req_queue_obj.UID()));
383 dlh_ins.push_back(&req_queue_obj);
384 dlh_outs.push_back(&frag_queue_obj);
388 if (dlh_conf->get_generate_timesync()) {
391 dlh_outs.push_back(&ts_net_obj);
394 for (
auto tpq : tp_queue_objs) {
395 dlh_outs.push_back(tpq);
397 dlh_obj.set_objs(
"inputs", dlh_ins);
398 dlh_obj.set_objs(
"outputs", dlh_outs);
400 modules.push_back(obj_fac.get_dal<confmodel::DaqModule>(dlh_obj.UID()));
405 if (aggregator_conf == 0) {
406 throw(BadConf(
ERS_HERE,
"No FragmentAggregatorModule configuration given"));
408 std::string faUid(
"fragmentaggregator-" +
UID());
410 TLOG_DEBUG(7) <<
"creating OKS configuration object for Fragment Aggregator class ";
411 auto frag_aggr = obj_fac.create(
"FragmentAggregatorModule", faUid);
416 std::vector<conffwk::ConfigObject> fragOutObjs;
417 for (
auto [uid, descriptor]:
418 helper->get_netdescriptors(
"Fragment",
"DFApplication")) {
419 std::string dreqNetUid(descriptor->get_uid_base() + uid);
420 auto frag_conn = obj_fac.create(
"NetworkConnection", dreqNetUid);
422 frag_conn.set_by_val<std::string>(
"data_type", descriptor->get_data_type());
423 frag_conn.set_by_val<std::string>(
"connection_type", descriptor->get_connection_type());
425 frag_conn.set_by_val<
int>(
"capacity", all_enabled_det_streams.size() * 2);
427 auto serviceObj = descriptor->get_associated_service()->config_object();
428 frag_conn.set_obj(
"associated_service", &serviceObj);
429 fragOutObjs.push_back(frag_conn);
433 std::vector<const conffwk::ConfigObject*> fa_output_objs;
434 for (
auto& fNet : fragOutObjs) {
435 fa_output_objs.push_back(&fNet);
438 for (
auto& q : req_queues) {
439 fa_output_objs.push_back(&q->config_object());
442 frag_aggr.set_obj(
"configuration", &aggregator_conf->config_object());
443 frag_aggr.set_objs(
"inputs", { &fa_net_obj, &frag_queue_obj });
444 frag_aggr.set_objs(
"outputs", fa_output_objs);
445 modules.push_back(obj_fac.get_dal<confmodel::DaqModule>(frag_aggr.UID()));
447 obj_fac.update_modules(modules);
const dunedaq::appmodel::DataHandlerConf * get_tp_handler() const
Get "tp_handler" relationship value.
const std::vector< const dunedaq::confmodel::DetectorToDaqConnection * > & get_detector_connections() const
Get "detector_connections" relationship value. The list of detector channels to be read out by this r...
bool get_ta_generation_enabled() const
Get "ta_generation_enabled" attribute value.
const dunedaq::appmodel::FragmentAggregatorConf * get_fragment_aggregator() const
Get "fragment_aggregator" relationship value.
const std::vector< const dunedaq::appmodel::SourceIDConf * > & get_tp_source_ids() const
Get "tp_source_ids" relationship value.
const dunedaq::appmodel::DataHandlerConf * get_link_handler() const
Get "link_handler" relationship value.
const dunedaq::appmodel::DataMoveCallbackDescriptor * get_callback_desc() const
Get "callback_desc" relationship value.
const dunedaq::appmodel::DataReaderConf * get_data_reader() const
Get "data_reader" relationship value.
void generate_modules(std::shared_ptr< appmodel::ConfigurationHelper >) const override
virtual std::vector< const Resource * > contained_resources() const override
bool get_tp_generation_enabled() const
Get "tp_generation_enabled" attribute value.
const std::vector< const dunedaq::appmodel::NetworkConnectionRule * > & get_network_rules() const
Get "network_rules" relationship value.
const std::vector< const dunedaq::appmodel::QueueConnectionRule * > & get_queue_rules() const
Get "queue_rules" relationship value.
const ConfigObject & config_object() const
const std::string & UID() const noexcept
std::vector< const dunedaq::confmodel::Resource * > to_resources(const std::vector< T * > &vector_of_children)
#define TLOG_DEBUG(lvl,...)