Line data Source code
1 : /**
2 : * @file FakeDataApplication.cpp
3 : *
4 : * Implementation of FakeDataApplication's dal methods
5 : *
6 : * This is part of the DUNE DAQ Software Suite, copyright 2023.
7 : * Licensing/copyright details are in the COPYING file that you should have
8 : * received with this code.
9 : */
10 :
11 :
12 : #include "ConfigObjectFactory.hpp"
13 :
14 : #include "conffwk/Configuration.hpp"
15 : #include "oks/kernel.hpp"
16 :
17 : #include "confmodel/Connection.hpp"
18 : #include "confmodel/NetworkConnection.hpp"
19 : // #include "confmodel/ReadoutGroup.hpp"
20 : #include "confmodel/ResourceSet.hpp"
21 : #include "confmodel/Service.hpp"
22 : #include "confmodel/Session.hpp"
23 :
24 : #include "appmodel/FakeDataApplication.hpp"
25 : #include "appmodel/FakeDataProdConf.hpp"
26 : #include "appmodel/FakeDataProdModule.hpp"
27 : #include "appmodel/FragmentAggregatorModule.hpp"
28 : #include "appmodel/FragmentAggregatorConf.hpp"
29 : #include "appmodel/NetworkConnectionDescriptor.hpp"
30 : #include "appmodel/NetworkConnectionRule.hpp"
31 : #include "appmodel/QueueConnectionRule.hpp"
32 : #include "appmodel/QueueDescriptor.hpp"
33 :
34 : #include "appmodel/appmodelIssues.hpp"
35 :
36 : #include "logging/Logging.hpp"
37 :
38 : #include <string>
39 : #include <vector>
40 :
41 : namespace dunedaq {
42 : namespace appmodel {
43 :
44 : //-----------------------------------------------------------------------------
45 :
46 : std::vector<const confmodel::Resource*>
47 0 : FakeDataApplication::contained_resources() const {
48 0 : return to_resources(get_producers());
49 : }
50 :
51 : void
52 0 : FakeDataApplication::generate_modules(const confmodel::Session* session) const
53 : {
54 : // oks::OksFile::set_nolock_mode(true);
55 :
56 0 : std::vector<const confmodel::DaqModule*> modules;
57 :
58 0 : ConfigObjectFactory obj_fac(this);
59 :
60 :
61 : // Process the queue rules looking for inputs to our DL/TP handler modules
62 0 : const QueueDescriptor* dlhReqInputQDesc = nullptr;
63 0 : const QueueDescriptor* faOutputQDesc = nullptr;
64 :
65 0 : for (auto rule : get_queue_rules()) {
66 0 : auto destination_class = rule->get_destination_class();
67 0 : auto data_type = rule->get_descriptor()->get_data_type();
68 0 : if (destination_class == "FakeDataProdModule") {
69 0 : if (data_type == "DataRequest") {
70 0 : dlhReqInputQDesc = rule->get_descriptor();
71 : }
72 0 : } else if (destination_class == "FragmentAggregatorModule") {
73 0 : faOutputQDesc = rule->get_descriptor();
74 : }
75 0 : }
76 0 : if (faOutputQDesc == nullptr) {
77 0 : throw(BadConf(ERS_HERE, "No fragment output queue descriptor given"));
78 : }
79 0 : if (dlhReqInputQDesc == nullptr) {
80 0 : throw(BadConf(ERS_HERE, "No DLH request input queue descriptor given"));
81 : }
82 : // Process the network rules looking for the Fragment Aggregator and TP handler data reuest inputs
83 0 : const NetworkConnectionDescriptor* faNetDesc = nullptr;
84 0 : const NetworkConnectionDescriptor* tsNetDesc = nullptr;
85 0 : for (auto rule : get_network_rules()) {
86 0 : auto endpoint_class = rule->get_endpoint_class();
87 0 : if (endpoint_class == "FragmentAggregatorModule") {
88 0 : faNetDesc = rule->get_descriptor();
89 0 : } else if (endpoint_class == "FakeDataProdModule") {
90 0 : tsNetDesc = rule->get_descriptor();
91 : }
92 0 : }
93 0 : if (faNetDesc == nullptr) {
94 0 : throw(BadConf(ERS_HERE, "No Fragment output network descriptor given"));
95 : }
96 0 : if (tsNetDesc == nullptr) {
97 0 : throw(BadConf(ERS_HERE, "No TimeSync output network descriptor given"));
98 : }
99 :
100 : // Create here the Queue on which all data fragments are forwarded to the fragment aggregator
101 : // and a container for the queues of data request to TP handler and DLH
102 :
103 0 : std::vector<const confmodel::Connection*> faOutputQueues;
104 :
105 0 : conffwk::ConfigObject faQueueObj = obj_fac.create_queue_obj(faOutputQDesc, UID());
106 :
107 : // Create a FakeDataProdModule for each stream of this Readout Group
108 0 : for (auto fdpConf : get_producers()) {
109 0 : if (fdpConf->is_disabled(*session)) {
110 0 : TLOG_DEBUG(7) << "Ignoring disabled FakeDataProdConf " << fdpConf->UID();
111 0 : continue;
112 0 : }
113 :
114 0 : auto stream = fdpConf->cast<appmodel::FakeDataProdConf>();
115 0 : if (stream == nullptr) {
116 0 : throw(BadConf(ERS_HERE, "ReadoutGroup contains something other than FakeDataProdConf"));
117 : }
118 :
119 0 : auto id = stream->get_source_id();
120 0 : std::string uid("FakeDataProdModule-" + std::to_string(id));
121 0 : TLOG_DEBUG(7) << "creating OKS configuration object for FakeDataProdModule";
122 0 : conffwk::ConfigObject dlhObj = obj_fac.create("FakeDataProdModule", uid);
123 0 : dlhObj.set_obj("configuration", &stream->config_object());
124 :
125 : // Time Sync network connection
126 0 : auto tsNetObj = obj_fac.create_net_obj(tsNetDesc, std::to_string(id));
127 :
128 0 : dlhObj.set_objs("outputs", { &faQueueObj, &tsNetObj });
129 :
130 0 : auto reqQueueObj = obj_fac.create_queue_sid_obj(dlhReqInputQDesc, id);
131 :
132 : // Add the requessts queue dal pointer to the outputs of the FragmentAggregatorModule
133 0 : faOutputQueues.push_back(obj_fac.get_dal<confmodel::Connection>(
134 0 : dlhReqInputQDesc->get_uid_base() + std::to_string(id)));
135 :
136 0 : dlhObj.set_objs("inputs", { &reqQueueObj });
137 :
138 0 : modules.push_back(obj_fac.get_dal<FakeDataProdModule>(uid));
139 0 : }
140 :
141 : // Finally create Fragment Aggregator
142 0 : auto aggregator_conf = get_fragment_aggregator();
143 0 : if (aggregator_conf == 0) {
144 0 : throw(BadConf(ERS_HERE, "No FragmentAggregatorModule configuration given"));
145 : }
146 0 : std::string faUid("fragmentaggregator-" + UID());
147 0 : TLOG_DEBUG(7) << "creating OKS configuration object for Fragment Aggregator class ";
148 0 : conffwk::ConfigObject faObj = obj_fac.create("FragmentAggregatorModule", faUid);
149 :
150 : // Add network connection to TRBs
151 0 : conffwk::ConfigObject faNetObj = obj_fac.create_net_obj(faNetDesc, UID());
152 :
153 : // Add output queueus of data requests
154 0 : std::vector<const conffwk::ConfigObject*> qObjs;
155 0 : for (auto q : faOutputQueues) {
156 0 : qObjs.push_back(&q->config_object());
157 : }
158 0 : faObj.set_obj("configuration", &aggregator_conf->config_object());
159 0 : faObj.set_objs("inputs", { &faNetObj, &faQueueObj });
160 0 : faObj.set_objs("outputs", qObjs);
161 :
162 0 : modules.push_back(obj_fac.get_dal<FragmentAggregatorModule>(faUid));
163 :
164 0 : obj_fac.update_modules(modules);
165 0 : }
166 :
167 : } // namespace appmodel
168 : } // namespace dunedaq
|