81 TLOG_DEBUG(6) <<
"Generating modules for application " << this->
UID();
92 if (reader_conf == 0) {
93 throw(BadConf(
ERS_HERE,
"No DataReaderModule configuration given"));
95 std::string reader_class = reader_conf->get_template_for();
100 auto dlh_class = dlh_conf->get_template_for();
104 throw(BadConf(
ERS_HERE,
"TP generation is enabled but there is no TP data handler configuration"));
107 std::string tph_class =
"";
109 tph_class = tph_conf->get_template_for();
122 auto destination_class = rule->get_destination_class();
123 auto data_type = rule->get_descriptor()->get_data_type();
125 if (destination_class ==
"DataHandlerModule" || destination_class == dlh_class || destination_class == tph_class) {
126 if (data_type ==
"DataRequest") {
127 dlh_reqinput_qdesc = rule->get_descriptor();
129 tp_input_qdesc = rule->get_descriptor();
131 dlh_input_qdesc = rule->get_descriptor();
133 }
else if (destination_class ==
"FragmentAggregatorModule") {
134 fa_output_qdesc = rule->get_descriptor();
138 if (dlh_input_qdesc ==
nullptr) {
139 throw(BadConf(
ERS_HERE,
"No data link handler input queue descriptor given"));
141 if (dlh_reqinput_qdesc ==
nullptr) {
142 throw(BadConf(
ERS_HERE,
"No data link handler request input queue descriptor given"));
153 auto endpoint_class = rule->get_endpoint_class();
154 auto data_type = rule->get_descriptor()->get_data_type();
156 if (endpoint_class ==
"FragmentAggregatorModule") {
157 fa_net_desc = rule->get_descriptor();
158 }
else if (data_type ==
"TPSet") {
159 tp_net_desc = rule->get_descriptor();
160 }
else if (data_type ==
"TriggerActivity") {
161 ta_net_desc = rule->get_descriptor();
162 }
else if (data_type ==
"TimeSync") {
163 ts_net_desc = rule->get_descriptor();
167 if (fa_net_desc ==
nullptr) {
168 throw(BadConf(
ERS_HERE,
"No Fragment Aggregator network descriptor given"));
170 if (ts_net_desc ==
nullptr && dlh_conf->get_generate_timesync()) {
171 throw(BadConf(
ERS_HERE,
"No Time Sync network descriptor given but time sync generation is enabled"));
176 if (fa_output_qdesc ==
nullptr) {
177 throw(BadConf(
ERS_HERE,
"No fragment output queue descriptor given"));
179 std::vector<const confmodel::Connection*> req_queues;
186 std::vector<const confmodel::DaqModule*> modules;
192 std::vector<const confmodel::DetectorStream*> all_enabled_det_streams;
193 std::map<uint32_t, const confmodel::Connection*> data_queues_by_sid;
196 uint16_t conn_idx = 0;
199 if (d2d_conn->is_disabled(*
session)) {
200 TLOG_DEBUG(7) <<
"Ignoring disabled DetectorToDaqConnection " << d2d_conn->UID();
204 TLOG_DEBUG(6) <<
"Processing DetectorToDaqConnection " << d2d_conn->UID();
208 if (d2d_conn->senders().empty()) {
209 throw(BadConf(
ERS_HERE,
"DetectorToDaqConnection does not contain senders"));
211 if (d2d_conn->receiver() ==
nullptr) {
212 throw(BadConf(
ERS_HERE,
"DetectorToDaqConnection does not contain a receiver"));
216 auto det_senders = d2d_conn->senders();
217 auto det_receiver = d2d_conn->receiver();
219 std::vector<const confmodel::DetectorStream*> enabled_det_streams;
221 for (
auto stream : d2d_conn->streams()) {
224 if (stream->is_disabled(*
session)) {
225 TLOG_DEBUG(7) <<
"Ignoring disabled DetectorStream " << stream->UID();
229 all_enabled_det_streams.push_back(stream);
230 enabled_det_streams.push_back(stream);
236 if (reader_class ==
"DPDKReaderModule" || reader_class ==
"SocketReaderModule") {
237 if (!d2d_conn->castable(
"NetworkDetectorToDaqConnection")) {
238 throw(BadConf(
ERS_HERE, fmt::format(
"{} requires NetworkDetectorToDaqConnection, found {} of class {}", reader_class, d2d_conn->UID(), d2d_conn->class_name())));
242 throw(BadConf(
ERS_HERE, fmt::format(
"{} requires NWDetDataReceiver, found {} of class {}", reader_class, det_receiver->UID(), det_receiver->class_name())));
245 else if (reader_class ==
"FelixReaderModule") {
246 if (!d2d_conn->castable(
"FelixDetectorToDaqConnection")) {
247 throw(BadConf(
ERS_HERE, fmt::format(
"{} requires FelixDetectorToDaqConnection, found {} of class {}", reader_class, d2d_conn->UID(), d2d_conn->class_name())));
250 throw(BadConf(
ERS_HERE, fmt::format(
"FelixReaderModule requires FelixDataReceiver, found {} of class {}", det_receiver->UID(), det_receiver->class_name())));
266 std::string reader_uid(fmt::format(
"datareader-{}-{}", this->
UID(), std::to_string(conn_idx++)));
267 TLOG_DEBUG(6) << fmt::format(
"creating OKS configuration object for Data reader class {} with id {}", reader_class, reader_uid);
268 auto reader_obj = obj_fac.
create(reader_class, reader_uid);
271 reader_obj.
set_obj(
"configuration", &reader_conf->config_object());
272 reader_obj.set_objs(
"connections", {&d2d_conn->config_object()});
275 std::vector<const conffwk::ConfigObject*> data_queue_objs;
279 for (
auto ds : enabled_det_streams) {
282 data_queue_objs.push_back(&data_queue->config_object());
283 data_queues_by_sid[ds->get_source_id()] = data_queue;
286 reader_obj.set_objs(
"outputs", data_queue_objs);
296 std::vector<const confmodel::Connection*> tp_queues;
298 if (tp_input_qdesc ==
nullptr) {
299 throw(BadConf(
ERS_HERE,
"TP generation is enabled but no TP input queue descriptor given"));
301 if (tp_net_desc ==
nullptr) {
302 throw(BadConf(
ERS_HERE,
"TP generation is enabled but no TPSet network descriptor given"));
304 if (ta_net_desc ==
nullptr) {
305 throw(BadConf(
ERS_HERE,
"TP generation is enabled but no TriggerActivity network descriptor given"));
308 auto tph_conf_obj = tph_conf->config_object();
311 for (
auto sid : tpsrc_ids) {
314 std::string tp_uid(
"tphandler-" + std::to_string(sid->get_sid()));
315 auto tph_obj = obj_fac.
create(tph_class, tp_uid);
316 tph_obj.
set_by_val<uint32_t>(
"source_id", sid->get_sid());
317 tph_obj.set_by_val<uint32_t>(
"detector_id", 1);
319 tph_obj.set_obj(
"module_configuration", &tph_conf_obj);
323 tp_queue_obj.
set_by_val<uint32_t>(
"recv_timeout_ms", 50);
324 tp_queue_obj.
set_by_val<uint32_t>(
"send_timeout_ms", 1);
338 tph_obj.
set_objs(
"inputs", { &tp_queue_obj, &tpreq_queue_obj });
339 tph_obj.set_objs(
"outputs", { &tp_net_obj, &ta_net_obj, &frag_queue_obj });
346 std::vector<const conffwk::ConfigObject*> tp_queue_objs;
347 for (
auto q : tp_queues) {
348 tp_queue_objs.push_back(&q->config_object());
356 auto emulation_mode = reader_conf->get_emulation_mode();
357 for (
auto ds : all_enabled_det_streams) {
359 uint32_t sid = ds->get_source_id();
360 TLOG_DEBUG(6) << fmt::format(
"Processing stream {}, id {}, det id {}", ds->UID(), ds->get_source_id(), ds->get_geo_id()->get_detector_id());
361 std::string uid(fmt::format(
"DLH-{}", sid));
362 TLOG_DEBUG(6) << fmt::format(
"creating OKS configuration object for Data Link Handler class {}, if {}", dlh_class, sid);
363 auto dlh_obj = obj_fac.
create(dlh_class, uid);
364 dlh_obj.
set_by_val<uint32_t>(
"source_id", sid);
365 dlh_obj.set_by_val<uint32_t>(
"detector_id", ds->get_geo_id()->get_detector_id());
367 dlh_obj.set_by_val<
bool>(
"emulation_mode", emulation_mode);
368 dlh_obj.set_obj(
"geo_id", &ds->get_geo_id()->config_object());
369 dlh_obj.set_obj(
"module_configuration", &dlh_conf->config_object());
371 std::vector<const conffwk::ConfigObject*> dlh_ins, dlh_outs;
374 dlh_ins.push_back(&data_queues_by_sid.at(sid)->config_object());
382 dlh_ins.push_back(&req_queue_obj);
383 dlh_outs.push_back(&frag_queue_obj);
387 if (dlh_conf->get_generate_timesync()) {
390 dlh_outs.push_back(&ts_net_obj);
393 for (
auto tpq : tp_queue_objs) {
394 dlh_outs.push_back(tpq);
396 dlh_obj.
set_objs(
"inputs", dlh_ins);
397 dlh_obj.set_objs(
"outputs", dlh_outs);
404 if (aggregator_conf == 0) {
405 throw(BadConf(
ERS_HERE,
"No FragmentAggregatorModule configuration given"));
407 std::string faUid(
"fragmentaggregator-" +
UID());
409 TLOG_DEBUG(7) <<
"creating OKS configuration object for Fragment Aggregator class ";
410 auto frag_aggr = obj_fac.
create(
"FragmentAggregatorModule", faUid);
415 auto sessionApps =
session->enabled_applications();
416 std::vector<conffwk::ConfigObject> fragOutObjs;
417 for (
auto app : sessionApps) {
419 if (dfapp ==
nullptr)
423 for (
auto rule : dfNRules) {
424 auto descriptor = rule->get_descriptor();
425 auto data_type = descriptor->get_data_type();
426 if (data_type ==
"Fragment") {
427 std::string dreqNetUid(descriptor->get_uid_base() + dfapp->UID());
430 auto frag_conn = obj_fac.
create(
"NetworkConnection", dreqNetUid);
432 frag_conn.
set_by_val<std::string>(
"data_type", descriptor->get_data_type());
435 frag_conn.set_by_val<
int>(
"capacity", all_enabled_det_streams.size() * 2);
436 frag_conn.set_by_val<std::string>(
"connection_type", descriptor->get_connection_type());
438 auto serviceObj = descriptor->get_associated_service()->config_object();
439 frag_conn.set_obj(
"associated_service", &serviceObj);
440 fragOutObjs.push_back(frag_conn);
446 std::vector<const conffwk::ConfigObject*> fa_output_objs;
447 for (
auto& fNet : fragOutObjs) {
448 fa_output_objs.push_back(&fNet);
451 for (
auto& q : req_queues) {
452 fa_output_objs.push_back(&q->config_object());
455 frag_aggr.set_obj(
"configuration", &aggregator_conf->config_object());
456 frag_aggr.set_objs(
"inputs", { &fa_net_obj, &frag_queue_obj });
457 frag_aggr.set_objs(
"outputs", fa_output_objs);