DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
DAQModuleManager.cpp
Go to the documentation of this file.
1
10
11#include "appfwk/DAQModule.hpp"
12#include "appfwk/cmd/Nljs.hpp"
13
14#include "cmdlib/cmd/Nljs.hpp"
18#include "confmodel/Session.hpp"
20#include "logging/Logging.hpp"
21
22#include <future>
23#include <map>
24#include <memory>
25#include <regex>
26#include <string>
27#include <unordered_map>
28#include <utility>
29#include <vector>
30
31namespace dunedaq::appfwk {
32
33DAQModuleManager::DAQModuleManager(const std::string& session_name)
34 : m_session_name(session_name)
35 , m_initialized(false)
36{
37}
38
39void
40DAQModuleManager::initialize(std::shared_ptr<ConfigurationManager> cfgMgr, opmonlib::OpMonManager& opm)
41{
42 m_configuration_mgr = cfgMgr; // Make a copy
43 cfgMgr->initialize();
44 get_iomanager()->configure(m_session_name,
45 m_configuration_mgr->queues(),
46 m_configuration_mgr->networkconnections(),
47 m_configuration_mgr->connectivity_service(),
48 opm);
49 init_modules(m_configuration_mgr->modules(), opm);
50
51 for (auto& plan_pair : m_configuration_mgr->action_plans()) {
52 auto cmd = plan_pair.first;
53
54 for (auto& step : plan_pair.second->get_steps()) {
55 auto byType = step->cast<confmodel::DaqModulesGroupByType>();
56 auto byMod = step->cast<confmodel::DaqModulesGroupById>();
57 if (byType != nullptr) {
58 for (auto& mod_type : byType->get_modules()) {
59 check_mod_has_cmd(cmd, mod_type);
60 }
61 } else if (byMod != nullptr) {
62 for (auto& mod : byMod->get_modules()) {
63 check_mod_has_cmd(cmd, mod->class_name(), mod->UID());
64 }
65 } else {
66 throw ActionPlanValidationFailed(ERS_HERE, cmd, "", "Invalid subclass of DaqModulesGroup encountered!");
67 }
68 }
69 }
70 this->m_initialized = true;
71}
72
73void
74DAQModuleManager::check_mod_has_cmd(const std::string& cmd, const std::string& mod_class, const std::string& mod_id)
75{
76
77 if (!m_modules_by_type.count(mod_class) || m_modules_by_type[mod_class].size() == 0) {
78 if (mod_id == "") {
79 ers::info(ActionPlanValidationFailed(ERS_HERE, cmd, mod_class, "Module does not exist"));
80 return;
81 } else {
82 throw ActionPlanValidationFailed(ERS_HERE, cmd, mod_class, "Module does not exist");
83 }
84 }
85
86 auto module_test = m_module_map[m_modules_by_type[mod_class][0]];
87 if (mod_id != "") {
88 bool match = false;
89 for (auto& mod_name : m_modules_by_type[mod_class]) {
90 if (mod_id == mod_name) {
91 module_test = m_module_map[mod_name];
92 match = true;
93 break;
94 }
95 }
96 if (!match) {
97 throw ActionPlanValidationFailed(ERS_HERE, cmd, mod_class, "No module with id " + mod_id + " found.");
98 }
99 }
100
101 if (!module_test->has_command(cmd)) {
102 throw ActionPlanValidationFailed(ERS_HERE, cmd, mod_class, "Module does not have method " + cmd);
103 }
104}
105
106void
107DAQModuleManager::init_modules(const std::vector<const dunedaq::confmodel::DaqModule*>& modules,
108 opmonlib::OpMonManager& opm)
109{
110 for (const auto mod : modules) {
111 TLOG_DEBUG(0) << "construct: " << mod->class_name() << " : " << mod->UID();
112 auto mptr = make_module(mod->class_name(), mod->UID());
113 m_module_map.emplace(mod->UID(), mptr);
114
115 if (!m_modules_by_type.count(mod->class_name())) {
116 m_modules_by_type[mod->class_name()] = std::vector<std::string>();
117 }
118 m_modules_by_type[mod->class_name()].emplace_back(mod->UID());
119
120 opm.register_node(mod->UID(), mptr);
121
122 try {
123 mptr->init(m_configuration_mgr);
124 } catch (ers::Issue& ex) {
125 throw DAQModuleInitFailed(ERS_HERE, mod->UID(), ex);
126 }
127 }
128}
129
130void
131DAQModuleManager::cleanup()
132{
133 get_iomanager()->reset();
134 this->m_initialized = false;
135}
136
137DAQModuleManager::dataobj_t
138DAQModuleManager::get_dataobj_for_module(const std::string& mod_name, const dataobj_t& cmd_data)
139{
140 auto cmd_obj = cmd_data.get<cmd::CmdObj>();
141 const dataobj_t dummy{};
142
143 if (!cmd_obj.modules.empty()) {
144 for (const auto& addressed : cmd_obj.modules) {
145
146 // First exception: empty = `all`
147 if (addressed.match.empty()) {
148 return addressed.data;
149 } else {
150 // match module name with regex
151 if (std::regex_match(mod_name, std::regex(addressed.match))) {
152 return addressed.data;
153 }
154 }
155 }
156 }
157 // No matches
158 return dummy;
159}
160
161bool
162DAQModuleManager::execute_action(const std::string& module_name, const std::string& action, const dataobj_t& data_obj)
163{
164 try {
165 TLOG_DEBUG(2) << "Executing " << module_name << " -> " << action;
166 m_module_map[module_name]->execute_command(action, data_obj);
167 } catch (ers::Issue& ex) {
168 ers::error(ex);
169 return false;
170 }
171 return true;
172}
173
174void
175DAQModuleManager::execute_action_plan_step(std::string const& cmd,
176 const confmodel::DaqModulesGroup* step,
177 const dataobj_t& cmd_data,
178 bool execution_mode_is_serial)
179{
180 std::string failed_mod_names("");
181 std::unordered_map<std::string, std::future<bool>> futures;
182
183 auto byType = step->cast<confmodel::DaqModulesGroupByType>();
184 auto byMod = step->cast<confmodel::DaqModulesGroupById>();
185 if (byType != nullptr) {
186 for (auto& mod_class : byType->get_modules()) {
187 auto modules = m_modules_by_type[mod_class];
188 for (auto& mod_name : modules) {
189 auto data_obj = get_dataobj_for_module(mod_name, cmd_data);
190 TLOG_DEBUG(1) << "Executing action " << cmd << " on module " << mod_name << " (class " << mod_class << ")";
191 futures[mod_name] =
192 std::async(std::launch::async, &DAQModuleManager::execute_action, this, mod_name, cmd, data_obj);
193 if (execution_mode_is_serial)
194 futures[mod_name].wait();
195 }
196 }
197 } else if (byMod != nullptr) {
198 for (auto& mod : byMod->get_modules()) {
199 auto mod_name = mod->UID();
200 auto data_obj = get_dataobj_for_module(mod_name, cmd_data);
201 TLOG_DEBUG(1) << "Executing action " << cmd << " on module " << mod_name << " (class " << mod->class_name()
202 << ")";
203 futures[mod_name] =
204 std::async(std::launch::async, &DAQModuleManager::execute_action, this, mod_name, cmd, data_obj);
205 if (execution_mode_is_serial)
206 futures[mod_name].wait();
207 }
208 } else {
209 throw CommandDispatchingFailed(ERS_HERE, cmd, "Could not get DaqModulesGroup!");
210 }
211
212 for (auto& future : futures) {
213 future.second.wait();
214 auto ret = future.second.get();
215 if (!ret) {
216 failed_mod_names.append(future.first);
217 failed_mod_names.append(", ");
218 }
219 }
220 // Throw if any dispatching failed
221 if (!failed_mod_names.empty()) {
222 throw CommandDispatchingFailed(ERS_HERE, cmd, failed_mod_names);
223 }
224}
225
226std::vector<std::string>
227DAQModuleManager::get_modnames_by_cmdid(cmdlib::cmd::CmdId id)
228{
229 // Make a convenience array with module names that have the requested command
230 std::vector<std::string> mod_names;
231 for (const auto& [mod_name, mod_ptr] : m_module_map) {
232 if (mod_ptr->has_command(id))
233 mod_names.push_back(mod_name);
234 }
235
236 return mod_names;
237}
238
239void
240DAQModuleManager::check_cmd_data(const std::string& id, const dataobj_t& cmd_data)
241{
242 // This method ensures that each module is only matched once per command.
243 // If multiple matches are found, an ers::Issue is thrown
244 // Disclaimenr for the occasional reader: this is the first implementation of the
245 // multiple-matches detection logic. The author is painfully aware that it can be
246 // vastly improved, in style if not in performance.
247
248 auto cmd_obj = cmd_data.get<cmd::CmdObj>();
249 const dataobj_t dummy{};
250
251 // Make a convenience array with module names that have the requested command
252 std::vector<std::string> cmd_mod_names = get_modnames_by_cmdid(id);
253
254 // containers for error tracking
255 std::map<std::string, std::vector<std::string>> mod_to_re;
256
257 if (!cmd_obj.modules.empty()) {
258 for (const auto& addressed : cmd_obj.modules) {
259 if (!addressed.match.empty()) {
260 // Find module names matching the regex
261 for (const std::string& mod_name : cmd_mod_names) {
262 // match module name with regex
263 if (std::regex_match(mod_name, std::regex(addressed.match))) {
264 mod_to_re[mod_name].push_back(addressed.match);
265 }
266 }
267 }
268 }
269
270 // Select modules with multiple matches
271 for (auto i = mod_to_re.begin(), last = mod_to_re.end(); i != last;) {
272 if (i->second.size() == 1) {
273 i = mod_to_re.erase(i);
274 } else {
275 ++i;
276 }
277 }
278
279 // Catch cases
280 if (mod_to_re.size() > 0) {
281 std::string mod_names;
282 for (const auto& [mod_name, matched_re] : mod_to_re) {
283 mod_names += mod_name + ", ";
284 }
285 throw ConflictingCommandMatching(ERS_HERE, id, mod_names);
286 }
287 }
288}
289
290void
291DAQModuleManager::execute(const std::string& cmd, const dataobj_t& cmd_data)
292{
293
294 TLOG_DEBUG(1) << "Command id:" << cmd;
295
296 if (!m_initialized) {
297 throw DAQModuleManagerNotInitialized(ERS_HERE, cmd);
298 }
299
300 check_cmd_data(cmd, cmd_data);
301
302 auto action_plan = m_configuration_mgr->action_plan(cmd);
303 if (action_plan == nullptr) {
304#if 0
305 throw ActionPlanNotFound(ERS_HERE, cmd, "Throwing exception");
306#elif 0
307 ers::warning(ActionPlanNotFound(ERS_HERE, cmd, "Returning without executing actions"));
308 return;
309#else
310 // Emulate old behavior
311 TLOG_DEBUG(1) << ActionPlanNotFound(ERS_HERE, cmd, "Executing action on all modules in parallel");
312 std::string failed_mod_names("");
313 std::unordered_map<std::string, std::future<bool>> futures;
314
315 auto mods = get_modnames_by_cmdid(cmd);
316 for (auto& mod : mods) {
317 TLOG_DEBUG(1) << "Executing action " << cmd << " on module " << mod;
318 auto data_obj = get_dataobj_for_module(mod, cmd_data);
319 futures[mod] = std::async(std::launch::async, &DAQModuleManager::execute_action, this, mod, cmd, data_obj);
320 }
321
322 for (auto& future : futures) {
323 future.second.wait();
324 auto ret = future.second.get();
325 if (!ret) {
326 failed_mod_names.append(future.first);
327 failed_mod_names.append(", ");
328 }
329 }
330 // Throw if any dispatching failed
331 if (!failed_mod_names.empty()) {
332 throw CommandDispatchingFailed(ERS_HERE, cmd, failed_mod_names);
333 }
334#endif
335 } else {
336 auto execution_policy = action_plan->get_execution_policy();
337 auto serial_execution = execution_policy == "modules-in-series";
338
339 // We validated the action plans already
340 for (auto& step : action_plan->get_steps()) {
341 execute_action_plan_step(cmd, step, cmd_data, serial_execution);
342 }
343 }
344
345 // Shutdown IOManager at scrap
346 if (cmd == "scrap") {
347 get_iomanager()->shutdown();
348 }
349}
350
351} // namespace dunedaq::appfwk
#define ERS_HERE
Base class for any user define issue.
Definition Issue.hpp:69
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
dunedaq::conffwk::relationship_t match(T const &, T const &)
std::shared_ptr< DAQModule > make_module(std::string const &plugin_name, std::string const &instance_name)
Load a DAQModule plugin and return a shared_ptr to the contained DAQModule class.
Definition DAQModule.hxx:35
init Command received when already ERS_EMPTY ConflictingCommandMatching
void warning(const Issue &issue)
Definition ers.hpp:115
void info(const Issue &issue)
Definition ers.hpp:95
void error(const Issue &issue)
Definition ers.hpp:81