LCOV - code coverage report
Current view: top level - appfwk/src - DAQModuleManager.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 84.3 % 255 215
Test Date: 2025-12-21 13:07:08 Functions: 95.2 % 21 20

            Line data    Source code
       1              : /**
       2              :  * @file DAQModuleManager.cpp DAQModuleManager implementation
       3              :  *
       4              :  * This is part of the DUNE DAQ Application Framework, copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
       9              : #include "DAQModuleManager.hpp"
      10              : 
      11              : #include "appfwk/DAQModule.hpp"
      12              : #include "appfwk/cmd/Nljs.hpp"
      13              : 
      14              : #include "cmdlib/cmd/Nljs.hpp"
      15              : #include "confmodel/DaqModulesGroup.hpp"
      16              : #include "confmodel/DaqModulesGroupById.hpp"
      17              : #include "confmodel/DaqModulesGroupByType.hpp"
      18              : #include "confmodel/Session.hpp"
      19              : #include "iomanager/IOManager.hpp"
      20              : #include "logging/Logging.hpp"
      21              : 
      22              : #include <future>
      23              : #include <map>
      24              : #include <memory>
      25              : #include <regex>
      26              : #include <set>
      27              : #include <string>
      28              : #include <unordered_map>
      29              : #include <utility>
      30              : #include <vector>
      31              : 
      32              : namespace dunedaq::appfwk {
      33              : 
      34           25 : DAQModuleManager::DAQModuleManager(const std::string& session_name)
      35           25 :   : m_session_name(session_name)
      36           50 :   , m_initialized(false)
      37              : {
      38           25 : }
      39              : 
      40              : void
      41           19 : DAQModuleManager::initialize(std::shared_ptr<ConfigurationManager> cfgMgr, opmonlib::OpMonManager& opm)
      42              : {
      43           19 :   set_config_mgr(cfgMgr);
      44           19 :   cfgMgr->initialize();
      45           36 :   get_iomanager()->configure(m_session_name,
      46              :                              m_configuration_mgr->get_queues(),
      47              :                              m_configuration_mgr->get_networkconnections(),
      48           18 :                              m_configuration_mgr->get_connectivity_service(),
      49              :                              opm);
      50           18 :   init_modules(m_configuration_mgr->get_modules(), opm);
      51              : 
      52           18 :   validate_action_plans();
      53              : 
      54           16 :   this->m_initialized = true;
      55           16 : }
      56              : 
      57              : std::optional<ValidationReport>
      58           33 : DAQModuleManager::check_mod_has_cmd(const std::string& cmd,
      59              :                                     const std::string& mod_class,
      60              :                                     bool is_optional,
      61              :                                     const std::string& mod_id,
      62              :                                     bool throw_on_fatal)
      63              : {
      64           33 :   std::string app = m_configuration_mgr->get_app_name();
      65              : 
      66           33 :   if (!m_modules_by_type.count(mod_class) || m_modules_by_type[mod_class].size() == 0) {
      67            2 :     if (is_optional) {
      68            1 :       ValidationReport report(ValidationReport::Severity::Ignored,
      69              :                            app,
      70              :                            mod_class,
      71              :                            cmd,
      72            1 :                            "No modules of class " + mod_class + " in application (optional step)");
      73              : 
      74            1 :       return report;
      75            1 :     }
      76            1 :     if (mod_id == "") {
      77            1 :       ValidationReport report(ValidationReport::Severity::Warning,
      78              :                            app,
      79              :                            mod_class,
      80              :                            cmd,
      81            1 :                            "No modules of class " + mod_class + " in application!");
      82            1 :       ers::warning(ActionPlanValidationFailed(ERS_HERE, report.get_command(), report.get_module(), report.get_message()));
      83            1 :       return report;
      84            1 :     } else {
      85            0 :       ValidationReport report(ValidationReport::Severity::Fatal,
      86              :                            app,
      87              :                            mod_class,
      88              :                            cmd,
      89            0 :                            "No modules of class " + mod_class + " in application!");
      90            0 :       if (throw_on_fatal)
      91            0 :         throw ActionPlanValidationFailed(
      92            0 :           ERS_HERE, report.get_command(), report.get_module(), report.get_message());
      93              :       else
      94            0 :         ers::error(ActionPlanValidationFailed(
      95            0 :           ERS_HERE, report.get_command(), report.get_module(), report.get_message()));
      96            0 :       return report;
      97            0 :     }
      98              :   }
      99              : 
     100           31 :   auto module_test = m_module_map[m_modules_by_type[mod_class][0]];
     101           31 :   if (mod_id != "") {
     102            8 :     bool match = false;
     103           12 :     for (auto& mod_name : m_modules_by_type[mod_class]) {
     104           10 :       if (mod_id == mod_name) {
     105            6 :         module_test = m_module_map[mod_name];
     106            6 :         match = true;
     107            6 :         break;
     108              :       }
     109              :     }
     110            8 :     if (!match && !is_optional) {
     111            1 :       ValidationReport report(
     112            1 :         ValidationReport::Severity::Fatal, app, mod_class, cmd, "No module with id " + mod_id + " found.");
     113              : 
     114            1 :       if (throw_on_fatal)
     115            1 :         throw ActionPlanValidationFailed(ERS_HERE, report.get_command(), report.get_module(), report.get_message());
     116              :       else
     117            0 :         ers::error(
     118            0 :           ActionPlanValidationFailed(ERS_HERE, report.get_command(), report.get_module(), report.get_message()));
     119            0 :       return report;
     120            1 :     }
     121              :   }
     122              : 
     123           30 :   if (!module_test->has_command(cmd)) {
     124            1 :     ValidationReport report(
     125            1 :       ValidationReport::Severity::Fatal, app, mod_class, cmd, "Module does not have command " + cmd + " registered.");
     126              : 
     127            1 :     if (throw_on_fatal)
     128            1 :       throw ActionPlanValidationFailed(ERS_HERE, report.get_command(), report.get_module(), report.get_message());
     129              :     else
     130            0 :       ers::error(ActionPlanValidationFailed(ERS_HERE, report.get_command(), report.get_module(), report.get_message()));
     131            0 :     return report;
     132            1 :   }
     133           29 :   return {};
     134           35 : }
     135              : 
     136              : void
     137           18 : DAQModuleManager::construct_modules(const std::vector<const dunedaq::confmodel::DaqModule*>& modules)
     138              : {
     139           39 :   for (const auto mod : modules) {
     140           21 :     TLOG_DEBUG(0) << "construct: " << mod->class_name() << " : " << mod->UID();
     141           21 :     auto mptr = make_module(mod->class_name(), mod->UID());
      142              :     // Once constructed, DAQModules should not try to register any more commands
     143           21 :     mptr->set_command_registration_allowed(false);
     144           21 :     m_module_map.emplace(mod->UID(), mptr);
     145              : 
     146           21 :     if (!m_modules_by_type.count(mod->class_name())) {
     147           17 :       m_modules_by_type[mod->class_name()] = std::vector<std::string>();
     148              :     }
     149           21 :     m_modules_by_type[mod->class_name()].emplace_back(mod->UID());
     150           21 :   }
     151           18 : }
     152              : 
     153              : void
     154           18 : DAQModuleManager::init_modules(const std::vector<const dunedaq::confmodel::DaqModule*>& modules,
     155              :                                opmonlib::OpMonManager& opm)
     156              : {
     157           18 :   construct_modules(modules);
     158              : 
     159           39 :   for (const auto mod : modules) {
     160           21 :     auto mptr = m_module_map[mod->UID()];
     161           21 :     opm.register_node(mod->UID(), mptr);
     162              : 
     163           21 :     try {
     164           21 :       mptr->init(m_configuration_mgr);
     165            0 :     } catch (ers::Issue& ex) {
     166            0 :       throw DAQModuleInitFailed(ERS_HERE, mod->UID(), ex);
     167            0 :     }
     168           21 :   }
     169           18 : }
     170              : 
     171              : std::vector<ValidationReport>
     172           18 : DAQModuleManager::validate_action_plans(bool throw_on_fatal)
     173              : {
     174           18 :   std::vector<ValidationReport> reports;
     175           18 :   std::string app = m_configuration_mgr->get_app_name();
     176              : 
     177           45 :   for (auto& plan_pair : m_configuration_mgr->get_action_plans()) {
     178           29 :     auto cmd = plan_pair.first;
     179           29 :     TLOG_DEBUG(0) << app << ": Checking action plan " << cmd;
     180           29 :     std::map<std::string, std::set<std::string>> modules_with_cmd;
     181           57 :     for (const auto& [mod_type, module_list] : m_modules_by_type) {
     182           28 :       if (module_list.size() > 0 && m_module_map[module_list[0]]->has_command(cmd)) {
     183           27 :         modules_with_cmd[mod_type] = std::set<std::string>(module_list.begin(), module_list.end());
     184              :       }
     185              :     }
     186              : 
     187           56 :     for (auto& step : plan_pair.second->get_steps()) {
     188           29 :       auto byType = step->cast<confmodel::DaqModulesGroupByType>();
     189           29 :       auto byMod = step->cast<confmodel::DaqModulesGroupById>();
     190           29 :       if (byType != nullptr) {
     191           49 :         for (auto& mod_type : byType->get_modules()) {
     192           25 :           auto report = check_mod_has_cmd(cmd, mod_type, byType->get_optional(), "", throw_on_fatal);
     193           24 :           if (report)
     194            2 :             reports.push_back(report.value());
     195           24 :           modules_with_cmd.erase(mod_type);
     196           24 :         }
     197            4 :       } else if (byMod != nullptr) {
     198           11 :         for (auto& mod : byMod->get_modules()) {
     199            8 :           auto report = check_mod_has_cmd(cmd, mod->class_name(), byMod->get_optional(), mod->UID(), throw_on_fatal);
     200            7 :           if (report)
     201            0 :             reports.push_back(report.value());
     202            7 :           modules_with_cmd[mod->class_name()].erase(mod->UID());
     203            7 :         }
     204              :       } else {
     205            0 :         reports.emplace_back(
     206            0 :           ValidationReport::Severity::Fatal, app, "N/A", cmd, "Invalid subclass of DaqModulesGroup encountered!");
     207            0 :         if (throw_on_fatal)
     208            0 :           throw ActionPlanValidationFailed(
     209            0 :             ERS_HERE, reports.back().get_command(), reports.back().get_module(), reports.back().get_message());
     210              :         else
     211            0 :           ers::error(ActionPlanValidationFailed(
     212            0 :             ERS_HERE, reports.back().get_command(), reports.back().get_module(), reports.back().get_message()));
     213              :       }
     214              :     }
     215              : 
     216           31 :     for (const auto& [mod_type, module_list] : modules_with_cmd) {
     217            6 :       for (auto& mod : module_list) {
     218            2 :         reports.emplace_back(ValidationReport::Severity::Error,
     219              :                              app,
     220              :                              mod_type,
     221              :                              cmd,
     222            4 :                              "ActionPlan is defined, module has command, but module " + mod + " is not in any steps");
     223            2 :         ers::error(ActionPlanValidationFailed(
     224            4 :           ERS_HERE, reports.back().get_command(), reports.back().get_module(), reports.back().get_message()));
     225              :       }
     226              :     }
     227           31 :   }
     228              : 
     229           16 :   return reports;
     230           20 : }
     231              : 
     232              : void
     233            1 : DAQModuleManager::cleanup()
     234              : {
     235            1 :   get_iomanager()->reset();
     236            1 :   this->m_initialized = false;
     237            1 : }
     238              : 
     239              : DAQModule::CommandData_t
     240           15 : DAQModuleManager::get_command_data_for_module(const std::string& mod_name, const DAQModule::CommandData_t& cmd_data)
     241              : {
     242           15 :   auto cmd_obj = cmd_data.get<cmd::CmdObj>();
     243           15 :   const DAQModule::CommandData_t dummy{};
     244              : 
     245           15 :   if (!cmd_obj.modules.empty()) {
     246            3 :     for (const auto& addressed : cmd_obj.modules) {
     247              : 
     248              :       // First exception: empty = `all`
     249            3 :       if (addressed.match.empty()) {
     250            3 :         return static_cast<DAQModule::CommandData_t>(addressed.data);
     251              :       } else {
     252              :         // match module name with regex
     253            3 :         if (std::regex_match(mod_name, std::regex(addressed.match))) {
     254            3 :           return static_cast<DAQModule::CommandData_t>(addressed.data);
     255              :         }
     256              :       }
     257              :     }
     258              :   }
     259              :   // No matches
     260           12 :   return dummy;
     261           15 : }
     262              : 
     263              : bool
     264           14 : DAQModuleManager::execute_action(const std::string& module_name,
     265              :                                  const std::string& action,
     266              :                                  const DAQModule::CommandData_t& command_data)
     267              : {
     268           14 :   try {
     269           14 :     TLOG_DEBUG(2) << "Executing " << module_name << " -> " << action;
     270           14 :     m_module_map[module_name]->execute_command(action, command_data);
     271            4 :   } catch (ers::Issue& ex) {
     272            4 :     ers::error(ex);
     273            4 :     return false;
     274            4 :   }
     275              :   return true;
     276              : }
     277              : 
     278              : void
     279           12 : DAQModuleManager::execute_action_plan_step(std::string const& cmd,
     280              :                                            const confmodel::DaqModulesGroup* step,
     281              :                                            const DAQModule::CommandData_t& cmd_data,
     282              :                                            bool execution_mode_is_serial)
     283              : {
     284           12 :   std::string failed_mod_names("");
     285           12 :   std::unordered_map<std::string, std::future<bool>> futures;
     286              : 
     287           12 :   auto byType = step->cast<confmodel::DaqModulesGroupByType>();
     288           12 :   auto byMod = step->cast<confmodel::DaqModulesGroupById>();
     289           12 :   if (byType != nullptr) {
     290           18 :     for (auto& mod_class : byType->get_modules()) {
     291            9 :       auto modules = m_modules_by_type[mod_class];
     292           18 :       for (auto& mod_name : modules) {
     293            9 :         auto command_data = get_command_data_for_module(mod_name, cmd_data);
     294            9 :         TLOG_DEBUG(1) << "Executing action " << cmd << " on module " << mod_name << " (class " << mod_class << ")";
     295            9 :         futures[mod_name] =
     296           18 :           std::async(std::launch::async, &DAQModuleManager::execute_action, this, mod_name, cmd, command_data);
     297            9 :         if (execution_mode_is_serial)
     298            0 :           futures[mod_name].wait();
     299            9 :       }
     300            9 :     }
     301            3 :   } else if (byMod != nullptr) {
     302            9 :     for (auto& mod : byMod->get_modules()) {
     303            6 :       auto mod_name = mod->UID();
     304            6 :       auto command_data = get_command_data_for_module(mod_name, cmd_data);
     305              : 
     306            6 :       if (byMod->get_optional() && !m_module_map.count(mod_name)) {
     307            1 :         continue;
     308              :       }
     309              : 
     310            5 :       TLOG_DEBUG(1) << "Executing action " << cmd << " on module " << mod_name << " (class " << mod->class_name()
     311            5 :                     << ")";
     312            5 :       futures[mod_name] =
     313           10 :         std::async(std::launch::async, &DAQModuleManager::execute_action, this, mod_name, cmd, command_data);
     314            5 :       if (execution_mode_is_serial)
     315            0 :         futures[mod_name].wait();
     316            6 :     }
     317              :   } else {
     318            0 :     throw CommandDispatchingFailed(ERS_HERE, cmd, "Could not get DaqModulesGroup!");
     319              :   }
     320              : 
     321           26 :   for (auto& future : futures) {
     322           14 :     future.second.wait();
     323           14 :     auto ret = future.second.get();
     324           14 :     if (!ret) {
     325            4 :       failed_mod_names.append(future.first);
     326            4 :       failed_mod_names.append(", ");
     327              :     }
     328              :   }
     329              :   // Throw if any dispatching failed
     330           12 :   if (!failed_mod_names.empty()) {
     331            3 :     throw CommandDispatchingFailed(ERS_HERE, cmd, failed_mod_names);
     332              :   }
     333           15 : }
     334              : 
     335              : std::vector<std::string>
     336           19 : DAQModuleManager::get_modnames_by_cmdid(cmdlib::cmd::CmdId id)
     337              : {
     338              :   // Make a convenience array with module names that have the requested command
     339           19 :   std::vector<std::string> mod_names;
     340           42 :   for (const auto& [mod_name, mod_ptr] : m_module_map) {
     341           23 :     if (mod_ptr->has_command(id))
     342           17 :       mod_names.push_back(mod_name);
     343              :   }
     344              : 
     345           19 :   return mod_names;
     346            0 : }
     347              : 
     348              : void
     349           16 : DAQModuleManager::check_command_data(const std::string& id, const DAQModule::CommandData_t& cmd_data)
     350              : {
     351              :   // This method ensures that each module is only matched once per command.
     352              :   // If multiple matches are found, an ers::Issue is thrown
      353              :   // Disclaimer for the occasional reader: this is the first implementation of the
     354              :   // multiple-matches detection logic. The author is painfully aware that it can be
     355              :   // vastly improved, in style if not in performance.
     356              : 
     357           16 :   auto cmd_obj = cmd_data.get<cmd::CmdObj>();
     358           16 :   const DAQModule::CommandData_t dummy{};
     359              : 
     360              :   // Make a convenience array with module names that have the requested command
     361           16 :   std::vector<std::string> cmd_mod_names = get_modnames_by_cmdid(id);
     362              : 
     363              :   // containers for error tracking
     364           16 :   std::map<std::string, std::vector<std::string>> mod_to_re;
     365              : 
     366           16 :   if (!cmd_obj.modules.empty()) {
     367           17 :     for (const auto& addressed : cmd_obj.modules) {
     368           10 :       if (!addressed.match.empty()) {
     369              :         // Find module names matching the regex
     370           14 :         for (const std::string& mod_name : cmd_mod_names) {
     371              :           // match module name with regex
     372            7 :           if (std::regex_match(mod_name, std::regex(addressed.match))) {
     373            5 :             mod_to_re[mod_name].push_back(addressed.match);
     374              :           }
     375              :         }
     376              :       }
     377              :     }
     378              : 
     379              :     // Select modules with multiple matches
     380           11 :     for (auto i = mod_to_re.begin(), last = mod_to_re.end(); i != last;) {
     381            4 :       if (i->second.size() == 1) {
     382            3 :         i = mod_to_re.erase(i);
     383              :       } else {
     384            1 :         ++i;
     385              :       }
     386              :     }
     387              : 
     388              :     // Catch cases
     389            7 :     if (mod_to_re.size() > 0) {
     390            1 :       std::string mod_names;
     391            2 :       for (const auto& [mod_name, matched_re] : mod_to_re) {
     392            1 :         mod_names += mod_name + ", ";
     393              :       }
     394            1 :       throw ConflictingCommandMatching(ERS_HERE, id, mod_names);
     395            1 :     }
     396              :   }
     397           19 : }
     398              : 
     399              : void
     400           18 : DAQModuleManager::execute(const std::string& cmd, const DAQModule::CommandData_t& cmd_data)
     401              : {
     402              : 
     403           18 :   TLOG_DEBUG(1) << "Command id:" << cmd;
     404              : 
     405           18 :   if (!m_initialized) {
     406            2 :     throw DAQModuleManagerNotInitialized(ERS_HERE, cmd);
     407              :   }
     408              : 
     409           16 :   check_command_data(cmd, cmd_data);
     410              : 
     411           15 :   auto action_plan = m_configuration_mgr->get_action_plan(cmd);
     412           15 :   if (action_plan == nullptr) {
     413            3 :     if (ACTION_PLANS_REQUIRED) {
     414              :       throw ActionPlanNotFound(ERS_HERE, cmd, "Throwing exception");
     415            3 :     } else if (ACTION_PLANS_REQUIRED_WARNING) {
     416              :       ers::warning(ActionPlanNotFound(ERS_HERE, cmd, "Returning without executing actions"));
     417              :       return;
     418              :     } else {
     419              :       // Emulate old behavior
     420            3 :       TLOG_DEBUG(1) << ActionPlanNotFound(ERS_HERE, cmd, "Executing action on all modules in parallel");
     421            3 :       std::string failed_mod_names("");
     422            3 :       std::unordered_map<std::string, std::future<bool>> futures;
     423              : 
     424            3 :       auto mods = get_modnames_by_cmdid(cmd);
     425            3 :       for (auto& mod : mods) {
     426            0 :         TLOG_DEBUG(1) << "Executing action " << cmd << " on module " << mod;
     427            0 :         auto command_data = get_command_data_for_module(mod, cmd_data);
     428            0 :         futures[mod] = std::async(std::launch::async, &DAQModuleManager::execute_action, this, mod, cmd, command_data);
     429            0 :       }
     430              : 
     431            3 :       for (auto& future : futures) {
     432            0 :         future.second.wait();
     433            0 :         auto ret = future.second.get();
     434            0 :         if (!ret) {
     435            0 :           failed_mod_names.append(future.first);
     436            0 :           failed_mod_names.append(", ");
     437              :         }
     438              :       }
     439              :       // Throw if any dispatching failed
     440            3 :       if (!failed_mod_names.empty()) {
     441            0 :         throw CommandDispatchingFailed(ERS_HERE, cmd, failed_mod_names);
     442              :       }
     443            3 :     }
     444              :   } else {
     445           12 :     auto execution_policy = action_plan->get_execution_policy();
     446           12 :     auto serial_execution = execution_policy == "modules-in-series";
     447              : 
     448              :     // We validated the action plans already
     449           21 :     for (auto& step : action_plan->get_steps()) {
     450           12 :       execute_action_plan_step(cmd, step, cmd_data, serial_execution);
     451              :     }
     452           12 :   }
     453              : 
     454              :   // Shutdown IOManager at scrap
     455           12 :   if (cmd == "scrap") {
     456            0 :     get_iomanager()->shutdown();
     457              :   }
     458              : }
     459              : 
     460              : } // namespace dunedaq::appfwk
        

Generated by: LCOV version 2.0-1