Line data Source code
1 : /**
2 : * @file DFOModule_test.cxx Test application that tests and demonstrates
3 : * the functionality of the DFOModule class.
4 : *
5 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
6 : * Licensing/copyright details are in the COPYING file that you should have
7 : * received with this code.
8 : */
9 :
10 : #include "DFOModule.hpp"
11 :
12 : #include "dfmessages/TriggerDecisionToken.hpp"
13 : #include "dfmessages/TriggerInhibit.hpp"
14 : #include "dfmodules/CommonIssues.hpp"
15 : #include "iomanager/IOManager.hpp"
16 : #include "iomanager/Sender.hpp"
17 : #include "opmonlib/TestOpMonManager.hpp"
18 : #include "dfmodules/opmon/DFOModule.pb.h"
19 :
20 : #define BOOST_TEST_MODULE DFOModule_test // NOLINT
21 :
22 : #include "boost/test/unit_test.hpp"
23 :
24 : #include <map>
25 : #include <memory>
26 : #include <string>
27 : #include <vector>
28 :
29 : using namespace dunedaq::dfmodules;
30 :
31 : namespace dunedaq {
32 :
33 :
34 : struct EnvFixture
35 : {
36 1 : EnvFixture() { setenv("DUNEDAQ_PARTITION", "partition_name", 0); }
37 : };
38 : BOOST_TEST_GLOBAL_FIXTURE(EnvFixture);
39 :
40 : struct CfgFixture
41 : {
42 6 : CfgFixture()
43 6 : { std::string oksConfig = "oksconflibs:test/config/datafloworchestrator_test.data.xml";
44 6 : std::string appName = "TestApp";
45 6 : std::string sessionName = "partition_name";
46 6 : cfgMgr = std::make_shared<dunedaq::appfwk::ConfigurationManager>(oksConfig, appName, sessionName);
47 6 : get_iomanager()->configure(sessionName, cfgMgr->get_queues(), cfgMgr->get_networkconnections(), nullptr, opmgr);
48 6 : }
49 6 : ~CfgFixture() {
50 6 : get_iomanager()->reset();
51 6 : }
52 :
53 6 : auto get_dfo_info() {
54 :
55 6 : opmgr.collect();
56 6 : auto opmon_facility = opmgr.get_backend_facility();
57 6 : auto list = opmon_facility->get_entries(std::regex(".*DFOInfo"));
58 6 : BOOST_REQUIRE_EQUAL(list.size(), 1);
59 6 : const auto & entry = list.front();
60 12 : return opmonlib::from_entry<dfmodules::opmon::DFOInfo>( entry );
61 6 : }
62 :
63 : dunedaq::opmonlib::TestOpMonManager opmgr;
64 : std::shared_ptr<dunedaq::appfwk::ConfigurationManager> cfgMgr;
65 : };
66 :
67 : BOOST_FIXTURE_TEST_SUITE(DFOModule_test, CfgFixture)
68 :
69 :
70 : void
71 2 : send_init_token(std::string connection_name = "trigdec_0")
72 : {
73 2 : dfmessages::TriggerDecisionToken token;
74 2 : token.run_number = 0;
75 2 : token.trigger_number = 0;
76 2 : token.decision_destination = connection_name;
77 :
78 4 : TLOG() << "Sending Init TriggerDecisionToken to DFO";
79 2 : get_iom_sender<dfmessages::TriggerDecisionToken>("token")->send(std::move(token), iomanager::Sender::s_block);
80 2 : }
81 : void
82 7 : send_token(dfmessages::trigger_number_t trigger_number,
83 : std::string connection_name = "trigdec_0",
84 : bool different_run = false)
85 : {
86 7 : dfmessages::TriggerDecisionToken token;
87 7 : token.run_number = different_run ? 2 : 1;
88 7 : token.trigger_number = trigger_number;
89 7 : token.decision_destination = connection_name;
90 :
91 14 : TLOG() << "Sending TriggerDecisionToken with trigger number " << trigger_number << " to DFO";
92 7 : get_iom_sender<dfmessages::TriggerDecisionToken>("token")->send(std::move(token), iomanager::Sender::s_block);
93 7 : }
94 :
95 : void
96 3 : recv_trigdec(const dfmessages::TriggerDecision& decision)
97 : {
98 6 : TLOG() << "Received TriggerDecision with trigger number " << decision.trigger_number << " from DFO";
99 3 : std::this_thread::sleep_for(std::chrono::milliseconds(100));
100 3 : send_token(decision.trigger_number);
101 3 : }
102 :
103 : std::atomic<bool> busy_signal_recvd = false;
104 : void
105 2 : recv_triginh(const dfmessages::TriggerInhibit& inhibit)
106 : {
107 4 : TLOG() << "Received TriggerInhibit with busy=" << std::boolalpha << inhibit.busy << " from DFO";
108 2 : busy_signal_recvd = inhibit.busy;
109 2 : }
110 :
111 : void
112 5 : send_trigdec(dfmessages::trigger_number_t trigger_number, bool different_run = false)
113 : {
114 5 : dunedaq::dfmessages::TriggerDecision td;
115 5 : td.trigger_number = trigger_number;
116 5 : td.run_number = different_run ? 2 : 1;
117 5 : td.trigger_timestamp = 1;
118 5 : td.trigger_type = 1;
119 5 : td.readout_type = dunedaq::dfmessages::ReadoutType::kLocalized;
120 5 : auto iom = iomanager::IOManager::get();
121 10 : TLOG() << "Sending TriggerDecision with trigger number " << trigger_number << " to DFO";
122 5 : iom->get_sender<dfmessages::TriggerDecision>("trigdec")->send(std::move(td), iomanager::Sender::s_block);
123 5 : }
124 :
125 2 : BOOST_AUTO_TEST_CASE(CopyAndMoveSemantics)
126 : {
127 1 : BOOST_REQUIRE(!std::is_copy_constructible_v<DFOModule>);
128 1 : BOOST_REQUIRE(!std::is_copy_assignable_v<DFOModule>);
129 1 : BOOST_REQUIRE(!std::is_move_constructible_v<DFOModule>);
130 1 : BOOST_REQUIRE(!std::is_move_assignable_v<DFOModule>);
131 1 : }
132 :
133 2 : BOOST_AUTO_TEST_CASE(Constructor)
134 : {
135 1 : auto dfo = appfwk::make_module("DFOModule", "test");
136 1 : }
137 :
138 2 : BOOST_AUTO_TEST_CASE(Init)
139 : {
140 1 : auto dfo = appfwk::make_module("DFOModule", "test");
141 1 : dfo->init(cfgMgr);
142 1 : }
143 :
144 2 : BOOST_AUTO_TEST_CASE(Commands)
145 : {
146 1 : auto dfo = appfwk::make_module("DFOModule", "test");
147 1 : opmgr.register_node("dfo", dfo);
148 1 : dfo->init(cfgMgr);
149 :
150 1 : auto start_json = static_cast<appfwk::DAQModule::CommandData_t>("{\"run\": 1}"_json);
151 1 : appfwk::DAQModule::CommandData_t null_json;
152 :
153 1 : dfo->execute_command("conf", null_json);
154 1 : dfo->execute_command("start", start_json);
155 1 : dfo->execute_command("drain_dataflow", null_json);
156 1 : dfo->execute_command("scrap", null_json);
157 :
158 1 : auto metric = get_dfo_info();
159 1 : BOOST_REQUIRE_EQUAL(metric.tokens_received(), 0);
160 1 : BOOST_REQUIRE_EQUAL(metric.decisions_received(), 0);
161 1 : BOOST_REQUIRE_EQUAL(metric.decisions_sent(), 0);
162 1 : BOOST_REQUIRE_EQUAL(metric.forwarding_decision(), 0);
163 1 : BOOST_REQUIRE_EQUAL(metric.waiting_for_decision(), 0);
164 1 : BOOST_REQUIRE_EQUAL(metric.deciding_destination(), 0);
165 1 : BOOST_REQUIRE_EQUAL(metric.waiting_for_token(), 0);
166 1 : BOOST_REQUIRE_EQUAL(metric.processing_token(), 0);
167 :
168 1 : }
169 :
170 2 : BOOST_AUTO_TEST_CASE(DataFlow)
171 : {
172 1 : auto dfo = appfwk::make_module("DFOModule", "test");
173 1 : opmgr.register_node("dfo", dfo);
174 1 : dfo->init(cfgMgr);
175 :
176 1 : auto start_json = static_cast<appfwk::DAQModule::CommandData_t>("{\"run\": 1}"_json);
177 1 : appfwk::DAQModule::CommandData_t null_json;
178 :
179 1 : dfo->execute_command("conf", null_json);
180 :
181 1 : auto iom = iomanager::IOManager::get();
182 1 : auto dec_recv = iom->get_receiver<dfmessages::TriggerDecision>("trigdec_0");
183 1 : dec_recv->add_callback(recv_trigdec);
184 1 : auto inh_recv = iom->get_receiver<dfmessages::TriggerInhibit>("triginh");
185 1 : inh_recv->add_callback(recv_triginh);
186 :
187 1 : send_trigdec(1, true);
188 1 : std::this_thread::sleep_for(std::chrono::milliseconds(50));
189 :
190 1 : send_token(999, "trigdec_0", true);
191 1 : send_token(9999, "trigdec_0", true);
192 1 : std::this_thread::sleep_for(std::chrono::milliseconds(50));
193 :
194 :
195 :
196 : // Note: Counters are reset by calling get_dfo_info!
197 1 : auto metric = get_dfo_info();
198 :
199 1 : BOOST_REQUIRE_EQUAL(metric.tokens_received(), 0);
200 :
201 1 : dfo->execute_command("start", start_json);
202 1 : send_init_token();
203 :
204 1 : std::this_thread::sleep_for(std::chrono::milliseconds(150));
205 :
206 1 : metric = get_dfo_info();
207 1 : BOOST_REQUIRE_EQUAL(metric.tokens_received(), 0);
208 1 : BOOST_REQUIRE_EQUAL(metric.decisions_received(), 0);
209 1 : BOOST_REQUIRE_EQUAL(metric.decisions_sent(), 0);
210 :
211 1 : send_trigdec(2);
212 1 : send_trigdec(3);
213 1 : std::this_thread::sleep_for(std::chrono::milliseconds(50));
214 1 : send_trigdec(4);
215 :
216 1 : metric = get_dfo_info();
217 1 : BOOST_REQUIRE_EQUAL(metric.tokens_received(), 0);
218 1 : BOOST_REQUIRE_EQUAL(metric.decisions_received(), 2);
219 1 : BOOST_REQUIRE_EQUAL(metric.decisions_sent(), 2);
220 :
221 1 : BOOST_REQUIRE(busy_signal_recvd.load());
222 1 : std::this_thread::sleep_for(std::chrono::milliseconds(400));
223 :
224 1 : metric = get_dfo_info();
225 1 : BOOST_REQUIRE_EQUAL(metric.tokens_received(), 3);
226 1 : BOOST_REQUIRE_EQUAL(metric.decisions_received(), 1);
227 1 : BOOST_REQUIRE_EQUAL(metric.decisions_sent(), 1);
228 1 : BOOST_REQUIRE(!busy_signal_recvd.load());
229 :
230 1 : dfo->execute_command("drain_dataflow", null_json);
231 1 : dfo->execute_command("scrap", null_json);
232 :
233 1 : dec_recv->remove_callback();
234 1 : inh_recv->remove_callback();
235 1 : }
236 :
237 2 : BOOST_AUTO_TEST_CASE(SendTrigDecFailed)
238 : {
239 1 : auto dfo = appfwk::make_module("DFOModule", "test");
240 1 : opmgr.register_node("dfo", dfo);
241 1 : dfo->init(cfgMgr);
242 :
243 1 : auto start_json = static_cast<appfwk::DAQModule::CommandData_t>("{\"run\": 1}"_json);
244 1 : appfwk::DAQModule::CommandData_t null_json;
245 :
246 1 : dfo->execute_command("conf", null_json);
247 :
248 1 : dfo->execute_command("start", start_json);
249 :
250 1 : send_init_token("invalid_connection");
251 :
252 1 : std::this_thread::sleep_for(std::chrono::milliseconds(50));
253 :
254 1 : send_trigdec(1);
255 1 : std::this_thread::sleep_for(std::chrono::milliseconds(150));
256 :
257 1 : auto info = get_dfo_info();
258 1 : BOOST_REQUIRE_EQUAL(info.tokens_received(), 0);
259 1 : BOOST_REQUIRE_EQUAL(info.decisions_received(), 1);
260 1 : BOOST_REQUIRE_EQUAL(info.decisions_sent(), 0);
261 :
262 : // FWIW, tell the DFO to retry the invalid connection
263 1 : send_token(1000, "invalid_connection");
264 1 : std::this_thread::sleep_for(std::chrono::milliseconds(50));
265 :
266 : // Token for unknown dataflow app
267 1 : send_token(1000);
268 1 : std::this_thread::sleep_for(std::chrono::milliseconds(50));
269 :
270 1 : dfo->execute_command("drain_dataflow", null_json);
271 1 : dfo->execute_command("scrap", null_json);
272 1 : }
273 :
274 : BOOST_AUTO_TEST_SUITE_END()
275 : } // namespace dunedaq
|