28#include <unordered_map>
// Constructor: stores the session name in m_session_name and starts the
// manager in the "not initialized" state (m_initialized = false).
// NOTE(review): the constructor body is not visible in this listing.
34DAQModuleManager::DAQModuleManager(
const std::string& session_name)
35 : m_session_name(session_name)
36 , m_initialized(false)
// Initializes the manager from the given configuration:
//   1. keeps cfgMgr in m_configuration_mgr,
//   2. configures the IOManager with the session name, queues, network
//      connections and connectivity service taken from the configuration,
//   3. constructs the modules via init_modules(),
//   4. validates every action plan against the constructed modules,
//   5. sets m_initialized to true.
// ActionPlanValidationFailed is thrown for a step that is neither a
// DaqModulesGroupByType nor a DaqModulesGroupById; check_mod_has_cmd() may
// throw it as well.
// NOTE(review): the embedded listing numbers skip values (e.g. 42, 44, 49,
// 81), so some lines of this function are missing from this view.
41DAQModuleManager::initialize(std::shared_ptr<ConfigurationManager> cfgMgr, opmonlib::OpMonManager& opm)
43 m_configuration_mgr = cfgMgr;
// NOTE(review): the remaining argument(s) and the closing of this call
// (listing line 49) are not visible here.
45 get_iomanager()->configure(m_session_name,
46 m_configuration_mgr->queues(),
47 m_configuration_mgr->networkconnections(),
48 m_configuration_mgr->connectivity_service(),
50 init_modules(m_configuration_mgr->modules(), opm);
// Validate each action plan; plan_pair.first is used as the command name and
// plan_pair.second provides the steps.
52 for (
auto& plan_pair : m_configuration_mgr->action_plans()) {
53 auto cmd = plan_pair.first;
// Module type -> set of module ids that still have to be covered by a step.
54 std::map<std::string, std::set<std::string>> modules_with_cmd;
55 for (
const auto& [mod_type, module_list] : m_modules_by_type) {
// Only the first module of each type is asked whether it has the command;
// if it does, every module id of that type is recorded.
56 if (module_list.size() > 0 && m_module_map[module_list[0]]->has_command(cmd)) {
57 modules_with_cmd[mod_type] = std::set<std::string>(module_list.begin(), module_list.end());
// Walk the steps, checking each referenced type/module and striking it from
// modules_with_cmd.
61 for (
auto& step : plan_pair.second->get_steps()) {
62 auto byType = step->cast<confmodel::DaqModulesGroupByType>();
63 auto byMod = step->cast<confmodel::DaqModulesGroupById>();
64 if (byType !=
nullptr) {
65 for (
auto& mod_type : byType->get_modules()) {
66 check_mod_has_cmd(cmd, mod_type, byType->get_optional());
// A by-type step covers every module of that type.
67 modules_with_cmd.erase(mod_type);
69 }
else if (byMod !=
nullptr) {
70 for (
auto& mod : byMod->get_modules()) {
71 check_mod_has_cmd(cmd, mod->class_name(), byMod->get_optional(), mod->UID());
// A by-id step covers only the named module within its class.
72 modules_with_cmd[mod->class_name()].erase(mod->UID());
// NOTE(review): presumably the final else-branch (neither cast succeeded);
// the lines introducing it are not visible.
75 throw ActionPlanValidationFailed(
ERS_HERE, cmd,
"",
"Invalid subclass of DaqModulesGroup encountered!");
// Whatever is left has the command but is not referenced by any step.
79 for (
const auto& [mod_type, module_list] : modules_with_cmd) {
80 for (
auto& mod : module_list) {
// NOTE(review): the call that receives this issue (listing line 81) is not
// visible; presumably it is reported rather than thrown - confirm.
82 ERS_HERE, cmd, mod,
"ActionPlan is defined, module has command, but module is not in any steps"));
86 this->m_initialized =
true;
// Checks that an action-plan entry refers to a module that exists in this
// application and has the command `cmd` registered.
//   cmd       - command name of the action plan being validated
//   mod_class - module class named by the step
//   mod_id    - module id to look for among the modules of mod_class
// Throws ActionPlanValidationFailed when the check fails (see the individual
// messages below).
// NOTE(review): the parameter at listing line 92 is not visible; the body
// uses `is_optional`, which is presumably that parameter. `match` is also
// used below but its declaration/assignment is on lines missing from this
// view. initialize() calls this with three arguments, so mod_id presumably
// has a default declared elsewhere - confirm.
90DAQModuleManager::check_mod_has_cmd(
const std::string& cmd,
91 const std::string& mod_class,
93 const std::string& mod_id)
// No (non-empty) module list is registered for this class.
95 if (!m_modules_by_type.count(mod_class) || m_modules_by_type[mod_class].size() == 0) {
// NOTE(review): two handlings of the same condition follow - an issue passed
// to a call whose start is not visible, and a throw. The condition choosing
// between them (listing lines 96-102) is not visible; presumably it depends
// on is_optional.
100 ActionPlanValidationFailed(
ERS_HERE, cmd, mod_class,
"No modules of class " + mod_class +
" in application!"));
103 throw ActionPlanValidationFailed(
104 ERS_HERE, cmd, mod_class,
"No modules of class " + mod_class +
" in application!");
// Default to the first module of the class; replaced below when a module
// whose name equals mod_id is found.
108 auto module_test = m_module_map[m_modules_by_type[mod_class][0]];
111 for (
auto& mod_name : m_modules_by_type[mod_class]) {
112 if (mod_id == mod_name) {
113 module_test = m_module_map[mod_name];
// A missing module is an error only when the step is not optional.
118 if (!match && !is_optional) {
119 throw ActionPlanValidationFailed(
ERS_HERE, cmd, mod_class,
"No module with id " + mod_id +
" found.");
// The selected module must have the command registered.
123 if (!module_test->has_command(cmd)) {
124 throw ActionPlanValidationFailed(
ERS_HERE, cmd, mod_class,
"Module does not have command " + cmd +
" registered.");
// Constructs and initializes one DAQModule per configuration object.
//   modules - module configuration objects; class_name() selects the plugin
//             and UID() becomes the instance name
//   opm     - operational-monitoring manager each module is registered with
// Each module is stored in m_module_map under its UID, and its UID is
// appended to m_modules_by_type under its class name.
// Throws DAQModuleInitFailed, wrapping the caught exception `ex`.
// NOTE(review): the try/catch around init() and any null-check on mptr are
// on lines missing from this listing.
129DAQModuleManager::init_modules(
const std::vector<const dunedaq::confmodel::DaqModule*>& modules,
130 opmonlib::OpMonManager& opm)
132 for (
const auto mod : modules) {
133 TLOG_DEBUG(0) <<
"construct: " << mod->class_name() <<
" : " << mod->UID();
134 auto mptr =
make_module(mod->class_name(), mod->UID());
// Command registration is switched off on the new module before it is
// recorded in the maps.
136 mptr->set_command_registration_allowed(
false);
137 m_module_map.emplace(mod->UID(), mptr);
// Create the per-class list on first use, then record this module's UID.
139 if (!m_modules_by_type.count(mod->class_name())) {
140 m_modules_by_type[mod->class_name()] = std::vector<std::string>();
142 m_modules_by_type[mod->class_name()].emplace_back(mod->UID());
144 opm.register_node(mod->UID(), mptr);
147 mptr->init(m_configuration_mgr);
149 throw DAQModuleInitFailed(
ERS_HERE, mod->UID(), ex);
// Resets the IOManager and marks the manager as not initialized, so that
// execute() throws DAQModuleManagerNotInitialized until initialize() runs
// again.
// NOTE(review): listing line 156 is not visible; whether the module maps are
// cleared here cannot be told from this view.
155DAQModuleManager::cleanup()
157 get_iomanager()->reset();
158 this->m_initialized =
false;
// Selects the per-module payload for `mod_name` from a command object.
// cmd_data is converted to cmd::CmdObj and its `modules` entries are scanned
// in order; the data of the first entry that applies is returned. An entry
// applies when its `match` is empty or when `match`, used as a regular
// expression, matches the whole of mod_name (std::regex_match).
// NOTE(review): the return for "nothing matched" is not visible; presumably
// the empty `dummy` object is returned - confirm.
161DAQModuleManager::dataobj_t
162DAQModuleManager::get_dataobj_for_module(
const std::string& mod_name,
const dataobj_t& cmd_data)
164 auto cmd_obj = cmd_data.get<cmd::CmdObj>();
165 const dataobj_t dummy{};
167 if (!cmd_obj.modules.empty()) {
168 for (
const auto& addressed : cmd_obj.modules) {
// An empty match string acts as a wildcard.
171 if (addressed.match.empty()) {
172 return addressed.data;
// The regex is constructed from the pattern on every check.
175 if (std::regex_match(mod_name, std::regex(addressed.match))) {
176 return addressed.data;
// Runs command `action` with payload `data_obj` on the module stored under
// `module_name` in m_module_map.
// Callers launch this through std::async and keep the result in
// std::future<bool>, so it yields a bool.
// NOTE(review): the return statements and any exception handling are on
// lines missing from this listing; presumably false signals a failed
// command - confirm.
186DAQModuleManager::execute_action(
const std::string& module_name,
const std::string& action,
const dataobj_t& data_obj)
190 m_module_map[module_name]->execute_command(action, data_obj);
// Executes one step of an action plan for command `cmd`.
//   cmd                      - command name
//   step                     - step to run; must be a DaqModulesGroupByType
//                              or a DaqModulesGroupById
//   cmd_data                 - full command payload; each module's part is
//                              picked with get_dataobj_for_module()
//   execution_mode_is_serial - when true, each module's future is waited on
//                              right after it is launched
// execute_action() is launched with std::launch::async for every module. At
// the end all futures are collected and CommandDispatchingFailed is thrown
// with the comma-separated names of the modules recorded as failed.
// CommandDispatchingFailed is also thrown when the step is neither group
// kind.
// NOTE(review): listing lines 215 and 232 (presumably `futures[mod_name] =`)
// and lines 227-229 are missing from this view.
199DAQModuleManager::execute_action_plan_step(std::string
const& cmd,
200 const confmodel::DaqModulesGroup* step,
201 const dataobj_t& cmd_data,
202 bool execution_mode_is_serial)
204 std::string failed_mod_names(
"");
// Module name -> pending result of execute_action().
205 std::unordered_map<std::string, std::future<bool>> futures;
207 auto byType = step->cast<confmodel::DaqModulesGroupByType>();
208 auto byMod = step->cast<confmodel::DaqModulesGroupById>();
209 if (byType !=
nullptr) {
210 for (
auto& mod_class : byType->get_modules()) {
// `auto` takes a copy of the name list for this class.
211 auto modules = m_modules_by_type[mod_class];
212 for (
auto& mod_name : modules) {
213 auto data_obj = get_dataobj_for_module(mod_name, cmd_data);
214 TLOG_DEBUG(1) <<
"Executing action " << cmd <<
" on module " << mod_name <<
" (class " << mod_class <<
")";
216 std::async(std::launch::async, &DAQModuleManager::execute_action,
this, mod_name, cmd, data_obj);
// Serial mode: block until this module is done before launching the next.
217 if (execution_mode_is_serial)
218 futures[mod_name].wait();
221 }
else if (byMod !=
nullptr) {
222 for (
auto& mod : byMod->get_modules()) {
223 auto mod_name = mod->UID();
224 auto data_obj = get_dataobj_for_module(mod_name, cmd_data);
// Optional group and module not present in this application.
// NOTE(review): the branch body is not visible; presumably the module is
// skipped - confirm.
226 if (byMod->get_optional() && !m_module_map.count(mod_name)) {
230 TLOG_DEBUG(1) <<
"Executing action " << cmd <<
" on module " << mod_name <<
" (class " << mod->class_name()
233 std::async(std::launch::async, &DAQModuleManager::execute_action,
this, mod_name, cmd, data_obj);
234 if (execution_mode_is_serial)
235 futures[mod_name].wait();
// NOTE(review): presumably the final else-branch (neither cast succeeded).
238 throw CommandDispatchingFailed(
ERS_HERE, cmd,
"Could not get DaqModulesGroup!");
// Collect every result before reporting, so one failure does not leave
// other futures unobserved.
241 for (
auto& future : futures) {
242 future.second.wait();
243 auto ret = future.second.get();
// NOTE(review): the condition on `ret` (listing line 244) is not visible.
245 failed_mod_names.append(future.first);
246 failed_mod_names.append(
", ");
250 if (!failed_mod_names.empty()) {
251 throw CommandDispatchingFailed(
ERS_HERE, cmd, failed_mod_names);
// Collects the names of all modules in m_module_map for which
// has_command(id) is true, in the map's iteration order.
// NOTE(review): the return statement is not visible; presumably `mod_names`
// is returned.
255std::vector<std::string>
256DAQModuleManager::get_modnames_by_cmdid(cmdlib::cmd::CmdId
id)
259 std::vector<std::string> mod_names;
260 for (
const auto& [mod_name, mod_ptr] : m_module_map) {
261 if (mod_ptr->has_command(
id))
262 mod_names.push_back(mod_name);
// Checks the addressing of a command payload for ambiguity: finds modules
// (among those that have command `id`) whose name is matched by more than
// one non-empty `match` pattern in cmd_data.
//   id       - command name
//   cmd_data - command payload, converted to cmd::CmdObj
// NOTE(review): what is done with the resulting name list (listing lines
// 313+) is not visible; presumably a conflicting-match issue is raised -
// confirm.
269DAQModuleManager::check_cmd_data(
const std::string&
id,
const dataobj_t& cmd_data)
277 auto cmd_obj = cmd_data.get<cmd::CmdObj>();
278 const dataobj_t dummy{};
// Only modules that have this command registered are considered.
281 std::vector<std::string> cmd_mod_names = get_modnames_by_cmdid(
id);
// Module name -> every pattern that matched it.
284 std::map<std::string, std::vector<std::string>> mod_to_re;
286 if (!cmd_obj.modules.empty()) {
287 for (
const auto& addressed : cmd_obj.modules) {
// Entries with an empty match string are not counted here.
288 if (!addressed.match.empty()) {
290 for (
const std::string& mod_name : cmd_mod_names) {
292 if (std::regex_match(mod_name, std::regex(addressed.match))) {
293 mod_to_re[mod_name].push_back(addressed.match);
// Drop modules matched by exactly one pattern; what remains was matched by
// several. The iterator is advanced inside the loop body (erase returns the
// next iterator).
300 for (
auto i = mod_to_re.begin(), last = mod_to_re.end(); i != last;) {
301 if (i->second.size() == 1) {
302 i = mod_to_re.erase(i);
309 if (mod_to_re.size() > 0) {
// Build a ", "-separated list of the ambiguous module names.
310 std::string mod_names;
311 for (
const auto& [mod_name, matched_re] : mod_to_re) {
312 mod_names += mod_name +
", ";
// Entry point for running command `cmd` with payload `cmd_data`.
//   - Throws DAQModuleManagerNotInitialized when initialize() has not been
//     completed (m_initialized is false).
//   - Runs check_cmd_data() on the payload.
//   - With no action plan for cmd: throws ActionPlanNotFound when
//     ACTION_PLANS_REQUIRED is set; otherwise runs the command in parallel on
//     every module that has it and throws CommandDispatchingFailed listing
//     the modules recorded as failed.
//   - With an action plan: runs its steps in order through
//     execute_action_plan_step(), serially within a step when the plan's
//     execution policy equals "modules-in-series".
//   - For cmd == "scrap", shuts the IOManager down.
// NOTE(review): several lines (else/closing braces, the body of the
// ACTION_PLANS_REQUIRED_WARNING branch, the test on `ret`) are missing from
// this listing; the branch structure above is inferred from the visible
// statements - confirm against the full file.
320DAQModuleManager::execute(
const std::string& cmd,
const dataobj_t& cmd_data)
325 if (!m_initialized) {
326 throw DAQModuleManagerNotInitialized(
ERS_HERE, cmd);
329 check_cmd_data(cmd, cmd_data);
331 auto action_plan = m_configuration_mgr->action_plan(cmd);
332 if (action_plan ==
nullptr) {
333 if (ACTION_PLANS_REQUIRED) {
334 throw ActionPlanNotFound(
ERS_HERE, cmd,
"Throwing exception");
335 }
else if (ACTION_PLANS_REQUIRED_WARNING) {
// Fallback without a plan: fan the command out to all modules that have it.
340 TLOG_DEBUG(1) << ActionPlanNotFound(
ERS_HERE, cmd,
"Executing action on all modules in parallel");
341 std::string failed_mod_names(
"");
// Module name -> pending result of execute_action().
342 std::unordered_map<std::string, std::future<bool>> futures;
344 auto mods = get_modnames_by_cmdid(cmd);
345 for (
auto& mod : mods) {
346 TLOG_DEBUG(1) <<
"Executing action " << cmd <<
" on module " << mod;
347 auto data_obj = get_dataobj_for_module(mod, cmd_data);
348 futures[mod] = std::async(std::launch::async, &DAQModuleManager::execute_action,
this, mod, cmd, data_obj);
// Wait for every module before reporting failures.
351 for (
auto& future : futures) {
352 future.second.wait();
353 auto ret = future.second.get();
355 failed_mod_names.append(future.first);
356 failed_mod_names.append(
", ");
360 if (!failed_mod_names.empty()) {
361 throw CommandDispatchingFailed(
ERS_HERE, cmd, failed_mod_names);
// Action plan found: honour its execution policy and run the steps in order.
365 auto execution_policy = action_plan->get_execution_policy();
366 auto serial_execution = execution_policy ==
"modules-in-series";
369 for (
auto& step : action_plan->get_steps()) {
370 execute_action_plan_step(cmd, step, cmd_data, serial_execution);
// "scrap" additionally shuts the IOManager down.
375 if (cmd ==
"scrap") {
376 get_iomanager()->shutdown();
Base class for any user-defined issue.
#define TLOG_DEBUG(lvl,...)
dunedaq::conffwk::relationship_t match(T const &, T const &)
std::shared_ptr< DAQModule > make_module(std::string const &plugin_name, std::string const &instance_name)
Load a DAQModule plugin and return a shared_ptr to the contained DAQModule class.
init Command received when already ERS_EMPTY ConflictingCommandMatching
void warning(const Issue &issue)
void error(const Issue &issue)