28#include <unordered_map>
34DAQModuleManager::DAQModuleManager(
const std::string& session_name)
35 : m_session_name(session_name)
36 , m_initialized(false)
41DAQModuleManager::initialize(std::shared_ptr<ConfigurationManager> cfgMgr, opmonlib::OpMonManager& opm)
43 set_config_mgr(cfgMgr);
45 get_iomanager()->configure(m_session_name,
46 m_configuration_mgr->get_queues(),
47 m_configuration_mgr->get_networkconnections(),
48 m_configuration_mgr->get_connectivity_service(),
50 init_modules(m_configuration_mgr->get_modules(), opm);
52 validate_action_plans();
54 this->m_initialized =
true;
57std::optional<ValidationReport>
58DAQModuleManager::check_mod_has_cmd(
const std::string& cmd,
59 const std::string& mod_class,
61 const std::string& mod_id,
64 std::string app = m_configuration_mgr->get_app_name();
66 if (!m_modules_by_type.count(mod_class) || m_modules_by_type[mod_class].size() == 0) {
68 ValidationReport report(ValidationReport::Severity::Ignored,
72 "No modules of class " + mod_class +
" in application (optional step)");
77 ValidationReport report(ValidationReport::Severity::Warning,
81 "No modules of class " + mod_class +
" in application!");
82 ers::warning(ActionPlanValidationFailed(
ERS_HERE, report.get_command(), report.get_module(), report.get_message()));
85 ValidationReport report(ValidationReport::Severity::Fatal,
89 "No modules of class " + mod_class +
" in application!");
91 throw ActionPlanValidationFailed(
92 ERS_HERE, report.get_command(), report.get_module(), report.get_message());
95 ERS_HERE, report.get_command(), report.get_module(), report.get_message()));
100 auto module_test = m_module_map[m_modules_by_type[mod_class][0]];
103 for (
auto& mod_name : m_modules_by_type[mod_class]) {
104 if (mod_id == mod_name) {
105 module_test = m_module_map[mod_name];
110 if (!match && !is_optional) {
111 ValidationReport report(
112 ValidationReport::Severity::Fatal, app, mod_class, cmd,
"No module with id " + mod_id +
" found.");
115 throw ActionPlanValidationFailed(
ERS_HERE, report.get_command(), report.get_module(), report.get_message());
118 ActionPlanValidationFailed(
ERS_HERE, report.get_command(), report.get_module(), report.get_message()));
123 if (!module_test->has_command(cmd)) {
124 ValidationReport report(
125 ValidationReport::Severity::Fatal, app, mod_class, cmd,
"Module does not have command " + cmd +
" registered.");
128 throw ActionPlanValidationFailed(
ERS_HERE, report.get_command(), report.get_module(), report.get_message());
130 ers::error(ActionPlanValidationFailed(
ERS_HERE, report.get_command(), report.get_module(), report.get_message()));
137DAQModuleManager::construct_modules(
const std::vector<const dunedaq::confmodel::DaqModule*>& modules)
139 for (
const auto mod : modules) {
140 TLOG_DEBUG(0) <<
"construct: " << mod->class_name() <<
" : " << mod->UID();
141 auto mptr =
make_module(mod->class_name(), mod->UID());
143 mptr->set_command_registration_allowed(
false);
144 m_module_map.emplace(mod->UID(), mptr);
146 if (!m_modules_by_type.count(mod->class_name())) {
147 m_modules_by_type[mod->class_name()] = std::vector<std::string>();
149 m_modules_by_type[mod->class_name()].emplace_back(mod->UID());
154DAQModuleManager::init_modules(
const std::vector<const dunedaq::confmodel::DaqModule*>& modules,
155 opmonlib::OpMonManager& opm)
157 construct_modules(modules);
159 for (
const auto mod : modules) {
160 auto mptr = m_module_map[mod->UID()];
161 opm.register_node(mod->UID(), mptr);
164 mptr->init(m_configuration_mgr);
166 throw DAQModuleInitFailed(
ERS_HERE, mod->UID(), ex);
171std::vector<ValidationReport>
172DAQModuleManager::validate_action_plans(
bool throw_on_fatal)
174 std::vector<ValidationReport> reports;
175 std::string app = m_configuration_mgr->get_app_name();
177 for (
auto& plan_pair : m_configuration_mgr->get_action_plans()) {
178 auto cmd = plan_pair.first;
179 TLOG_DEBUG(0) << app <<
": Checking action plan " << cmd;
180 std::map<std::string, std::set<std::string>> modules_with_cmd;
181 for (
const auto& [mod_type, module_list] : m_modules_by_type) {
182 if (module_list.size() > 0 && m_module_map[module_list[0]]->has_command(cmd)) {
183 modules_with_cmd[mod_type] = std::set<std::string>(module_list.begin(), module_list.end());
187 for (
auto& step : plan_pair.second->get_steps()) {
188 auto byType = step->cast<confmodel::DaqModulesGroupByType>();
189 auto byMod = step->cast<confmodel::DaqModulesGroupById>();
190 if (byType !=
nullptr) {
191 for (
auto& mod_type : byType->get_modules()) {
192 auto report = check_mod_has_cmd(cmd, mod_type, byType->get_optional(),
"", throw_on_fatal);
194 reports.push_back(report.value());
195 modules_with_cmd.erase(mod_type);
197 }
else if (byMod !=
nullptr) {
198 for (
auto& mod : byMod->get_modules()) {
199 auto report = check_mod_has_cmd(cmd, mod->class_name(), byMod->get_optional(), mod->UID(), throw_on_fatal);
201 reports.push_back(report.value());
202 modules_with_cmd[mod->class_name()].erase(mod->UID());
205 reports.emplace_back(
206 ValidationReport::Severity::Fatal, app,
"N/A", cmd,
"Invalid subclass of DaqModulesGroup encountered!");
208 throw ActionPlanValidationFailed(
209 ERS_HERE, reports.back().get_command(), reports.back().get_module(), reports.back().get_message());
212 ERS_HERE, reports.back().get_command(), reports.back().get_module(), reports.back().get_message()));
216 for (
const auto& [mod_type, module_list] : modules_with_cmd) {
217 for (
auto& mod : module_list) {
218 reports.emplace_back(ValidationReport::Severity::Error,
222 "ActionPlan is defined, module has command, but module " + mod +
" is not in any steps");
224 ERS_HERE, reports.back().get_command(), reports.back().get_module(), reports.back().get_message()));
233DAQModuleManager::cleanup()
235 get_iomanager()->reset();
236 this->m_initialized =
false;
239DAQModule::CommandData_t
240DAQModuleManager::get_command_data_for_module(
const std::string& mod_name,
const DAQModule::CommandData_t& cmd_data)
242 auto cmd_obj = cmd_data.get<cmd::CmdObj>();
243 const DAQModule::CommandData_t dummy{};
245 if (!cmd_obj.modules.empty()) {
246 for (
const auto& addressed : cmd_obj.modules) {
249 if (addressed.match.empty()) {
250 return static_cast<DAQModule::CommandData_t
>(addressed.data);
253 if (std::regex_match(mod_name, std::regex(addressed.match))) {
254 return static_cast<DAQModule::CommandData_t
>(addressed.data);
264DAQModuleManager::execute_action(
const std::string& module_name,
265 const std::string& action,
266 const DAQModule::CommandData_t& command_data)
270 m_module_map[module_name]->execute_command(action, command_data);
279DAQModuleManager::execute_action_plan_step(std::string
const& cmd,
280 const confmodel::DaqModulesGroup* step,
281 const DAQModule::CommandData_t& cmd_data,
282 bool execution_mode_is_serial)
284 std::string failed_mod_names(
"");
285 std::unordered_map<std::string, std::future<bool>> futures;
287 auto byType = step->cast<confmodel::DaqModulesGroupByType>();
288 auto byMod = step->cast<confmodel::DaqModulesGroupById>();
289 if (byType !=
nullptr) {
290 for (
auto& mod_class : byType->get_modules()) {
291 auto modules = m_modules_by_type[mod_class];
292 for (
auto& mod_name : modules) {
293 auto command_data = get_command_data_for_module(mod_name, cmd_data);
294 TLOG_DEBUG(1) <<
"Executing action " << cmd <<
" on module " << mod_name <<
" (class " << mod_class <<
")";
296 std::async(std::launch::async, &DAQModuleManager::execute_action,
this, mod_name, cmd, command_data);
297 if (execution_mode_is_serial)
298 futures[mod_name].wait();
301 }
else if (byMod !=
nullptr) {
302 for (
auto& mod : byMod->get_modules()) {
303 auto mod_name = mod->UID();
304 auto command_data = get_command_data_for_module(mod_name, cmd_data);
306 if (byMod->get_optional() && !m_module_map.count(mod_name)) {
310 TLOG_DEBUG(1) <<
"Executing action " << cmd <<
" on module " << mod_name <<
" (class " << mod->class_name()
313 std::async(std::launch::async, &DAQModuleManager::execute_action,
this, mod_name, cmd, command_data);
314 if (execution_mode_is_serial)
315 futures[mod_name].wait();
318 throw CommandDispatchingFailed(
ERS_HERE, cmd,
"Could not get DaqModulesGroup!");
321 for (
auto& future : futures) {
322 future.second.wait();
323 auto ret = future.second.get();
325 failed_mod_names.append(future.first);
326 failed_mod_names.append(
", ");
330 if (!failed_mod_names.empty()) {
331 throw CommandDispatchingFailed(
ERS_HERE, cmd, failed_mod_names);
335std::vector<std::string>
336DAQModuleManager::get_modnames_by_cmdid(cmdlib::cmd::CmdId
id)
339 std::vector<std::string> mod_names;
340 for (
const auto& [mod_name, mod_ptr] : m_module_map) {
341 if (mod_ptr->has_command(
id))
342 mod_names.push_back(mod_name);
349DAQModuleManager::check_command_data(
const std::string&
id,
const DAQModule::CommandData_t& cmd_data)
357 auto cmd_obj = cmd_data.get<cmd::CmdObj>();
358 const DAQModule::CommandData_t dummy{};
361 std::vector<std::string> cmd_mod_names = get_modnames_by_cmdid(
id);
364 std::map<std::string, std::vector<std::string>> mod_to_re;
366 if (!cmd_obj.modules.empty()) {
367 for (
const auto& addressed : cmd_obj.modules) {
368 if (!addressed.match.empty()) {
370 for (
const std::string& mod_name : cmd_mod_names) {
372 if (std::regex_match(mod_name, std::regex(addressed.match))) {
373 mod_to_re[mod_name].push_back(addressed.match);
380 for (
auto i = mod_to_re.begin(), last = mod_to_re.end(); i != last;) {
381 if (i->second.size() == 1) {
382 i = mod_to_re.erase(i);
389 if (mod_to_re.size() > 0) {
390 std::string mod_names;
391 for (
const auto& [mod_name, matched_re] : mod_to_re) {
392 mod_names += mod_name +
", ";
400DAQModuleManager::execute(
const std::string& cmd,
const DAQModule::CommandData_t& cmd_data)
405 if (!m_initialized) {
406 throw DAQModuleManagerNotInitialized(
ERS_HERE, cmd);
409 check_command_data(cmd, cmd_data);
411 auto action_plan = m_configuration_mgr->get_action_plan(cmd);
412 if (action_plan ==
nullptr) {
413 if (ACTION_PLANS_REQUIRED) {
414 throw ActionPlanNotFound(
ERS_HERE, cmd,
"Throwing exception");
415 }
else if (ACTION_PLANS_REQUIRED_WARNING) {
420 TLOG_DEBUG(1) << ActionPlanNotFound(
ERS_HERE, cmd,
"Executing action on all modules in parallel");
421 std::string failed_mod_names(
"");
422 std::unordered_map<std::string, std::future<bool>> futures;
424 auto mods = get_modnames_by_cmdid(cmd);
425 for (
auto& mod : mods) {
426 TLOG_DEBUG(1) <<
"Executing action " << cmd <<
" on module " << mod;
427 auto command_data = get_command_data_for_module(mod, cmd_data);
428 futures[mod] = std::async(std::launch::async, &DAQModuleManager::execute_action,
this, mod, cmd, command_data);
431 for (
auto& future : futures) {
432 future.second.wait();
433 auto ret = future.second.get();
435 failed_mod_names.append(future.first);
436 failed_mod_names.append(
", ");
440 if (!failed_mod_names.empty()) {
441 throw CommandDispatchingFailed(
ERS_HERE, cmd, failed_mod_names);
445 auto execution_policy = action_plan->get_execution_policy();
446 auto serial_execution = execution_policy ==
"modules-in-series";
449 for (
auto& step : action_plan->get_steps()) {
450 execute_action_plan_step(cmd, step, cmd_data, serial_execution);
455 if (cmd ==
"scrap") {
456 get_iomanager()->shutdown();
Base class for any user define issue.
#define TLOG_DEBUG(lvl,...)
dunedaq::conffwk::relationship_t match(T const &, T const &)
std::shared_ptr< DAQModule > make_module(std::string const &plugin_name, std::string const &instance_name)
Load a DAQModule plugin and return a shared_ptr to the contained DAQModule class.
init Command received when already ERS_EMPTY ConflictingCommandMatching
void warning(const Issue &issue)
void error(const Issue &issue)