Line data Source code
1 : /**
2 : * @file DFOModule_test.cxx Test application that tests and demonstrates
3 : * the functionality of the DFOModule class.
4 : *
5 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
6 : * Licensing/copyright details are in the COPYING file that you should have
7 : * received with this code.
8 : */
9 :
10 : #include "DFOModule.hpp"
11 :
12 : #include "dfmessages/TriggerDecisionToken.hpp"
13 : #include "dfmessages/TriggerInhibit.hpp"
14 : #include "dfmodules/CommonIssues.hpp"
15 : #include "dfmodules/opmon/DFOModule.pb.h"
16 : #include "iomanager/IOManager.hpp"
17 : #include "iomanager/Sender.hpp"
18 : #include "opmonlib/TestOpMonManager.hpp"
19 :
20 : #define BOOST_TEST_MODULE DFOModule_test // NOLINT
21 :
22 : #include "boost/test/unit_test.hpp"
23 :
24 : #include <map>
25 : #include <memory>
26 : #include <string>
27 : #include <vector>
28 :
29 : using namespace dunedaq::dfmodules;
30 :
31 : namespace dunedaq {
32 :
33 : struct EnvFixture
34 : {
35 1 : EnvFixture() { setenv("DUNEDAQ_PARTITION", "partition_name", 0); }
36 : };
37 : BOOST_TEST_GLOBAL_FIXTURE(EnvFixture);
38 :
39 : struct CfgFixture
40 : {
41 6 : CfgFixture()
42 18 : {
43 6 : std::string oksConfig = "oksconflibs:test/config/datafloworchestrator_test.data.xml";
44 6 : std::string appName = "TestApp";
45 6 : std::string sessionName = "partition_name";
46 6 : cfgMgr = std::make_shared<dunedaq::appfwk::ConfigurationManager>(oksConfig, appName, sessionName);
47 6 : get_iomanager()->configure(sessionName, cfgMgr->get_queues(), cfgMgr->get_networkconnections(), nullptr, opmgr);
48 6 : }
49 6 : ~CfgFixture() { get_iomanager()->reset(); }
50 :
51 6 : auto get_dfo_info()
52 : {
53 :
54 6 : opmgr.collect();
55 6 : auto opmon_facility = opmgr.get_backend_facility();
56 6 : auto list = opmon_facility->get_entries(std::regex(".*DFOInfo"));
57 6 : BOOST_REQUIRE_EQUAL(list.size(), 1);
58 6 : const auto& entry = list.front();
59 12 : return opmonlib::from_entry<dfmodules::opmon::DFOInfo>(entry);
60 6 : }
61 :
62 : dunedaq::opmonlib::TestOpMonManager opmgr;
63 : std::shared_ptr<dunedaq::appfwk::ConfigurationManager> cfgMgr;
64 : };
65 :
66 : BOOST_FIXTURE_TEST_SUITE(DFOModule_test, CfgFixture)
67 :
68 : void
69 2 : send_init_token(std::string connection_name = "trigdec_0")
70 : {
71 2 : dfmessages::TriggerDecisionToken token;
72 2 : token.run_number = 0;
73 2 : token.trigger_number = 0;
74 2 : token.decision_destination = connection_name;
75 :
76 4 : TLOG() << "Sending Init TriggerDecisionToken to DFO";
77 2 : get_iom_sender<dfmessages::TriggerDecisionToken>("token")->send(std::move(token), iomanager::Sender::s_block);
78 2 : }
79 : void
80 7 : send_token(dfmessages::trigger_number_t trigger_number,
81 : std::string connection_name = "trigdec_0",
82 : bool different_run = false)
83 : {
84 7 : dfmessages::TriggerDecisionToken token;
85 7 : token.run_number = different_run ? 2 : 1;
86 7 : token.trigger_number = trigger_number;
87 7 : token.decision_destination = connection_name;
88 :
89 14 : TLOG() << "Sending TriggerDecisionToken with trigger number " << trigger_number << " to DFO";
90 7 : get_iom_sender<dfmessages::TriggerDecisionToken>("token")->send(std::move(token), iomanager::Sender::s_block);
91 7 : }
92 :
93 : void
94 3 : recv_trigdec(const dfmessages::TriggerDecision& decision)
95 : {
96 6 : TLOG() << "Received TriggerDecision with trigger number " << decision.trigger_number << " from DFO";
97 3 : std::this_thread::sleep_for(std::chrono::milliseconds(100));
98 3 : send_token(decision.trigger_number);
99 3 : }
100 :
101 : std::atomic<bool> busy_signal_recvd = false;
102 : void
103 2 : recv_triginh(const dfmessages::TriggerInhibit& inhibit)
104 : {
105 4 : TLOG() << "Received TriggerInhibit with busy=" << std::boolalpha << inhibit.busy << " from DFO";
106 2 : busy_signal_recvd = inhibit.busy;
107 2 : }
108 :
109 : void
110 5 : send_trigdec(dfmessages::trigger_number_t trigger_number, bool different_run = false)
111 : {
112 5 : dunedaq::dfmessages::TriggerDecision td;
113 5 : td.trigger_number = trigger_number;
114 5 : td.run_number = different_run ? 2 : 1;
115 5 : td.trigger_timestamp = 1;
116 5 : td.trigger_type = 1;
117 5 : td.readout_type = dunedaq::dfmessages::ReadoutType::kLocalized;
118 5 : auto iom = iomanager::IOManager::get();
119 10 : TLOG() << "Sending TriggerDecision with trigger number " << trigger_number << " to DFO";
120 5 : iom->get_sender<dfmessages::TriggerDecision>("trigdec")->send(std::move(td), iomanager::Sender::s_block);
121 5 : }
122 :
123 2 : BOOST_AUTO_TEST_CASE(CopyAndMoveSemantics)
124 : {
125 1 : BOOST_REQUIRE(!std::is_copy_constructible_v<DFOModule>);
126 1 : BOOST_REQUIRE(!std::is_copy_assignable_v<DFOModule>);
127 1 : BOOST_REQUIRE(!std::is_move_constructible_v<DFOModule>);
128 1 : BOOST_REQUIRE(!std::is_move_assignable_v<DFOModule>);
129 1 : }
130 :
131 2 : BOOST_AUTO_TEST_CASE(Constructor)
132 : {
133 1 : auto dfo = appfwk::make_module("DFOModule", "test");
134 1 : }
135 :
136 2 : BOOST_AUTO_TEST_CASE(Init)
137 : {
138 1 : auto dfo = appfwk::make_module("DFOModule", "test");
139 1 : dfo->init(cfgMgr);
140 1 : }
141 :
142 2 : BOOST_AUTO_TEST_CASE(Commands)
143 : {
144 1 : auto dfo = appfwk::make_module("DFOModule", "test");
145 1 : opmgr.register_node("dfo", dfo);
146 1 : dfo->init(cfgMgr);
147 :
148 1 : appfwk::DAQModule::CommandData_t null_data;
149 1 : appfwk::DAQModule::CommandData_t start_data;
150 1 : start_data.emplace("run", 1);
151 :
152 1 : dfo->execute_command("conf", null_data);
153 1 : dfo->execute_command("start", start_data);
154 1 : dfo->execute_command("drain_dataflow", null_data);
155 1 : dfo->execute_command("scrap", null_data);
156 :
157 1 : auto metric = get_dfo_info();
158 1 : BOOST_REQUIRE_EQUAL(metric.tokens_received(), 0);
159 1 : BOOST_REQUIRE_EQUAL(metric.decisions_received(), 0);
160 1 : BOOST_REQUIRE_EQUAL(metric.decisions_sent(), 0);
161 1 : BOOST_REQUIRE_EQUAL(metric.forwarding_decision(), 0);
162 1 : BOOST_REQUIRE_EQUAL(metric.waiting_for_decision(), 0);
163 1 : BOOST_REQUIRE_EQUAL(metric.deciding_destination(), 0);
164 1 : BOOST_REQUIRE_EQUAL(metric.waiting_for_token(), 0);
165 1 : BOOST_REQUIRE_EQUAL(metric.processing_token(), 0);
166 1 : }
167 :
168 2 : BOOST_AUTO_TEST_CASE(DataFlow)
169 : {
170 1 : auto dfo = appfwk::make_module("DFOModule", "test");
171 1 : opmgr.register_node("dfo", dfo);
172 1 : dfo->init(cfgMgr);
173 :
174 1 : appfwk::DAQModule::CommandData_t null_data;
175 1 : appfwk::DAQModule::CommandData_t start_data;
176 1 : start_data.emplace("run", 1);
177 :
178 1 : dfo->execute_command("conf", null_data);
179 :
180 1 : auto iom = iomanager::IOManager::get();
181 1 : auto dec_recv = iom->get_receiver<dfmessages::TriggerDecision>("trigdec_0");
182 1 : dec_recv->add_callback(recv_trigdec);
183 1 : auto inh_recv = iom->get_receiver<dfmessages::TriggerInhibit>("triginh");
184 1 : inh_recv->add_callback(recv_triginh);
185 :
186 1 : send_trigdec(1, true);
187 1 : std::this_thread::sleep_for(std::chrono::milliseconds(50));
188 :
189 1 : send_token(999, "trigdec_0", true);
190 1 : send_token(9999, "trigdec_0", true);
191 1 : std::this_thread::sleep_for(std::chrono::milliseconds(50));
192 :
193 : // Note: Counters are reset by calling get_dfo_info!
194 1 : auto metric = get_dfo_info();
195 :
196 1 : BOOST_REQUIRE_EQUAL(metric.tokens_received(), 0);
197 :
198 1 : dfo->execute_command("start", start_data);
199 1 : send_init_token();
200 :
201 1 : std::this_thread::sleep_for(std::chrono::milliseconds(150));
202 :
203 1 : metric = get_dfo_info();
204 1 : BOOST_REQUIRE_EQUAL(metric.tokens_received(), 0);
205 1 : BOOST_REQUIRE_EQUAL(metric.decisions_received(), 0);
206 1 : BOOST_REQUIRE_EQUAL(metric.decisions_sent(), 0);
207 :
208 1 : send_trigdec(2);
209 1 : send_trigdec(3);
210 1 : std::this_thread::sleep_for(std::chrono::milliseconds(50));
211 1 : send_trigdec(4);
212 :
213 1 : metric = get_dfo_info();
214 1 : BOOST_REQUIRE_EQUAL(metric.tokens_received(), 0);
215 1 : BOOST_REQUIRE_EQUAL(metric.decisions_received(), 2);
216 1 : BOOST_REQUIRE_EQUAL(metric.decisions_sent(), 2);
217 :
218 1 : BOOST_REQUIRE(busy_signal_recvd.load());
219 1 : std::this_thread::sleep_for(std::chrono::milliseconds(400));
220 :
221 1 : metric = get_dfo_info();
222 1 : BOOST_REQUIRE_EQUAL(metric.tokens_received(), 3);
223 1 : BOOST_REQUIRE_EQUAL(metric.decisions_received(), 1);
224 1 : BOOST_REQUIRE_EQUAL(metric.decisions_sent(), 1);
225 1 : BOOST_REQUIRE(!busy_signal_recvd.load());
226 :
227 1 : dfo->execute_command("drain_dataflow", null_data);
228 1 : dfo->execute_command("scrap", null_data);
229 :
230 1 : dec_recv->remove_callback();
231 1 : inh_recv->remove_callback();
232 1 : }
233 :
234 2 : BOOST_AUTO_TEST_CASE(SendTrigDecFailed)
235 : {
236 1 : auto dfo = appfwk::make_module("DFOModule", "test");
237 1 : opmgr.register_node("dfo", dfo);
238 1 : dfo->init(cfgMgr);
239 :
240 1 : appfwk::DAQModule::CommandData_t null_data;
241 1 : appfwk::DAQModule::CommandData_t start_data;
242 1 : start_data.emplace("run", 1);
243 :
244 1 : dfo->execute_command("conf", null_data);
245 :
246 1 : dfo->execute_command("start", start_data);
247 :
248 1 : send_init_token("invalid_connection");
249 :
250 1 : std::this_thread::sleep_for(std::chrono::milliseconds(50));
251 :
252 1 : send_trigdec(1);
253 1 : std::this_thread::sleep_for(std::chrono::milliseconds(150));
254 :
255 1 : auto info = get_dfo_info();
256 1 : BOOST_REQUIRE_EQUAL(info.tokens_received(), 0);
257 1 : BOOST_REQUIRE_EQUAL(info.decisions_received(), 1);
258 1 : BOOST_REQUIRE_EQUAL(info.decisions_sent(), 0);
259 :
260 : // FWIW, tell the DFO to retry the invalid connection
261 1 : send_token(1000, "invalid_connection");
262 1 : std::this_thread::sleep_for(std::chrono::milliseconds(50));
263 :
264 : // Token for unknown dataflow app
265 1 : send_token(1000);
266 1 : std::this_thread::sleep_for(std::chrono::milliseconds(50));
267 :
268 1 : dfo->execute_command("drain_dataflow", null_data);
269 1 : dfo->execute_command("scrap", null_data);
270 1 : }
271 :
272 : BOOST_AUTO_TEST_SUITE_END()
273 : } // namespace dunedaq
|