DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
DAQModuleManager.cpp
Go to the documentation of this file.
1
10
11#include "appfwk/DAQModule.hpp"
12#include "appfwk/cmd/Nljs.hpp"
13
14#include "cmdlib/cmd/Nljs.hpp"
18#include "confmodel/Session.hpp"
20#include "logging/Logging.hpp"
21
22#include <future>
23#include <map>
24#include <memory>
25#include <regex>
26#include <set>
27#include <string>
28#include <unordered_map>
29#include <utility>
30#include <vector>
31
32namespace dunedaq::appfwk {
33
34DAQModuleManager::DAQModuleManager(const std::string& session_name)
35 : m_session_name(session_name)
36 , m_initialized(false)
37{
38}
39
40void
41DAQModuleManager::initialize(std::shared_ptr<ConfigurationManager> cfgMgr, opmonlib::OpMonManager& opm)
42{
43 m_configuration_mgr = cfgMgr; // Make a copy
44 cfgMgr->initialize();
45 get_iomanager()->configure(m_session_name,
46 m_configuration_mgr->queues(),
47 m_configuration_mgr->networkconnections(),
48 m_configuration_mgr->connectivity_service(),
49 opm);
50 init_modules(m_configuration_mgr->modules(), opm);
51
52 for (auto& plan_pair : m_configuration_mgr->action_plans()) {
53 auto cmd = plan_pair.first;
54 std::map<std::string, std::set<std::string>> modules_with_cmd;
55 for (const auto& [mod_type, module_list] : m_modules_by_type) {
56 if (module_list.size() > 0 && m_module_map[module_list[0]]->has_command(cmd)) {
57 modules_with_cmd[mod_type] = std::set<std::string>(module_list.begin(), module_list.end());
58 }
59 }
60
61 for (auto& step : plan_pair.second->get_steps()) {
62 auto byType = step->cast<confmodel::DaqModulesGroupByType>();
63 auto byMod = step->cast<confmodel::DaqModulesGroupById>();
64 if (byType != nullptr) {
65 for (auto& mod_type : byType->get_modules()) {
66 check_mod_has_cmd(cmd, mod_type, byType->get_optional());
67 modules_with_cmd.erase(mod_type);
68 }
69 } else if (byMod != nullptr) {
70 for (auto& mod : byMod->get_modules()) {
71 check_mod_has_cmd(cmd, mod->class_name(), byMod->get_optional(), mod->UID());
72 modules_with_cmd[mod->class_name()].erase(mod->UID());
73 }
74 } else {
75 throw ActionPlanValidationFailed(ERS_HERE, cmd, "", "Invalid subclass of DaqModulesGroup encountered!");
76 }
77 }
78
79 for (const auto& [mod_type, module_list] : modules_with_cmd) {
80 for (auto& mod : module_list) {
81 ers::error(ActionPlanValidationFailed(
82 ERS_HERE, cmd, mod, "ActionPlan is defined, module has command, but module is not in any steps"));
83 }
84 }
85 }
86 this->m_initialized = true;
87}
88
89void
90DAQModuleManager::check_mod_has_cmd(const std::string& cmd,
91 const std::string& mod_class,
92 bool is_optional,
93 const std::string& mod_id)
94{
95 if (!m_modules_by_type.count(mod_class) || m_modules_by_type[mod_class].size() == 0) {
96 if (is_optional)
97 return;
98 if (mod_id == "") {
100 ActionPlanValidationFailed(ERS_HERE, cmd, mod_class, "No modules of class " + mod_class + " in application!"));
101 return;
102 } else {
103 throw ActionPlanValidationFailed(
104 ERS_HERE, cmd, mod_class, "No modules of class " + mod_class + " in application!");
105 }
106 }
107
108 auto module_test = m_module_map[m_modules_by_type[mod_class][0]];
109 if (mod_id != "") {
110 bool match = false;
111 for (auto& mod_name : m_modules_by_type[mod_class]) {
112 if (mod_id == mod_name) {
113 module_test = m_module_map[mod_name];
114 match = true;
115 break;
116 }
117 }
118 if (!match && !is_optional) {
119 throw ActionPlanValidationFailed(ERS_HERE, cmd, mod_class, "No module with id " + mod_id + " found.");
120 }
121 }
122
123 if (!module_test->has_command(cmd)) {
124 throw ActionPlanValidationFailed(ERS_HERE, cmd, mod_class, "Module does not have command " + cmd + " registered.");
125 }
126}
127
128void
129DAQModuleManager::init_modules(const std::vector<const dunedaq::confmodel::DaqModule*>& modules,
130 opmonlib::OpMonManager& opm)
131{
132 for (const auto mod : modules) {
133 TLOG_DEBUG(0) << "construct: " << mod->class_name() << " : " << mod->UID();
134 auto mptr = make_module(mod->class_name(), mod->UID());
135 // Once constructed, DAQModules should not try to regsiter any more commands
136 mptr->set_command_registration_allowed(false);
137 m_module_map.emplace(mod->UID(), mptr);
138
139 if (!m_modules_by_type.count(mod->class_name())) {
140 m_modules_by_type[mod->class_name()] = std::vector<std::string>();
141 }
142 m_modules_by_type[mod->class_name()].emplace_back(mod->UID());
143
144 opm.register_node(mod->UID(), mptr);
145
146 try {
147 mptr->init(m_configuration_mgr);
148 } catch (ers::Issue& ex) {
149 throw DAQModuleInitFailed(ERS_HERE, mod->UID(), ex);
150 }
151 }
152}
153
154void
155DAQModuleManager::cleanup()
156{
157 get_iomanager()->reset();
158 this->m_initialized = false;
159}
160
161DAQModuleManager::dataobj_t
162DAQModuleManager::get_dataobj_for_module(const std::string& mod_name, const dataobj_t& cmd_data)
163{
164 auto cmd_obj = cmd_data.get<cmd::CmdObj>();
165 const dataobj_t dummy{};
166
167 if (!cmd_obj.modules.empty()) {
168 for (const auto& addressed : cmd_obj.modules) {
169
170 // First exception: empty = `all`
171 if (addressed.match.empty()) {
172 return addressed.data;
173 } else {
174 // match module name with regex
175 if (std::regex_match(mod_name, std::regex(addressed.match))) {
176 return addressed.data;
177 }
178 }
179 }
180 }
181 // No matches
182 return dummy;
183}
184
185bool
186DAQModuleManager::execute_action(const std::string& module_name, const std::string& action, const dataobj_t& data_obj)
187{
188 try {
189 TLOG_DEBUG(2) << "Executing " << module_name << " -> " << action;
190 m_module_map[module_name]->execute_command(action, data_obj);
191 } catch (ers::Issue& ex) {
192 ers::error(ex);
193 return false;
194 }
195 return true;
196}
197
198void
199DAQModuleManager::execute_action_plan_step(std::string const& cmd,
200 const confmodel::DaqModulesGroup* step,
201 const dataobj_t& cmd_data,
202 bool execution_mode_is_serial)
203{
204 std::string failed_mod_names("");
205 std::unordered_map<std::string, std::future<bool>> futures;
206
207 auto byType = step->cast<confmodel::DaqModulesGroupByType>();
208 auto byMod = step->cast<confmodel::DaqModulesGroupById>();
209 if (byType != nullptr) {
210 for (auto& mod_class : byType->get_modules()) {
211 auto modules = m_modules_by_type[mod_class];
212 for (auto& mod_name : modules) {
213 auto data_obj = get_dataobj_for_module(mod_name, cmd_data);
214 TLOG_DEBUG(1) << "Executing action " << cmd << " on module " << mod_name << " (class " << mod_class << ")";
215 futures[mod_name] =
216 std::async(std::launch::async, &DAQModuleManager::execute_action, this, mod_name, cmd, data_obj);
217 if (execution_mode_is_serial)
218 futures[mod_name].wait();
219 }
220 }
221 } else if (byMod != nullptr) {
222 for (auto& mod : byMod->get_modules()) {
223 auto mod_name = mod->UID();
224 auto data_obj = get_dataobj_for_module(mod_name, cmd_data);
225
226 if (byMod->get_optional() && !m_module_map.count(mod_name)) {
227 continue;
228 }
229
230 TLOG_DEBUG(1) << "Executing action " << cmd << " on module " << mod_name << " (class " << mod->class_name()
231 << ")";
232 futures[mod_name] =
233 std::async(std::launch::async, &DAQModuleManager::execute_action, this, mod_name, cmd, data_obj);
234 if (execution_mode_is_serial)
235 futures[mod_name].wait();
236 }
237 } else {
238 throw CommandDispatchingFailed(ERS_HERE, cmd, "Could not get DaqModulesGroup!");
239 }
240
241 for (auto& future : futures) {
242 future.second.wait();
243 auto ret = future.second.get();
244 if (!ret) {
245 failed_mod_names.append(future.first);
246 failed_mod_names.append(", ");
247 }
248 }
249 // Throw if any dispatching failed
250 if (!failed_mod_names.empty()) {
251 throw CommandDispatchingFailed(ERS_HERE, cmd, failed_mod_names);
252 }
253}
254
255std::vector<std::string>
256DAQModuleManager::get_modnames_by_cmdid(cmdlib::cmd::CmdId id)
257{
258 // Make a convenience array with module names that have the requested command
259 std::vector<std::string> mod_names;
260 for (const auto& [mod_name, mod_ptr] : m_module_map) {
261 if (mod_ptr->has_command(id))
262 mod_names.push_back(mod_name);
263 }
264
265 return mod_names;
266}
267
268void
269DAQModuleManager::check_cmd_data(const std::string& id, const dataobj_t& cmd_data)
270{
271 // This method ensures that each module is only matched once per command.
272 // If multiple matches are found, an ers::Issue is thrown
273 // Disclaimenr for the occasional reader: this is the first implementation of the
274 // multiple-matches detection logic. The author is painfully aware that it can be
275 // vastly improved, in style if not in performance.
276
277 auto cmd_obj = cmd_data.get<cmd::CmdObj>();
278 const dataobj_t dummy{};
279
280 // Make a convenience array with module names that have the requested command
281 std::vector<std::string> cmd_mod_names = get_modnames_by_cmdid(id);
282
283 // containers for error tracking
284 std::map<std::string, std::vector<std::string>> mod_to_re;
285
286 if (!cmd_obj.modules.empty()) {
287 for (const auto& addressed : cmd_obj.modules) {
288 if (!addressed.match.empty()) {
289 // Find module names matching the regex
290 for (const std::string& mod_name : cmd_mod_names) {
291 // match module name with regex
292 if (std::regex_match(mod_name, std::regex(addressed.match))) {
293 mod_to_re[mod_name].push_back(addressed.match);
294 }
295 }
296 }
297 }
298
299 // Select modules with multiple matches
300 for (auto i = mod_to_re.begin(), last = mod_to_re.end(); i != last;) {
301 if (i->second.size() == 1) {
302 i = mod_to_re.erase(i);
303 } else {
304 ++i;
305 }
306 }
307
308 // Catch cases
309 if (mod_to_re.size() > 0) {
310 std::string mod_names;
311 for (const auto& [mod_name, matched_re] : mod_to_re) {
312 mod_names += mod_name + ", ";
313 }
314 throw ConflictingCommandMatching(ERS_HERE, id, mod_names);
315 }
316 }
317}
318
319void
320DAQModuleManager::execute(const std::string& cmd, const dataobj_t& cmd_data)
321{
322
323 TLOG_DEBUG(1) << "Command id:" << cmd;
324
325 if (!m_initialized) {
326 throw DAQModuleManagerNotInitialized(ERS_HERE, cmd);
327 }
328
329 check_cmd_data(cmd, cmd_data);
330
331 auto action_plan = m_configuration_mgr->action_plan(cmd);
332 if (action_plan == nullptr) {
333 if (ACTION_PLANS_REQUIRED) {
334 throw ActionPlanNotFound(ERS_HERE, cmd, "Throwing exception");
335 } else if (ACTION_PLANS_REQUIRED_WARNING) {
336 ers::warning(ActionPlanNotFound(ERS_HERE, cmd, "Returning without executing actions"));
337 return;
338 } else {
339 // Emulate old behavior
340 TLOG_DEBUG(1) << ActionPlanNotFound(ERS_HERE, cmd, "Executing action on all modules in parallel");
341 std::string failed_mod_names("");
342 std::unordered_map<std::string, std::future<bool>> futures;
343
344 auto mods = get_modnames_by_cmdid(cmd);
345 for (auto& mod : mods) {
346 TLOG_DEBUG(1) << "Executing action " << cmd << " on module " << mod;
347 auto data_obj = get_dataobj_for_module(mod, cmd_data);
348 futures[mod] = std::async(std::launch::async, &DAQModuleManager::execute_action, this, mod, cmd, data_obj);
349 }
350
351 for (auto& future : futures) {
352 future.second.wait();
353 auto ret = future.second.get();
354 if (!ret) {
355 failed_mod_names.append(future.first);
356 failed_mod_names.append(", ");
357 }
358 }
359 // Throw if any dispatching failed
360 if (!failed_mod_names.empty()) {
361 throw CommandDispatchingFailed(ERS_HERE, cmd, failed_mod_names);
362 }
363 }
364 } else {
365 auto execution_policy = action_plan->get_execution_policy();
366 auto serial_execution = execution_policy == "modules-in-series";
367
368 // We validated the action plans already
369 for (auto& step : action_plan->get_steps()) {
370 execute_action_plan_step(cmd, step, cmd_data, serial_execution);
371 }
372 }
373
374 // Shutdown IOManager at scrap
375 if (cmd == "scrap") {
376 get_iomanager()->shutdown();
377 }
378}
379
380} // namespace dunedaq::appfwk
#define ERS_HERE
Base class for any user define issue.
Definition Issue.hpp:69
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
dunedaq::conffwk::relationship_t match(T const &, T const &)
std::shared_ptr< DAQModule > make_module(std::string const &plugin_name, std::string const &instance_name)
Load a DAQModule plugin and return a shared_ptr to the contained DAQModule class.
Definition DAQModule.hxx:40
init Command received when already ERS_EMPTY ConflictingCommandMatching
void warning(const Issue &issue)
Definition ers.hpp:115
void error(const Issue &issue)
Definition ers.hpp:81