76 TLOG_DEBUG(6) <<
"Generating modules for application " << this->
UID();
86 if (reader_conf == 0) {
87 throw(BadConf(
ERS_HERE,
"No DataReaderModule configuration given"));
89 std::string reader_class = reader_conf->get_template_for();
94 auto dlh_class = dlh_conf->get_template_for();
98 throw(BadConf(
ERS_HERE,
"TP generation is enabled but there is no TP data handler configuration"));
101 std::string tph_class =
"";
103 tph_class = tph_conf->get_template_for();
115 auto destination_class = rule->get_destination_class();
116 auto data_type = rule->get_descriptor()->get_data_type();
118 if (destination_class ==
"DataHandlerModule" || destination_class == dlh_class || destination_class == tph_class) {
119 if (data_type ==
"DataRequest") {
120 dlh_reqinput_qdesc = rule->get_descriptor();
122 tp_input_qdesc = rule->get_descriptor();
124 }
else if (destination_class ==
"FragmentAggregatorModule") {
125 fa_output_qdesc = rule->get_descriptor();
137 auto endpoint_class = rule->get_endpoint_class();
138 auto data_type = rule->get_descriptor()->get_data_type();
140 if (endpoint_class ==
"FragmentAggregatorModule") {
141 fa_net_desc = rule->get_descriptor();
142 }
else if (data_type ==
"TPSet") {
143 tp_net_desc = rule->get_descriptor();
144 }
else if (data_type ==
"TriggerActivity") {
145 ta_net_desc = rule->get_descriptor();
146 }
else if (data_type ==
"TimeSync") {
147 ts_net_desc = rule->get_descriptor();
153 if (fa_output_qdesc ==
nullptr) {
154 throw(BadConf(
ERS_HERE,
"No fragment output queue descriptor given"));
156 std::vector<const confmodel::Connection*> req_queues;
164 if (raw_data_callback_desc ==
nullptr) {
165 throw(BadConf(
ERS_HERE,
"No Raw Data Callback descriptor given"));
172 std::vector<const confmodel::DaqModule*> modules;
178 std::vector<std::pair<int16_t, const confmodel::DetectorStream*>> all_enabled_det_streams;
179 std::map<uint32_t, const appmodel::DataMoveCallbackConf*> callback_confs_by_sid;
181 std::vector<const conffwk::ConfigObject*> d2d_conn_objs;
182 uint16_t conn_idx = 0;
185 std::set<int16_t> numas;
187 uint16_t receiver_numa = 0;
190 if (helper->is_disabled(d2d_conn)) {
191 TLOG_DEBUG(7) <<
"Ignoring disabled DetectorToDaqConnection " << d2d_conn->UID();
195 d2d_conn_objs.push_back(&d2d_conn->config_object());
197 TLOG_DEBUG(6) <<
"Processing DetectorToDaqConnection " << d2d_conn->UID();
200 if (d2d_conn->senders().empty()) {
201 throw(BadConf(
ERS_HERE,
"DetectorToDaqConnection does not contain sebders or receivers"));
203 if (d2d_conn->receiver() ==
nullptr) {
204 throw(BadConf(
ERS_HERE,
"DetectorToDaqConnection does not contain a receiver"));
208 auto det_senders = d2d_conn->senders();
209 auto det_receiver = d2d_conn->receiver();
214 if (reader_class ==
"DPDKReaderModule" || reader_class ==
"SocketReaderModule") {
217 throw(BadConf(
ERS_HERE, fmt::format(
"{} requires NWDetDataReceiver, found {} of class {}", reader_class, det_receiver->UID(), det_receiver->class_name())));
220 if (reader_class ==
"DPDKReaderModule") {
225 bool all_nw_senders =
true;
226 for (
auto s : det_senders) {
231 if (!all_nw_senders) {
232 throw(BadConf(
ERS_HERE,
"Non-network DetDataSener found with NWreceiver"));
236 std::vector<const confmodel::DetectorStream*> enabled_det_streams;
238 for (
auto stream : d2d_conn->streams()) {
241 if (helper->is_disabled(stream)) {
242 TLOG_DEBUG(7) <<
"Ignoring disabled DetectorStream " << stream->UID();
247 all_enabled_det_streams.push_back(std::make_pair(receiver_numa, stream));
248 enabled_det_streams.push_back(stream);
249 numas.insert(receiver_numa);
265 std::string reader_uid(fmt::format(
"datareader-{}-{}", this->
UID(), std::to_string(conn_idx++)));
266 TLOG_DEBUG(6) << fmt::format(
"creating OKS configuration object for Data reader class {} with id {}", reader_class, reader_uid);
267 auto reader_obj = obj_fac.
create(reader_class, reader_uid);
270 reader_obj.
set_obj(
"configuration", &reader_conf->config_object());
271 reader_obj.set_objs(
"connections", d2d_conn_objs);
274 std::vector<const conffwk::ConfigObject*> raw_data_callback_objs;
277 for (
auto& [numa, ds] : all_enabled_det_streams) {
280 raw_data_callback_objs.push_back(&callback_conf->config_object());
281 callback_confs_by_sid[ds->get_source_id()] = callback_conf;
284 reader_obj.set_objs(
"raw_data_callbacks", raw_data_callback_objs);
295 std::vector<std::pair<uint32_t, const confmodel::Connection*>> tp_queues;
300 auto tph_conf_obj = tph_conf->config_object();
303 if ((tpsrc_ids.size() % 3) > 0) {
304 throw(BadConf(
ERS_HERE, fmt::format(
"number of TP source IDs must be a multiple of 3, current amount: {}", tpsrc_ids.size())));
307 for (
auto sid : tpsrc_ids) {
310 std::string tp_uid(
"tphandler-" + std::to_string(sid->get_sid()));
311 auto tph_obj = obj_fac.
create(tph_class, tp_uid);
312 tph_obj.
set_by_val<uint32_t>(
"source_id", sid->get_sid());
313 tph_obj.set_by_val<uint32_t>(
"detector_id", 1);
315 tph_obj.set_obj(
"module_configuration", &tph_conf_obj);
319 tp_queue_obj.
set_by_val<uint32_t>(
"recv_timeout_ms", 50);
320 tp_queue_obj.
set_by_val<uint32_t>(
"send_timeout_ms", 1);
334 tph_obj.
set_objs(
"inputs", { &tp_queue_obj, &tpreq_queue_obj });
335 tph_obj.set_objs(
"outputs", { &tp_net_obj, &ta_net_obj, &frag_queue_obj });
341 std::vector<std::pair<uint32_t, const conffwk::ConfigObject*>> tp_queue_objs;
342 for (
auto q : tp_queues) {
343 tp_queue_objs.push_back(std::make_pair(q.first, &q.second->config_object()));
352 auto lb_conf = dlh_conf->get_latency_buffer();
354 std::map<int16_t, conffwk::ConfigObject> numa_dhlconf_map;
355 for ( int16_t numa : numas ) {
356 auto lb_confobj_numa = obj_fac.
create(lb_conf->class_name(), fmt::format(
"{}-numa{}",lb_conf->UID(), numa));
357 lb_confobj_numa.
set_by_val<uint32_t>(
"size", lb_conf->get_size());
358 lb_confobj_numa.set_by_val<
bool>(
"numa_aware", lb_conf->get_numa_aware());
359 lb_confobj_numa.set_by_val<int16_t>(
"numa_node", numa);
360 lb_confobj_numa.set_by_val<
bool>(
"intrinsic_allocator", lb_conf->get_intrinsic_allocator());
361 lb_confobj_numa.set_by_val<uint32_t>(
"alignment_size", lb_conf->get_alignment_size());
362 lb_confobj_numa.set_by_val<
bool>(
"preallocation", lb_conf->get_preallocation());
364 auto dhl_confobj_numa = obj_fac.
create(dlh_conf->class_name(), fmt::format(
"{}-numa{}",dlh_conf->UID(), numa));
365 dhl_confobj_numa.
set_by_val<std::string>(
"template_for", dlh_conf->get_template_for());
366 dhl_confobj_numa.set_by_val<std::string>(
"input_data_type", dlh_conf->get_input_data_type());
367 dhl_confobj_numa.set_by_val<
bool>(
"generate_timesync", dlh_conf->get_generate_timesync());
368 dhl_confobj_numa.set_by_val<uint64_t>(
"post_processing_delay_ticks", dlh_conf->get_post_processing_delay_ticks());
369 dhl_confobj_numa.set_by_val<std::string>(
"input_data_type", dlh_conf->get_input_data_type());
370 dhl_confobj_numa.set_obj(
"request_handler", &dlh_conf->get_request_handler()->config_object());
371 dhl_confobj_numa.set_obj(
"latency_buffer", &lb_confobj_numa);
372 dhl_confobj_numa.set_obj(
"data_processor", &dlh_conf->get_data_processor()->config_object());
375 numa_dhlconf_map[numa] = dhl_confobj_numa;
379 auto emulation_mode = reader_conf->get_emulation_mode();
380 for (
auto& [numa, ds] : all_enabled_det_streams) {
381 uint32_t sid = ds->get_source_id();
382 TLOG_DEBUG(6) << fmt::format(
"Processing stream {}, id {}, det id {}", ds->UID(), ds->get_source_id(), ds->get_geo_id()->get_detector_id());
383 std::string uid(fmt::format(
"DLH-{}", sid));
384 TLOG_DEBUG(6) << fmt::format(
"creating OKS configuration object for Data Link Handler class {}, if {}", dlh_class, sid);
385 auto dlh_obj = obj_fac.
create(dlh_class, uid);
386 dlh_obj.
set_by_val<uint32_t>(
"source_id", sid);
387 dlh_obj.set_by_val<uint32_t>(
"detector_id", ds->get_geo_id()->get_detector_id());
389 dlh_obj.set_by_val<
bool>(
"emulation_mode", emulation_mode);
390 dlh_obj.set_obj(
"geo_id", &ds->get_geo_id()->config_object());
391 dlh_obj.set_obj(
"module_configuration", &numa_dhlconf_map[numa]);
392 dlh_obj.set_obj(
"raw_data_callback", &callback_confs_by_sid[sid]->
config_object());
394 std::vector<const conffwk::ConfigObject*> dlh_ins, dlh_outs;
402 dlh_ins.push_back(&req_queue_obj);
403 dlh_outs.push_back(&frag_queue_obj);
407 if (dlh_conf->get_generate_timesync()) {
410 dlh_outs.push_back(&ts_net_obj);
414 for (
auto tpq : tp_queue_objs) {
415 if ((sid / 100) == (tpq.first / 10)) {
416 dlh_outs.push_back(tpq.second);
419 dlh_obj.
set_objs(
"inputs", dlh_ins);
420 dlh_obj.set_objs(
"outputs", dlh_outs);
428 if (aggregator_conf == 0) {
429 throw(BadConf(
ERS_HERE,
"No FragmentAggregatorModule configuration given"));
431 std::string faUid(
"fragmentaggregator-" +
UID());
433 TLOG_DEBUG(7) <<
"creating OKS configuration object for Fragment Aggregator class ";
434 auto frag_aggr = obj_fac.
create(
"FragmentAggregatorModule", faUid);
439 std::vector<conffwk::ConfigObject> fragOutObjs;
440 for (
auto [uid, descriptor]:
441 helper->get_netdescriptors(
"Fragment",
"DFApplication")) {
442 std::string dreqNetUid(descriptor->get_uid_base() + uid);
443 auto frag_conn = obj_fac.
create(
"NetworkConnection", dreqNetUid);
445 frag_conn.
set_by_val<std::string>(
"data_type", descriptor->get_data_type());
446 frag_conn.set_by_val<std::string>(
"connection_type", descriptor->get_connection_type());
448 frag_conn.set_by_val<
int>(
"capacity", all_enabled_det_streams.size() * 2);
450 auto serviceObj = descriptor->get_associated_service()->config_object();
451 frag_conn.set_obj(
"associated_service", &serviceObj);
452 fragOutObjs.push_back(frag_conn);
456 std::vector<const conffwk::ConfigObject*> fa_output_objs;
457 for (
auto& fNet : fragOutObjs) {
458 fa_output_objs.push_back(&fNet);
461 for (
auto& q : req_queues) {
462 fa_output_objs.push_back(&q->config_object());
465 frag_aggr.
set_obj(
"configuration", &aggregator_conf->config_object());
466 frag_aggr.set_objs(
"inputs", { &fa_net_obj, &frag_queue_obj });
467 frag_aggr.set_objs(
"outputs", fa_output_objs);