DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
DAQModuleManager.cpp
Go to the documentation of this file.
1
10
11#include "appfwk/DAQModule.hpp"
12#include "appfwk/cmd/Nljs.hpp"
13
14#include "cmdlib/cmd/Nljs.hpp"
18#include "confmodel/Session.hpp"
20#include "logging/Logging.hpp"
21
22#include <future>
23#include <map>
24#include <memory>
25#include <regex>
26#include <set>
27#include <string>
28#include <unordered_map>
29#include <utility>
30#include <vector>
31
32namespace dunedaq::appfwk {
33
34DAQModuleManager::DAQModuleManager(const std::string& session_name)
35 : m_session_name(session_name)
36 , m_initialized(false)
37{
38}
39
40void
41DAQModuleManager::initialize(std::shared_ptr<ConfigurationManager> cfgMgr, opmonlib::OpMonManager& opm)
42{
43 set_config_mgr(cfgMgr);
44 cfgMgr->initialize();
45 get_iomanager()->configure(m_session_name,
46 m_configuration_mgr->get_queues(),
47 m_configuration_mgr->get_networkconnections(),
48 m_configuration_mgr->get_connectivity_service(),
49 opm);
50 init_modules(m_configuration_mgr->get_modules(), opm);
51
52 validate_action_plans();
53
54 this->m_initialized = true;
55}
56
57std::optional<ValidationReport>
58DAQModuleManager::check_mod_has_cmd(const std::string& cmd,
59 const std::string& mod_class,
60 bool is_optional,
61 const std::string& mod_id,
62 bool throw_on_fatal)
63{
64 std::string app = m_configuration_mgr->get_app_name();
65
66 if (!m_modules_by_type.count(mod_class) || m_modules_by_type[mod_class].size() == 0) {
67 if (is_optional) {
68 ValidationReport report(ValidationReport::Severity::Ignored,
69 app,
70 mod_class,
71 cmd,
72 "No modules of class " + mod_class + " in application (optional step)");
73
74 return report;
75 }
76 if (mod_id == "") {
77 ValidationReport report(ValidationReport::Severity::Warning,
78 app,
79 mod_class,
80 cmd,
81 "No modules of class " + mod_class + " in application!");
82 ers::warning(ActionPlanValidationFailed(ERS_HERE, report.get_command(), report.get_module(), report.get_message()));
83 return report;
84 } else {
85 ValidationReport report(ValidationReport::Severity::Fatal,
86 app,
87 mod_class,
88 cmd,
89 "No modules of class " + mod_class + " in application!");
90 if (throw_on_fatal)
91 throw ActionPlanValidationFailed(
92 ERS_HERE, report.get_command(), report.get_module(), report.get_message());
93 else
94 ers::error(ActionPlanValidationFailed(
95 ERS_HERE, report.get_command(), report.get_module(), report.get_message()));
96 return report;
97 }
98 }
99
100 auto module_test = m_module_map[m_modules_by_type[mod_class][0]];
101 if (mod_id != "") {
102 bool match = false;
103 for (auto& mod_name : m_modules_by_type[mod_class]) {
104 if (mod_id == mod_name) {
105 module_test = m_module_map[mod_name];
106 match = true;
107 break;
108 }
109 }
110 if (!match && !is_optional) {
111 ValidationReport report(
112 ValidationReport::Severity::Fatal, app, mod_class, cmd, "No module with id " + mod_id + " found.");
113
114 if (throw_on_fatal)
115 throw ActionPlanValidationFailed(ERS_HERE, report.get_command(), report.get_module(), report.get_message());
116 else
118 ActionPlanValidationFailed(ERS_HERE, report.get_command(), report.get_module(), report.get_message()));
119 return report;
120 }
121 }
122
123 if (!module_test->has_command(cmd)) {
124 ValidationReport report(
125 ValidationReport::Severity::Fatal, app, mod_class, cmd, "Module does not have command " + cmd + " registered.");
126
127 if (throw_on_fatal)
128 throw ActionPlanValidationFailed(ERS_HERE, report.get_command(), report.get_module(), report.get_message());
129 else
130 ers::error(ActionPlanValidationFailed(ERS_HERE, report.get_command(), report.get_module(), report.get_message()));
131 return report;
132 }
133 return {};
134}
135
136void
137DAQModuleManager::construct_modules(const std::vector<const dunedaq::confmodel::DaqModule*>& modules)
138{
139 for (const auto mod : modules) {
140 TLOG_DEBUG(0) << "construct: " << mod->class_name() << " : " << mod->UID();
141 auto mptr = make_module(mod->class_name(), mod->UID());
142 // Once constructed, DAQModules should not try to regsiter any more commands
143 mptr->set_command_registration_allowed(false);
144 m_module_map.emplace(mod->UID(), mptr);
145
146 if (!m_modules_by_type.count(mod->class_name())) {
147 m_modules_by_type[mod->class_name()] = std::vector<std::string>();
148 }
149 m_modules_by_type[mod->class_name()].emplace_back(mod->UID());
150 }
151}
152
153void
154DAQModuleManager::init_modules(const std::vector<const dunedaq::confmodel::DaqModule*>& modules,
155 opmonlib::OpMonManager& opm)
156{
157 construct_modules(modules);
158
159 for (const auto mod : modules) {
160 auto mptr = m_module_map[mod->UID()];
161 opm.register_node(mod->UID(), mptr);
162
163 try {
164 mptr->init(m_configuration_mgr);
165 } catch (ers::Issue& ex) {
166 throw DAQModuleInitFailed(ERS_HERE, mod->UID(), ex);
167 }
168 }
169}
170
171std::vector<ValidationReport>
172DAQModuleManager::validate_action_plans(bool throw_on_fatal)
173{
174 std::vector<ValidationReport> reports;
175 std::string app = m_configuration_mgr->get_app_name();
176
177 for (auto& plan_pair : m_configuration_mgr->get_action_plans()) {
178 auto cmd = plan_pair.first;
179 TLOG_DEBUG(0) << app << ": Checking action plan " << cmd;
180 std::map<std::string, std::set<std::string>> modules_with_cmd;
181 for (const auto& [mod_type, module_list] : m_modules_by_type) {
182 if (module_list.size() > 0 && m_module_map[module_list[0]]->has_command(cmd)) {
183 modules_with_cmd[mod_type] = std::set<std::string>(module_list.begin(), module_list.end());
184 }
185 }
186
187 for (auto& step : plan_pair.second->get_steps()) {
188 auto byType = step->cast<confmodel::DaqModulesGroupByType>();
189 auto byMod = step->cast<confmodel::DaqModulesGroupById>();
190 if (byType != nullptr) {
191 for (auto& mod_type : byType->get_modules()) {
192 auto report = check_mod_has_cmd(cmd, mod_type, byType->get_optional(), "", throw_on_fatal);
193 if (report)
194 reports.push_back(report.value());
195 modules_with_cmd.erase(mod_type);
196 }
197 } else if (byMod != nullptr) {
198 for (auto& mod : byMod->get_modules()) {
199 auto report = check_mod_has_cmd(cmd, mod->class_name(), byMod->get_optional(), mod->UID(), throw_on_fatal);
200 if (report)
201 reports.push_back(report.value());
202 modules_with_cmd[mod->class_name()].erase(mod->UID());
203 }
204 } else {
205 reports.emplace_back(
206 ValidationReport::Severity::Fatal, app, "N/A", cmd, "Invalid subclass of DaqModulesGroup encountered!");
207 if (throw_on_fatal)
208 throw ActionPlanValidationFailed(
209 ERS_HERE, reports.back().get_command(), reports.back().get_module(), reports.back().get_message());
210 else
211 ers::error(ActionPlanValidationFailed(
212 ERS_HERE, reports.back().get_command(), reports.back().get_module(), reports.back().get_message()));
213 }
214 }
215
216 for (const auto& [mod_type, module_list] : modules_with_cmd) {
217 for (auto& mod : module_list) {
218 reports.emplace_back(ValidationReport::Severity::Error,
219 app,
220 mod_type,
221 cmd,
222 "ActionPlan is defined, module has command, but module " + mod + " is not in any steps");
223 ers::error(ActionPlanValidationFailed(
224 ERS_HERE, reports.back().get_command(), reports.back().get_module(), reports.back().get_message()));
225 }
226 }
227 }
228
229 return reports;
230}
231
232void
233DAQModuleManager::cleanup()
234{
235 get_iomanager()->reset();
236 this->m_initialized = false;
237}
238
239DAQModule::CommandData_t
240DAQModuleManager::get_command_data_for_module(const std::string& mod_name, const DAQModule::CommandData_t& cmd_data)
241{
242 auto cmd_obj = cmd_data.get<cmd::CmdObj>();
243 const DAQModule::CommandData_t dummy{};
244
245 if (!cmd_obj.modules.empty()) {
246 for (const auto& addressed : cmd_obj.modules) {
247
248 // First exception: empty = `all`
249 if (addressed.match.empty()) {
250 return static_cast<DAQModule::CommandData_t>(addressed.data);
251 } else {
252 // match module name with regex
253 if (std::regex_match(mod_name, std::regex(addressed.match))) {
254 return static_cast<DAQModule::CommandData_t>(addressed.data);
255 }
256 }
257 }
258 }
259 // No matches
260 return dummy;
261}
262
263bool
264DAQModuleManager::execute_action(const std::string& module_name,
265 const std::string& action,
266 const DAQModule::CommandData_t& command_data)
267{
268 try {
269 TLOG_DEBUG(2) << "Executing " << module_name << " -> " << action;
270 m_module_map[module_name]->execute_command(action, command_data);
271 } catch (ers::Issue& ex) {
272 ers::error(ex);
273 return false;
274 }
275 return true;
276}
277
278void
279DAQModuleManager::execute_action_plan_step(std::string const& cmd,
280 const confmodel::DaqModulesGroup* step,
281 const DAQModule::CommandData_t& cmd_data,
282 bool execution_mode_is_serial)
283{
284 std::string failed_mod_names("");
285 std::unordered_map<std::string, std::future<bool>> futures;
286
287 auto byType = step->cast<confmodel::DaqModulesGroupByType>();
288 auto byMod = step->cast<confmodel::DaqModulesGroupById>();
289 if (byType != nullptr) {
290 for (auto& mod_class : byType->get_modules()) {
291 auto modules = m_modules_by_type[mod_class];
292 for (auto& mod_name : modules) {
293 auto command_data = get_command_data_for_module(mod_name, cmd_data);
294 TLOG_DEBUG(1) << "Executing action " << cmd << " on module " << mod_name << " (class " << mod_class << ")";
295 futures[mod_name] =
296 std::async(std::launch::async, &DAQModuleManager::execute_action, this, mod_name, cmd, command_data);
297 if (execution_mode_is_serial)
298 futures[mod_name].wait();
299 }
300 }
301 } else if (byMod != nullptr) {
302 for (auto& mod : byMod->get_modules()) {
303 auto mod_name = mod->UID();
304 auto command_data = get_command_data_for_module(mod_name, cmd_data);
305
306 if (byMod->get_optional() && !m_module_map.count(mod_name)) {
307 continue;
308 }
309
310 TLOG_DEBUG(1) << "Executing action " << cmd << " on module " << mod_name << " (class " << mod->class_name()
311 << ")";
312 futures[mod_name] =
313 std::async(std::launch::async, &DAQModuleManager::execute_action, this, mod_name, cmd, command_data);
314 if (execution_mode_is_serial)
315 futures[mod_name].wait();
316 }
317 } else {
318 throw CommandDispatchingFailed(ERS_HERE, cmd, "Could not get DaqModulesGroup!");
319 }
320
321 for (auto& future : futures) {
322 future.second.wait();
323 auto ret = future.second.get();
324 if (!ret) {
325 failed_mod_names.append(future.first);
326 failed_mod_names.append(", ");
327 }
328 }
329 // Throw if any dispatching failed
330 if (!failed_mod_names.empty()) {
331 throw CommandDispatchingFailed(ERS_HERE, cmd, failed_mod_names);
332 }
333}
334
335std::vector<std::string>
336DAQModuleManager::get_modnames_by_cmdid(cmdlib::cmd::CmdId id)
337{
338 // Make a convenience array with module names that have the requested command
339 std::vector<std::string> mod_names;
340 for (const auto& [mod_name, mod_ptr] : m_module_map) {
341 if (mod_ptr->has_command(id))
342 mod_names.push_back(mod_name);
343 }
344
345 return mod_names;
346}
347
348void
349DAQModuleManager::check_command_data(const std::string& id, const DAQModule::CommandData_t& cmd_data)
350{
351 // This method ensures that each module is only matched once per command.
352 // If multiple matches are found, an ers::Issue is thrown
353 // Disclaimenr for the occasional reader: this is the first implementation of the
354 // multiple-matches detection logic. The author is painfully aware that it can be
355 // vastly improved, in style if not in performance.
356
357 auto cmd_obj = cmd_data.get<cmd::CmdObj>();
358 const DAQModule::CommandData_t dummy{};
359
360 // Make a convenience array with module names that have the requested command
361 std::vector<std::string> cmd_mod_names = get_modnames_by_cmdid(id);
362
363 // containers for error tracking
364 std::map<std::string, std::vector<std::string>> mod_to_re;
365
366 if (!cmd_obj.modules.empty()) {
367 for (const auto& addressed : cmd_obj.modules) {
368 if (!addressed.match.empty()) {
369 // Find module names matching the regex
370 for (const std::string& mod_name : cmd_mod_names) {
371 // match module name with regex
372 if (std::regex_match(mod_name, std::regex(addressed.match))) {
373 mod_to_re[mod_name].push_back(addressed.match);
374 }
375 }
376 }
377 }
378
379 // Select modules with multiple matches
380 for (auto i = mod_to_re.begin(), last = mod_to_re.end(); i != last;) {
381 if (i->second.size() == 1) {
382 i = mod_to_re.erase(i);
383 } else {
384 ++i;
385 }
386 }
387
388 // Catch cases
389 if (mod_to_re.size() > 0) {
390 std::string mod_names;
391 for (const auto& [mod_name, matched_re] : mod_to_re) {
392 mod_names += mod_name + ", ";
393 }
394 throw ConflictingCommandMatching(ERS_HERE, id, mod_names);
395 }
396 }
397}
398
399void
400DAQModuleManager::execute(const std::string& cmd, const DAQModule::CommandData_t& cmd_data)
401{
402
403 TLOG_DEBUG(1) << "Command id:" << cmd;
404
405 if (!m_initialized) {
406 throw DAQModuleManagerNotInitialized(ERS_HERE, cmd);
407 }
408
409 check_command_data(cmd, cmd_data);
410
411 auto action_plan = m_configuration_mgr->get_action_plan(cmd);
412 if (action_plan == nullptr) {
413 if (ACTION_PLANS_REQUIRED) {
414 throw ActionPlanNotFound(ERS_HERE, cmd, "Throwing exception");
415 } else if (ACTION_PLANS_REQUIRED_WARNING) {
416 ers::warning(ActionPlanNotFound(ERS_HERE, cmd, "Returning without executing actions"));
417 return;
418 } else {
419 // Emulate old behavior
420 TLOG_DEBUG(1) << ActionPlanNotFound(ERS_HERE, cmd, "Executing action on all modules in parallel");
421 std::string failed_mod_names("");
422 std::unordered_map<std::string, std::future<bool>> futures;
423
424 auto mods = get_modnames_by_cmdid(cmd);
425 for (auto& mod : mods) {
426 TLOG_DEBUG(1) << "Executing action " << cmd << " on module " << mod;
427 auto command_data = get_command_data_for_module(mod, cmd_data);
428 futures[mod] = std::async(std::launch::async, &DAQModuleManager::execute_action, this, mod, cmd, command_data);
429 }
430
431 for (auto& future : futures) {
432 future.second.wait();
433 auto ret = future.second.get();
434 if (!ret) {
435 failed_mod_names.append(future.first);
436 failed_mod_names.append(", ");
437 }
438 }
439 // Throw if any dispatching failed
440 if (!failed_mod_names.empty()) {
441 throw CommandDispatchingFailed(ERS_HERE, cmd, failed_mod_names);
442 }
443 }
444 } else {
445 auto execution_policy = action_plan->get_execution_policy();
446 auto serial_execution = execution_policy == "modules-in-series";
447
448 // We validated the action plans already
449 for (auto& step : action_plan->get_steps()) {
450 execute_action_plan_step(cmd, step, cmd_data, serial_execution);
451 }
452 }
453
454 // Shutdown IOManager at scrap
455 if (cmd == "scrap") {
456 get_iomanager()->shutdown();
457 }
458}
459
460} // namespace dunedaq::appfwk
#define ERS_HERE
Base class for any user define issue.
Definition Issue.hpp:69
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
dunedaq::conffwk::relationship_t match(T const &, T const &)
std::shared_ptr< DAQModule > make_module(std::string const &plugin_name, std::string const &instance_name)
Load a DAQModule plugin and return a shared_ptr to the contained DAQModule class.
Definition DAQModule.hxx:40
init Command received when already ERS_EMPTY ConflictingCommandMatching
void warning(const Issue &issue)
Definition ers.hpp:115
void error(const Issue &issue)
Definition ers.hpp:81