27#include <unordered_map>
// Constructor: records the session name and starts the manager in the
// "not initialized" state. m_initialized is set to true at the end of
// initialize() and back to false in cleanup().
//
// session_name: copied into m_session_name; later passed to the IOManager
//               configure() call in initialize().
33DAQModuleManager::DAQModuleManager(
const std::string& session_name)
34 : m_session_name(session_name)
35 , m_initialized(false)
// Prepare the manager for use:
//   1. keep the configuration manager,
//   2. configure the object returned by get_iomanager() from the session name
//      and the configuration's queues / network connections / connectivity service,
//   3. construct and init every configured module (init_modules),
//   4. validate every action plan: each module class or module instance named by
//      a plan step must support the plan's command (check_mod_has_cmd),
//   5. mark the manager as initialized.
//
// cfgMgr: configuration source; stored in m_configuration_mgr.
// opm:    operational-monitoring manager, forwarded to init_modules().
//
// Throws ActionPlanValidationFailed (directly, or via check_mod_has_cmd) when a
// plan cannot be validated.
40DAQModuleManager::initialize(std::shared_ptr<ConfigurationManager> cfgMgr, opmonlib::OpMonManager& opm)
42 m_configuration_mgr = cfgMgr;
44 get_iomanager()->configure(m_session_name,
45 m_configuration_mgr->queues(),
46 m_configuration_mgr->networkconnections(),
47 m_configuration_mgr->connectivity_service(),
// NOTE(review): the remaining argument(s) and the closing of this call are not
// visible in this listing.
49 init_modules(m_configuration_mgr->modules(), opm);
// Modules must exist before this loop: check_mod_has_cmd() looks them up in
// m_modules_by_type / m_module_map, which init_modules() fills.
51 for (
auto& plan_pair : m_configuration_mgr->action_plans()) {
// The key of each entry is used as the command name for validation.
52 auto cmd = plan_pair.first;
54 for (
auto& step : plan_pair.second->get_steps()) {
// A step is expected to be exactly one of the two group kinds; each cast is
// compared against nullptr below to find out which one it is.
55 auto byType = step->cast<confmodel::DaqModulesGroupByType>();
56 auto byMod = step->cast<confmodel::DaqModulesGroupById>();
57 if (byType !=
nullptr) {
// By-type group: validate per module class (no instance id is passed).
58 for (
auto& mod_type : byType->get_modules()) {
59 check_mod_has_cmd(cmd, mod_type);
61 }
else if (byMod !=
nullptr) {
// By-id group: validate the specific instance, identified by class name + UID.
62 for (
auto& mod : byMod->get_modules()) {
63 check_mod_has_cmd(cmd, mod->class_name(), mod->UID());
// NOTE(review): presumably the final else-branch (step is neither group kind);
// the guarding else is not visible in this listing.
66 throw ActionPlanValidationFailed(
ERS_HERE, cmd,
"",
"Invalid subclass of DaqModulesGroup encountered!");
70 this->m_initialized =
true;
// Validate that a module of class mod_class (optionally the specific instance
// mod_id) exists and reports support for command cmd via has_command().
//
// cmd:       command name being validated.
// mod_class: module class name; key into m_modules_by_type.
// mod_id:    instance name to look for among the modules of that class.
//            initialize() calls this function without it for by-type groups,
//            so the declaration presumably supplies a default - TODO confirm
//            in the header.
//
// Throws ActionPlanValidationFailed when the class has no modules, when mod_id
// is not found, or when the chosen module does not have the command.
74DAQModuleManager::check_mod_has_cmd(
const std::string& cmd,
const std::string& mod_class,
const std::string& mod_id)
// No entry for this class, or an entry with an empty module list.
77 if (!m_modules_by_type.count(mod_class) || m_modules_by_type[mod_class].size() == 0) {
// NOTE(review): the same issue is both reported with ers::info and thrown below;
// the control flow between the two statements is not visible in this listing.
79 ers::info(ActionPlanValidationFailed(
ERS_HERE, cmd, mod_class,
"Module does not exist"));
82 throw ActionPlanValidationFailed(
ERS_HERE, cmd, mod_class,
"Module does not exist");
// Default to the first module registered for this class; replaced below when a
// module named mod_id is found.
86 auto module_test = m_module_map[m_modules_by_type[mod_class][0]];
89 for (
auto& mod_name : m_modules_by_type[mod_class]) {
90 if (mod_id == mod_name) {
91 module_test = m_module_map[mod_name];
// NOTE(review): presumably reached only when the search above found no module
// with this id; the guarding condition is not visible in this listing.
97 throw ActionPlanValidationFailed(
ERS_HERE, cmd, mod_class,
"No module with id " + mod_id +
" found.");
101 if (!module_test->has_command(cmd)) {
102 throw ActionPlanValidationFailed(
ERS_HERE, cmd, mod_class,
"Module does not have method " + cmd);
// Construct, register and initialise every configured module.
//
// For each configuration object this:
//   - creates the module with make_module(class name, UID),
//   - stores it in m_module_map under its UID,
//   - appends the UID to the per-class list in m_modules_by_type,
//   - registers the module with the OpMon manager under its UID,
//   - calls the module's init() with the stored configuration manager.
//
// modules: configuration objects describing the modules to create.
// opm:     operational-monitoring manager the modules are registered with.
//
// Throws DAQModuleInitFailed carrying the module UID and a caught issue `ex`.
107DAQModuleManager::init_modules(
const std::vector<const dunedaq::confmodel::DaqModule*>& modules,
108 opmonlib::OpMonManager& opm)
110 for (
const auto mod : modules) {
111 TLOG_DEBUG(0) <<
"construct: " << mod->class_name() <<
" : " << mod->UID();
112 auto mptr =
make_module(mod->class_name(), mod->UID());
113 m_module_map.emplace(mod->UID(), mptr);
// Make sure the per-class list exists before appending this module's UID.
115 if (!m_modules_by_type.count(mod->class_name())) {
116 m_modules_by_type[mod->class_name()] = std::vector<std::string>();
118 m_modules_by_type[mod->class_name()].emplace_back(mod->UID());
120 opm.register_node(mod->UID(), mptr);
// NOTE(review): `ex` below implies init() is wrapped in a try/catch; the
// try/catch lines and the caught type are not visible in this listing.
123 mptr->init(m_configuration_mgr);
125 throw DAQModuleInitFailed(
ERS_HERE, mod->UID(), ex);
// Tear-down counterpart of initialize(): resets the object returned by
// get_iomanager() and clears m_initialized, so execute() will throw
// DAQModuleManagerNotInitialized until initialize() is called again.
// NOTE(review): a line between the signature and the reset() call is not
// visible in this listing; there may be additional cleanup there.
131DAQModuleManager::cleanup()
133 get_iomanager()->reset();
134 this->m_initialized =
false;
// Pick the command payload addressed to one module.
//
// The command data is read as a cmd::CmdObj whose `modules` list holds
// (match, data) entries. Entries are tried in order and the first one that
// applies wins:
//   - an entry with an empty `match` applies to every module,
//   - otherwise `match` is a regular expression that must match the whole
//     module name (std::regex_match).
//
// mod_name: name of the module the payload is for.
// cmd_data: full command data object.
// Returns the `data` of the first applicable entry.
137DAQModuleManager::dataobj_t
138DAQModuleManager::get_dataobj_for_module(
const std::string& mod_name,
const dataobj_t& cmd_data)
140 auto cmd_obj = cmd_data.get<cmd::CmdObj>();
// NOTE(review): presumably returned when no entry applies; the fall-through
// return statement is not visible in this listing.
141 const dataobj_t dummy{};
143 if (!cmd_obj.modules.empty()) {
144 for (
const auto& addressed : cmd_obj.modules) {
// Empty match string: this entry addresses all modules.
147 if (addressed.match.empty()) {
148 return addressed.data;
// A std::regex is built from the pattern on every call, for every entry tried.
151 if (std::regex_match(mod_name, std::regex(addressed.match))) {
152 return addressed.data;
// Run one command on one module: looks the module up by name in m_module_map
// and calls its execute_command(action, data_obj).
//
// module_name: key into m_module_map.
// action:      command name passed to the module.
// data_obj:    command payload passed to the module.
//
// NOTE(review): callers launch this through std::async and store the result in
// std::future<bool>, so it presumably returns a success flag; the return type,
// return statements and any error handling are not visible in this listing.
162DAQModuleManager::execute_action(
const std::string& module_name,
const std::string& action,
const dataobj_t& data_obj)
166 m_module_map[module_name]->execute_command(action, data_obj);
175DAQModuleManager::execute_action_plan_step(std::string
const& cmd,
176 const confmodel::DaqModulesGroup* step,
177 const dataobj_t& cmd_data,
178 bool execution_mode_is_serial)
180 std::string failed_mod_names(
"");
181 std::unordered_map<std::string, std::future<bool>> futures;
183 auto byType = step->cast<confmodel::DaqModulesGroupByType>();
184 auto byMod = step->cast<confmodel::DaqModulesGroupById>();
185 if (byType !=
nullptr) {
186 for (
auto& mod_class : byType->get_modules()) {
187 auto modules = m_modules_by_type[mod_class];
188 for (
auto& mod_name : modules) {
189 auto data_obj = get_dataobj_for_module(mod_name, cmd_data);
190 TLOG_DEBUG(1) <<
"Executing action " << cmd <<
" on module " << mod_name <<
" (class " << mod_class <<
")";
192 std::async(std::launch::async, &DAQModuleManager::execute_action,
this, mod_name, cmd, data_obj);
193 if (execution_mode_is_serial)
194 futures[mod_name].wait();
197 }
else if (byMod !=
nullptr) {
198 for (
auto& mod : byMod->get_modules()) {
199 auto mod_name = mod->UID();
200 auto data_obj = get_dataobj_for_module(mod_name, cmd_data);
201 TLOG_DEBUG(1) <<
"Executing action " << cmd <<
" on module " << mod_name <<
" (class " << mod->class_name()
204 std::async(std::launch::async, &DAQModuleManager::execute_action,
this, mod_name, cmd, data_obj);
205 if (execution_mode_is_serial)
206 futures[mod_name].wait();
209 throw CommandDispatchingFailed(
ERS_HERE, cmd,
"Could not get DaqModulesGroup!");
212 for (
auto& future : futures) {
213 future.second.wait();
214 auto ret = future.second.get();
216 failed_mod_names.append(future.first);
217 failed_mod_names.append(
", ");
221 if (!failed_mod_names.empty()) {
222 throw CommandDispatchingFailed(
ERS_HERE, cmd, failed_mod_names);
// List the names of all modules in m_module_map that report, via
// has_command(id), that they support the given command.
//
// id: command identifier to test each module against.
// Returns a vector of module names (the keys of m_module_map), in the map's
// iteration order.
// NOTE(review): the return statement is not visible in this listing.
226std::vector<std::string>
227DAQModuleManager::get_modnames_by_cmdid(cmdlib::cmd::CmdId
id)
230 std::vector<std::string> mod_names;
231 for (
const auto& [mod_name, mod_ptr] : m_module_map) {
232 if (mod_ptr->has_command(
id))
233 mod_names.push_back(mod_name);
// Look for ambiguous addressing in a command: modules that support command
// `id` and whose name is matched by more than one non-empty `match` regex in
// the command's `modules` list.
//
// id:       command identifier; selects the candidate modules.
// cmd_data: full command data, read as cmd::CmdObj.
//
// NOTE(review): what is done with the collected names (presumably an issue is
// reported or thrown) is not visible in this listing.
240DAQModuleManager::check_cmd_data(
const std::string&
id,
const dataobj_t& cmd_data)
248 auto cmd_obj = cmd_data.get<cmd::CmdObj>();
// NOTE(review): no use of `dummy` is visible in this listing.
249 const dataobj_t dummy{};
// Only modules that actually have this command are checked.
252 std::vector<std::string> cmd_mod_names = get_modnames_by_cmdid(
id);
// module name -> every regex from the command that matched that name
255 std::map<std::string, std::vector<std::string>> mod_to_re;
257 if (!cmd_obj.modules.empty()) {
258 for (
const auto& addressed : cmd_obj.modules) {
// Entries with an empty match string are not counted.
259 if (!addressed.match.empty()) {
261 for (
const std::string& mod_name : cmd_mod_names) {
263 if (std::regex_match(mod_name, std::regex(addressed.match))) {
264 mod_to_re[mod_name].push_back(addressed.match);
// Drop modules matched by exactly one regex (unambiguous). The loop header has
// no increment because erase() already yields the next iterator.
// NOTE(review): the advance for entries that are kept is not visible here.
271 for (
auto i = mod_to_re.begin(), last = mod_to_re.end(); i != last;) {
272 if (i->second.size() == 1) {
273 i = mod_to_re.erase(i);
// Whatever remains was matched by two or more regexes.
280 if (mod_to_re.size() > 0) {
281 std::string mod_names;
282 for (
const auto& [mod_name, matched_re] : mod_to_re) {
// Builds a ", "-separated list; a trailing ", " is left after the last name.
283 mod_names += mod_name +
", ";
// Execute a command on the managed modules.
//
// Flow visible in this listing:
//   1. refuse to run unless initialize() has completed,
//   2. validate the addressing in cmd_data (check_cmd_data),
//   3. look up the action plan for cmd in the configuration,
//   4. without a plan: an ActionPlanNotFound throw and a fallback that runs the
//      command on all supporting modules in parallel are both present,
//   5. with a plan: run its steps in order via execute_action_plan_step(),
//   6. for the "scrap" command, shut down the object returned by get_iomanager().
//
// cmd:      command name.
// cmd_data: full command data; per-module payloads are extracted from it.
//
// Throws DAQModuleManagerNotInitialized, ActionPlanNotFound or
// CommandDispatchingFailed as described at the individual statements below.
291DAQModuleManager::execute(
const std::string& cmd,
const dataobj_t& cmd_data)
296 if (!m_initialized) {
297 throw DAQModuleManagerNotInitialized(
ERS_HERE, cmd);
300 check_cmd_data(cmd, cmd_data);
302 auto action_plan = m_configuration_mgr->action_plan(cmd);
303 if (action_plan ==
nullptr) {
// NOTE(review): this throw and the parallel fallback below cannot both be
// reached unconditionally; whatever selects between them (e.g. a preprocessor
// or runtime condition) is not visible in this listing.
305 throw ActionPlanNotFound(
ERS_HERE, cmd,
"Throwing exception");
// Fallback: no plan, so address every module that has the command.
311 TLOG_DEBUG(1) << ActionPlanNotFound(
ERS_HERE, cmd,
"Executing action on all modules in parallel");
312 std::string failed_mod_names(
"");
// module name -> pending result of execute_action for that module
313 std::unordered_map<std::string, std::future<bool>> futures;
315 auto mods = get_modnames_by_cmdid(cmd);
316 for (
auto& mod : mods) {
317 TLOG_DEBUG(1) <<
"Executing action " << cmd <<
" on module " << mod;
318 auto data_obj = get_dataobj_for_module(mod, cmd_data);
// One asynchronous task per module; all are launched before any is awaited.
319 futures[mod] = std::async(std::launch::async, &DAQModuleManager::execute_action,
this, mod, cmd, data_obj);
322 for (
auto& future : futures) {
323 future.second.wait();
324 auto ret = future.second.get();
// NOTE(review): presumably appended only when `ret` is false; the guarding
// condition is not visible in this listing.
326 failed_mod_names.append(future.first);
327 failed_mod_names.append(
", ");
331 if (!failed_mod_names.empty()) {
332 throw CommandDispatchingFailed(
ERS_HERE, cmd, failed_mod_names);
// Action-plan path (presumably the else-branch of the nullptr check; the
// braces are not visible). The policy string "modules-in-series" selects
// serial execution within each step; any other value leaves it concurrent.
336 auto execution_policy = action_plan->get_execution_policy();
337 auto serial_execution = execution_policy ==
"modules-in-series";
340 for (
auto& step : action_plan->get_steps()) {
341 execute_action_plan_step(cmd, step, cmd_data, serial_execution);
// "scrap" additionally shuts down the object returned by get_iomanager().
346 if (cmd ==
"scrap") {
347 get_iomanager()->shutdown();
Base class for any user-defined issue.
#define TLOG_DEBUG(lvl,...)
dunedaq::conffwk::relationship_t match(T const &, T const &)
std::shared_ptr< DAQModule > make_module(std::string const &plugin_name, std::string const &instance_name)
Load a DAQModule plugin and return a shared_ptr to the contained DAQModule class.
init Command received when already ERS_EMPTY ConflictingCommandMatching
void warning(const Issue &issue)
void info(const Issue &issue)
void error(const Issue &issue)