Line data Source code
1 : /**
2 : * @file DAQModuleManager.cpp DAQModuleManager implementataion
3 : *
4 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "DAQModuleManager.hpp"
10 :
11 : #include "appfwk/DAQModule.hpp"
12 : #include "appfwk/cmd/Nljs.hpp"
13 :
14 : #include "cmdlib/cmd/Nljs.hpp"
15 : #include "confmodel/DaqModulesGroup.hpp"
16 : #include "confmodel/DaqModulesGroupById.hpp"
17 : #include "confmodel/DaqModulesGroupByType.hpp"
18 : #include "confmodel/Session.hpp"
19 : #include "iomanager/IOManager.hpp"
20 : #include "logging/Logging.hpp"
21 :
22 : #include <future>
23 : #include <map>
24 : #include <memory>
25 : #include <regex>
26 : #include <set>
27 : #include <string>
28 : #include <unordered_map>
29 : #include <utility>
30 : #include <vector>
31 :
32 : namespace dunedaq::appfwk {
33 :
34 25 : DAQModuleManager::DAQModuleManager(const std::string& session_name)
35 25 : : m_session_name(session_name)
36 50 : , m_initialized(false)
37 : {
38 25 : }
39 :
40 : void
41 19 : DAQModuleManager::initialize(std::shared_ptr<ConfigurationManager> cfgMgr, opmonlib::OpMonManager& opm)
42 : {
43 19 : set_config_mgr(cfgMgr);
44 19 : cfgMgr->initialize();
45 36 : get_iomanager()->configure(m_session_name,
46 : m_configuration_mgr->get_queues(),
47 : m_configuration_mgr->get_networkconnections(),
48 18 : m_configuration_mgr->get_connectivity_service(),
49 : opm);
50 18 : init_modules(m_configuration_mgr->get_modules(), opm);
51 :
52 18 : validate_action_plans();
53 :
54 16 : this->m_initialized = true;
55 16 : }
56 :
57 : std::optional<ValidationReport>
58 33 : DAQModuleManager::check_mod_has_cmd(const std::string& cmd,
59 : const std::string& mod_class,
60 : bool is_optional,
61 : const std::string& mod_id,
62 : bool throw_on_fatal)
63 : {
64 33 : std::string app = m_configuration_mgr->get_app_name();
65 :
66 33 : if (!m_modules_by_type.count(mod_class) || m_modules_by_type[mod_class].size() == 0) {
67 2 : if (is_optional) {
68 1 : ValidationReport report(ValidationReport::Severity::Ignored,
69 : app,
70 : mod_class,
71 : cmd,
72 1 : "No modules of class " + mod_class + " in application (optional step)");
73 :
74 1 : return report;
75 1 : }
76 1 : if (mod_id == "") {
77 1 : ValidationReport report(ValidationReport::Severity::Warning,
78 : app,
79 : mod_class,
80 : cmd,
81 1 : "No modules of class " + mod_class + " in application!");
82 1 : ers::warning(ActionPlanValidationFailed(ERS_HERE, report.get_command(), report.get_module(), report.get_message()));
83 1 : return report;
84 1 : } else {
85 0 : ValidationReport report(ValidationReport::Severity::Fatal,
86 : app,
87 : mod_class,
88 : cmd,
89 0 : "No modules of class " + mod_class + " in application!");
90 0 : if (throw_on_fatal)
91 0 : throw ActionPlanValidationFailed(
92 0 : ERS_HERE, report.get_command(), report.get_module(), report.get_message());
93 : else
94 0 : ers::error(ActionPlanValidationFailed(
95 0 : ERS_HERE, report.get_command(), report.get_module(), report.get_message()));
96 0 : return report;
97 0 : }
98 : }
99 :
100 31 : auto module_test = m_module_map[m_modules_by_type[mod_class][0]];
101 31 : if (mod_id != "") {
102 8 : bool match = false;
103 12 : for (auto& mod_name : m_modules_by_type[mod_class]) {
104 10 : if (mod_id == mod_name) {
105 6 : module_test = m_module_map[mod_name];
106 6 : match = true;
107 6 : break;
108 : }
109 : }
110 8 : if (!match && !is_optional) {
111 1 : ValidationReport report(
112 1 : ValidationReport::Severity::Fatal, app, mod_class, cmd, "No module with id " + mod_id + " found.");
113 :
114 1 : if (throw_on_fatal)
115 1 : throw ActionPlanValidationFailed(ERS_HERE, report.get_command(), report.get_module(), report.get_message());
116 : else
117 0 : ers::error(
118 0 : ActionPlanValidationFailed(ERS_HERE, report.get_command(), report.get_module(), report.get_message()));
119 0 : return report;
120 1 : }
121 : }
122 :
123 30 : if (!module_test->has_command(cmd)) {
124 1 : ValidationReport report(
125 1 : ValidationReport::Severity::Fatal, app, mod_class, cmd, "Module does not have command " + cmd + " registered.");
126 :
127 1 : if (throw_on_fatal)
128 1 : throw ActionPlanValidationFailed(ERS_HERE, report.get_command(), report.get_module(), report.get_message());
129 : else
130 0 : ers::error(ActionPlanValidationFailed(ERS_HERE, report.get_command(), report.get_module(), report.get_message()));
131 0 : return report;
132 1 : }
133 29 : return {};
134 35 : }
135 :
136 : void
137 18 : DAQModuleManager::construct_modules(const std::vector<const dunedaq::confmodel::DaqModule*>& modules)
138 : {
139 39 : for (const auto mod : modules) {
140 21 : TLOG_DEBUG(0) << "construct: " << mod->class_name() << " : " << mod->UID();
141 21 : auto mptr = make_module(mod->class_name(), mod->UID());
142 : // Once constructed, DAQModules should not try to regsiter any more commands
143 21 : mptr->set_command_registration_allowed(false);
144 21 : m_module_map.emplace(mod->UID(), mptr);
145 :
146 21 : if (!m_modules_by_type.count(mod->class_name())) {
147 17 : m_modules_by_type[mod->class_name()] = std::vector<std::string>();
148 : }
149 21 : m_modules_by_type[mod->class_name()].emplace_back(mod->UID());
150 21 : }
151 18 : }
152 :
153 : void
154 18 : DAQModuleManager::init_modules(const std::vector<const dunedaq::confmodel::DaqModule*>& modules,
155 : opmonlib::OpMonManager& opm)
156 : {
157 18 : construct_modules(modules);
158 :
159 39 : for (const auto mod : modules) {
160 21 : auto mptr = m_module_map[mod->UID()];
161 21 : opm.register_node(mod->UID(), mptr);
162 :
163 21 : try {
164 21 : mptr->init(m_configuration_mgr);
165 0 : } catch (ers::Issue& ex) {
166 0 : throw DAQModuleInitFailed(ERS_HERE, mod->UID(), ex);
167 0 : }
168 21 : }
169 18 : }
170 :
171 : std::vector<ValidationReport>
172 18 : DAQModuleManager::validate_action_plans(bool throw_on_fatal)
173 : {
174 18 : std::vector<ValidationReport> reports;
175 18 : std::string app = m_configuration_mgr->get_app_name();
176 :
177 45 : for (auto& plan_pair : m_configuration_mgr->get_action_plans()) {
178 29 : auto cmd = plan_pair.first;
179 29 : TLOG_DEBUG(0) << app << ": Checking action plan " << cmd;
180 29 : std::map<std::string, std::set<std::string>> modules_with_cmd;
181 57 : for (const auto& [mod_type, module_list] : m_modules_by_type) {
182 28 : if (module_list.size() > 0 && m_module_map[module_list[0]]->has_command(cmd)) {
183 27 : modules_with_cmd[mod_type] = std::set<std::string>(module_list.begin(), module_list.end());
184 : }
185 : }
186 :
187 56 : for (auto& step : plan_pair.second->get_steps()) {
188 29 : auto byType = step->cast<confmodel::DaqModulesGroupByType>();
189 29 : auto byMod = step->cast<confmodel::DaqModulesGroupById>();
190 29 : if (byType != nullptr) {
191 49 : for (auto& mod_type : byType->get_modules()) {
192 25 : auto report = check_mod_has_cmd(cmd, mod_type, byType->get_optional(), "", throw_on_fatal);
193 24 : if (report)
194 2 : reports.push_back(report.value());
195 24 : modules_with_cmd.erase(mod_type);
196 24 : }
197 4 : } else if (byMod != nullptr) {
198 11 : for (auto& mod : byMod->get_modules()) {
199 8 : auto report = check_mod_has_cmd(cmd, mod->class_name(), byMod->get_optional(), mod->UID(), throw_on_fatal);
200 7 : if (report)
201 0 : reports.push_back(report.value());
202 7 : modules_with_cmd[mod->class_name()].erase(mod->UID());
203 7 : }
204 : } else {
205 0 : reports.emplace_back(
206 0 : ValidationReport::Severity::Fatal, app, "N/A", cmd, "Invalid subclass of DaqModulesGroup encountered!");
207 0 : if (throw_on_fatal)
208 0 : throw ActionPlanValidationFailed(
209 0 : ERS_HERE, reports.back().get_command(), reports.back().get_module(), reports.back().get_message());
210 : else
211 0 : ers::error(ActionPlanValidationFailed(
212 0 : ERS_HERE, reports.back().get_command(), reports.back().get_module(), reports.back().get_message()));
213 : }
214 : }
215 :
216 31 : for (const auto& [mod_type, module_list] : modules_with_cmd) {
217 6 : for (auto& mod : module_list) {
218 2 : reports.emplace_back(ValidationReport::Severity::Error,
219 : app,
220 : mod_type,
221 : cmd,
222 4 : "ActionPlan is defined, module has command, but module " + mod + " is not in any steps");
223 2 : ers::error(ActionPlanValidationFailed(
224 4 : ERS_HERE, reports.back().get_command(), reports.back().get_module(), reports.back().get_message()));
225 : }
226 : }
227 31 : }
228 :
229 16 : return reports;
230 20 : }
231 :
232 : void
233 1 : DAQModuleManager::cleanup()
234 : {
235 1 : get_iomanager()->reset();
236 1 : this->m_initialized = false;
237 1 : }
238 :
239 : DAQModule::CommandData_t
240 15 : DAQModuleManager::get_command_data_for_module(const std::string& mod_name, const DAQModule::CommandData_t& cmd_data)
241 : {
242 15 : auto cmd_obj = cmd_data.get<cmd::CmdObj>();
243 15 : const DAQModule::CommandData_t dummy{};
244 :
245 15 : if (!cmd_obj.modules.empty()) {
246 3 : for (const auto& addressed : cmd_obj.modules) {
247 :
248 : // First exception: empty = `all`
249 3 : if (addressed.match.empty()) {
250 3 : return static_cast<DAQModule::CommandData_t>(addressed.data);
251 : } else {
252 : // match module name with regex
253 3 : if (std::regex_match(mod_name, std::regex(addressed.match))) {
254 3 : return static_cast<DAQModule::CommandData_t>(addressed.data);
255 : }
256 : }
257 : }
258 : }
259 : // No matches
260 12 : return dummy;
261 15 : }
262 :
263 : bool
264 14 : DAQModuleManager::execute_action(const std::string& module_name,
265 : const std::string& action,
266 : const DAQModule::CommandData_t& command_data)
267 : {
268 14 : try {
269 14 : TLOG_DEBUG(2) << "Executing " << module_name << " -> " << action;
270 14 : m_module_map[module_name]->execute_command(action, command_data);
271 4 : } catch (ers::Issue& ex) {
272 4 : ers::error(ex);
273 4 : return false;
274 4 : }
275 : return true;
276 : }
277 :
278 : void
279 12 : DAQModuleManager::execute_action_plan_step(std::string const& cmd,
280 : const confmodel::DaqModulesGroup* step,
281 : const DAQModule::CommandData_t& cmd_data,
282 : bool execution_mode_is_serial)
283 : {
284 12 : std::string failed_mod_names("");
285 12 : std::unordered_map<std::string, std::future<bool>> futures;
286 :
287 12 : auto byType = step->cast<confmodel::DaqModulesGroupByType>();
288 12 : auto byMod = step->cast<confmodel::DaqModulesGroupById>();
289 12 : if (byType != nullptr) {
290 18 : for (auto& mod_class : byType->get_modules()) {
291 9 : auto modules = m_modules_by_type[mod_class];
292 18 : for (auto& mod_name : modules) {
293 9 : auto command_data = get_command_data_for_module(mod_name, cmd_data);
294 9 : TLOG_DEBUG(1) << "Executing action " << cmd << " on module " << mod_name << " (class " << mod_class << ")";
295 9 : futures[mod_name] =
296 18 : std::async(std::launch::async, &DAQModuleManager::execute_action, this, mod_name, cmd, command_data);
297 9 : if (execution_mode_is_serial)
298 0 : futures[mod_name].wait();
299 9 : }
300 9 : }
301 3 : } else if (byMod != nullptr) {
302 9 : for (auto& mod : byMod->get_modules()) {
303 6 : auto mod_name = mod->UID();
304 6 : auto command_data = get_command_data_for_module(mod_name, cmd_data);
305 :
306 6 : if (byMod->get_optional() && !m_module_map.count(mod_name)) {
307 1 : continue;
308 : }
309 :
310 5 : TLOG_DEBUG(1) << "Executing action " << cmd << " on module " << mod_name << " (class " << mod->class_name()
311 5 : << ")";
312 5 : futures[mod_name] =
313 10 : std::async(std::launch::async, &DAQModuleManager::execute_action, this, mod_name, cmd, command_data);
314 5 : if (execution_mode_is_serial)
315 0 : futures[mod_name].wait();
316 6 : }
317 : } else {
318 0 : throw CommandDispatchingFailed(ERS_HERE, cmd, "Could not get DaqModulesGroup!");
319 : }
320 :
321 26 : for (auto& future : futures) {
322 14 : future.second.wait();
323 14 : auto ret = future.second.get();
324 14 : if (!ret) {
325 4 : failed_mod_names.append(future.first);
326 4 : failed_mod_names.append(", ");
327 : }
328 : }
329 : // Throw if any dispatching failed
330 12 : if (!failed_mod_names.empty()) {
331 3 : throw CommandDispatchingFailed(ERS_HERE, cmd, failed_mod_names);
332 : }
333 15 : }
334 :
335 : std::vector<std::string>
336 19 : DAQModuleManager::get_modnames_by_cmdid(cmdlib::cmd::CmdId id)
337 : {
338 : // Make a convenience array with module names that have the requested command
339 19 : std::vector<std::string> mod_names;
340 42 : for (const auto& [mod_name, mod_ptr] : m_module_map) {
341 23 : if (mod_ptr->has_command(id))
342 17 : mod_names.push_back(mod_name);
343 : }
344 :
345 19 : return mod_names;
346 0 : }
347 :
348 : void
349 16 : DAQModuleManager::check_command_data(const std::string& id, const DAQModule::CommandData_t& cmd_data)
350 : {
351 : // This method ensures that each module is only matched once per command.
352 : // If multiple matches are found, an ers::Issue is thrown
353 : // Disclaimenr for the occasional reader: this is the first implementation of the
354 : // multiple-matches detection logic. The author is painfully aware that it can be
355 : // vastly improved, in style if not in performance.
356 :
357 16 : auto cmd_obj = cmd_data.get<cmd::CmdObj>();
358 16 : const DAQModule::CommandData_t dummy{};
359 :
360 : // Make a convenience array with module names that have the requested command
361 16 : std::vector<std::string> cmd_mod_names = get_modnames_by_cmdid(id);
362 :
363 : // containers for error tracking
364 16 : std::map<std::string, std::vector<std::string>> mod_to_re;
365 :
366 16 : if (!cmd_obj.modules.empty()) {
367 17 : for (const auto& addressed : cmd_obj.modules) {
368 10 : if (!addressed.match.empty()) {
369 : // Find module names matching the regex
370 14 : for (const std::string& mod_name : cmd_mod_names) {
371 : // match module name with regex
372 7 : if (std::regex_match(mod_name, std::regex(addressed.match))) {
373 5 : mod_to_re[mod_name].push_back(addressed.match);
374 : }
375 : }
376 : }
377 : }
378 :
379 : // Select modules with multiple matches
380 11 : for (auto i = mod_to_re.begin(), last = mod_to_re.end(); i != last;) {
381 4 : if (i->second.size() == 1) {
382 3 : i = mod_to_re.erase(i);
383 : } else {
384 1 : ++i;
385 : }
386 : }
387 :
388 : // Catch cases
389 7 : if (mod_to_re.size() > 0) {
390 1 : std::string mod_names;
391 2 : for (const auto& [mod_name, matched_re] : mod_to_re) {
392 1 : mod_names += mod_name + ", ";
393 : }
394 1 : throw ConflictingCommandMatching(ERS_HERE, id, mod_names);
395 1 : }
396 : }
397 19 : }
398 :
399 : void
400 18 : DAQModuleManager::execute(const std::string& cmd, const DAQModule::CommandData_t& cmd_data)
401 : {
402 :
403 18 : TLOG_DEBUG(1) << "Command id:" << cmd;
404 :
405 18 : if (!m_initialized) {
406 2 : throw DAQModuleManagerNotInitialized(ERS_HERE, cmd);
407 : }
408 :
409 16 : check_command_data(cmd, cmd_data);
410 :
411 15 : auto action_plan = m_configuration_mgr->get_action_plan(cmd);
412 15 : if (action_plan == nullptr) {
413 3 : if (ACTION_PLANS_REQUIRED) {
414 : throw ActionPlanNotFound(ERS_HERE, cmd, "Throwing exception");
415 3 : } else if (ACTION_PLANS_REQUIRED_WARNING) {
416 : ers::warning(ActionPlanNotFound(ERS_HERE, cmd, "Returning without executing actions"));
417 : return;
418 : } else {
419 : // Emulate old behavior
420 3 : TLOG_DEBUG(1) << ActionPlanNotFound(ERS_HERE, cmd, "Executing action on all modules in parallel");
421 3 : std::string failed_mod_names("");
422 3 : std::unordered_map<std::string, std::future<bool>> futures;
423 :
424 3 : auto mods = get_modnames_by_cmdid(cmd);
425 3 : for (auto& mod : mods) {
426 0 : TLOG_DEBUG(1) << "Executing action " << cmd << " on module " << mod;
427 0 : auto command_data = get_command_data_for_module(mod, cmd_data);
428 0 : futures[mod] = std::async(std::launch::async, &DAQModuleManager::execute_action, this, mod, cmd, command_data);
429 0 : }
430 :
431 3 : for (auto& future : futures) {
432 0 : future.second.wait();
433 0 : auto ret = future.second.get();
434 0 : if (!ret) {
435 0 : failed_mod_names.append(future.first);
436 0 : failed_mod_names.append(", ");
437 : }
438 : }
439 : // Throw if any dispatching failed
440 3 : if (!failed_mod_names.empty()) {
441 0 : throw CommandDispatchingFailed(ERS_HERE, cmd, failed_mod_names);
442 : }
443 3 : }
444 : } else {
445 12 : auto execution_policy = action_plan->get_execution_policy();
446 12 : auto serial_execution = execution_policy == "modules-in-series";
447 :
448 : // We validated the action plans already
449 21 : for (auto& step : action_plan->get_steps()) {
450 12 : execute_action_plan_step(cmd, step, cmd_data, serial_execution);
451 : }
452 12 : }
453 :
454 : // Shutdown IOManager at scrap
455 12 : if (cmd == "scrap") {
456 0 : get_iomanager()->shutdown();
457 : }
458 : }
459 :
460 : } // namespace dunedaq::appfwk
|