Line data Source code
1 : /**
2 : * @file RandomDataListGenerator.cpp RandomDataListGenerator class
3 : * implementation
4 : *
5 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
6 : * Licensing/copyright details are in the COPYING file that you should have
7 : * received with this code.
8 : */
9 :
10 : #include "listrev/dal/RandomDataListGenerator.hpp"
11 : #include "listrev/opmon/list_rev_info.pb.h"
12 :
13 : #include "CommonIssues.hpp"
14 : #include "RandomDataListGenerator.hpp"
15 :
16 : #include "appfwk/ConfigurationManager.hpp"
17 :
18 : #include "confmodel/Connection.hpp"
19 :
20 : #include "iomanager/IOManager.hpp"
21 : #include "logging/Logging.hpp"
22 :
23 : #include <chrono>
24 : #include <cstdlib>
25 : #include <memory>
26 : #include <set>
27 : #include <string>
28 : #include <thread>
29 : #include <utility>
30 : #include <vector>
31 :
32 : /**
33 : * @brief Name used by TRACE TLOG calls from this source file
34 : */
35 : #define TRACE_NAME "RandomDataListGenerator" // NOLINT
36 : #define TLVL_ENTER_EXIT_METHODS 10 // NOLINT
37 : #define TLVL_LIST_GENERATION 15 // NOLINT
38 :
39 : namespace dunedaq::listrev {
40 :
41 2 : RandomDataListGenerator::RandomDataListGenerator(const std::string& name)
42 2 : : dunedaq::appfwk::DAQModule(name)
43 : {
44 2 : register_command("conf", &RandomDataListGenerator::do_conf);
45 2 : register_command("start", &RandomDataListGenerator::do_start);
46 2 : register_command("stop", &RandomDataListGenerator::do_stop);
47 2 : register_command("scrap", &RandomDataListGenerator::do_unconfigure);
48 2 : register_command("hello", &RandomDataListGenerator::do_hello);
49 2 : }
50 :
51 : void
52 2 : RandomDataListGenerator::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
53 : {
54 2 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
55 2 : auto mdal = mcfg->get_dal<dal::RandomDataListGenerator>(get_name());
56 :
57 2 : if (mdal == nullptr) {
58 0 : throw appfwk::CommandFailed(ERS_HERE, get_name(), "init", "Unable to load module configuration");
59 : }
60 :
61 6 : for (auto con : mdal->get_inputs()) {
62 4 : if (con->get_data_type() == datatype_to_string<CreateList>()) {
63 2 : m_create_connection = con->UID();
64 : }
65 4 : if (con->get_data_type() == datatype_to_string<RequestList>()) {
66 2 : m_request_connection = con->UID();
67 : }
68 : }
69 :
70 : // these are just tests to check if the connections are ok
71 2 : auto iom = iomanager::IOManager::get();
72 2 : iom->get_receiver<RequestList>(m_request_connection);
73 2 : iom->get_receiver<CreateList>(m_create_connection, get_name());
74 :
75 2 : m_send_timeout = std::chrono::milliseconds(mdal->get_send_timeout_ms());
76 2 : m_request_timeout = std::chrono::milliseconds(mdal->get_request_timeout_ms());
77 2 : m_generator_id = mdal->get_generator_id();
78 2 : m_list_mode =
79 2 : static_cast<ListMode>(m_generator_id % (static_cast<uint16_t>(ListMode::MAX) + 1)); // NOLINT(build/unsigned)
80 :
81 2 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
82 2 : }
83 :
84 : void
85 3 : RandomDataListGenerator::generate_opmon_data()
86 : {
87 3 : opmon::RandomListGeneratorInfo fcr;
88 :
89 6 : fcr.set_generated_numbers(m_generated_tot.load());
90 3 : fcr.set_new_generated_numbers(m_generated.exchange(0));
91 6 : fcr.set_lists_sent(m_sent_tot.load());
92 3 : fcr.set_new_lists_sent(m_sent.exchange(0));
93 :
94 3 : publish(std::move(fcr));
95 3 : }
96 :
97 : void
98 2 : RandomDataListGenerator::do_conf(const CommandData_t& /*args*/)
99 : {
100 2 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_conf() method";
101 :
102 2 : auto iom = iomanager::IOManager::get();
103 : // Add this callback early as this is a pub/sub connection
104 2 : iom->add_callback<CreateList>(m_create_connection,
105 : get_name(),
106 2 : std::bind(&RandomDataListGenerator::process_create_list, this, std::placeholders::_1));
107 :
108 4 : TLOG() << get_name() << " successfully configured";
109 2 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_conf() method";
110 2 : }
111 :
112 : void
113 2 : RandomDataListGenerator::do_start(const CommandData_t& /*args*/)
114 : {
115 2 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
116 :
117 2 : auto iom = iomanager::IOManager::get();
118 2 : iom->add_callback<RequestList>(
119 2 : m_request_connection, std::bind(&RandomDataListGenerator::process_request_list, this, std::placeholders::_1));
120 :
121 4 : TLOG() << get_name() << " successfully started";
122 2 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
123 2 : }
124 :
125 : void
126 2 : RandomDataListGenerator::do_stop(const CommandData_t& /*args*/)
127 : {
128 2 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
129 :
130 2 : std::this_thread::sleep_for(std::chrono::milliseconds(100));
131 2 : std::chrono::milliseconds stop_timeout(5000);
132 2 : auto stop_wait = std::chrono::steady_clock::now();
133 4 : size_t outstanding_wait = m_generated_tot.load() - m_sent_tot.load();
134 2 : while (outstanding_wait > 0 && std::chrono::duration_cast<std::chrono::milliseconds>(
135 0 : std::chrono::steady_clock::now() - stop_wait) < stop_timeout) {
136 0 : std::this_thread::sleep_for(std::chrono::milliseconds(10));
137 0 : outstanding_wait = m_generated_tot.load() - m_sent_tot.load();
138 : }
139 :
140 2 : auto iom = iomanager::IOManager::get();
141 2 : iom->remove_callback<RequestList>(m_request_connection);
142 2 : iom->remove_callback<CreateList>(m_create_connection, get_name());
143 2 : m_storage.flush();
144 :
145 4 : TLOG() << get_name() << " successfully stopped";
146 :
147 2 : std::ostringstream oss_summ;
148 4 : oss_summ << ": Exiting do_stop() method, " << "generated " << m_generated_tot.load() << " lists, " << "and sent "
149 4 : << m_sent_tot.load() << " list messages";
150 2 : ers::info(ProgressUpdate(ERS_HERE, get_name(), oss_summ.str()));
151 :
152 2 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
153 2 : }
154 :
155 : void
156 2 : RandomDataListGenerator::do_unconfigure(const CommandData_t& /*args*/)
157 : {
158 2 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_unconfigure() method";
159 :
160 2 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_unconfigure() method";
161 2 : }
162 :
163 : void
164 1 : RandomDataListGenerator::do_hello(const CommandData_t& /*args*/)
165 : {
166 1 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering hello() method";
167 2 : TLOG() << "Hello my friend!";
168 1 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_hello() method";
169 1 : }
170 :
171 : /**
172 : * @brief Format a std::vector<int> to a stream
173 : * @param t ostream Instance
174 : * @param ints Vector to format
175 : * @return ostream Instance
176 : */
177 : std::ostream&
178 2 : operator<<(std::ostream& t, std::vector<int> ints)
179 : {
180 2 : t << "{";
181 2 : bool first = true;
182 23 : for (auto& i : ints) {
183 21 : if (!first)
184 19 : t << ", ";
185 21 : first = false;
186 21 : t << i;
187 : }
188 2 : return t << "}";
189 : }
190 :
191 : void
192 2 : RandomDataListGenerator::process_create_list(const CreateList& create_request)
193 : {
194 2 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering process_create_list() method";
195 2 : std::vector<int> theList(create_request.list_size);
196 :
197 2 : TLOG_DEBUG(TLVL_LIST_GENERATION) << get_name() << ": Start of fill loop";
198 23 : for (size_t idx = 0; idx < create_request.list_size; ++idx) {
199 21 : switch (m_list_mode) {
200 21 : case ListMode::Random:
201 21 : theList[idx] = (rand() % 1000) + 1; // NOLINT, as we don't need *true* randomness here
202 21 : break;
203 0 : case ListMode::Ascending:
204 0 : theList[idx] = create_request.list_id + idx;
205 0 : break;
206 0 : case ListMode::Evens:
207 0 : theList[idx] = (create_request.list_id % 2 == 0 ? 0 : 1) + create_request.list_id + idx * 2;
208 0 : break;
209 0 : case ListMode::Odds:
210 0 : theList[idx] = (create_request.list_id % 2 == 0 ? 1 : 0) + create_request.list_id + idx * 2;
211 0 : break;
212 0 : case ListMode::Descending:
213 0 : theList[idx] = create_request.list_id - idx;
214 0 : break;
215 : }
216 : }
217 2 : ++m_generated_tot;
218 2 : ++m_generated;
219 4 : std::ostringstream oss_prog;
220 4 : oss_prog << "Generated list #" << create_request.list_id << " with contents " << theList << " and size "
221 2 : << theList.size() << ". ";
222 2 : TLOG_DEBUG() << ProgressUpdate(ERS_HERE, get_name(), oss_prog.str());
223 :
224 2 : m_storage.add_list(IntList(create_request.list_id, m_generator_id, theList));
225 :
226 2 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting process_create_list() method";
227 2 : }
228 :
229 : void
230 3 : RandomDataListGenerator::process_request_list(const RequestList& request)
231 : {
232 3 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering process_request_list() method";
233 3 : auto start = std::chrono::steady_clock::now();
234 3 : IntList output;
235 3 : bool list_found = false;
236 :
237 65004 : while (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start) <
238 65001 : m_request_timeout) {
239 65000 : if (m_storage.has_list(request.list_id)) {
240 2 : output = m_storage.get_list(request.list_id);
241 2 : list_found = true;
242 2 : break;
243 : }
244 64998 : std::this_thread::sleep_for(std::chrono::microseconds(100));
245 : }
246 :
247 3 : if (!list_found) {
248 1 : std::ostringstream oss_warn;
249 1 : oss_warn << "wait for list \"" << request.list_id << "\"";
250 1 : ers::warning(dunedaq::iomanager::TimeoutExpired(ERS_HERE, get_name(), oss_warn.str(), m_request_timeout.count()));
251 1 : return;
252 1 : }
253 :
254 2 : try {
255 2 : dunedaq::get_iomanager()->get_sender<IntList>(request.destination)->send(std::move(output), m_send_timeout);
256 :
257 2 : ++m_sent;
258 2 : ++m_sent_tot;
259 0 : } catch (const dunedaq::iomanager::TimeoutExpired& excpt) {
260 0 : std::ostringstream oss_warn;
261 0 : oss_warn << "send to destination \"" << request.destination << "\"";
262 0 : ers::warning(dunedaq::iomanager::TimeoutExpired(ERS_HERE, get_name(), oss_warn.str(), m_send_timeout.count()));
263 0 : }
264 2 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting process_request_list() method";
265 3 : }
266 :
267 : } // namespace dunedaq::listrev
268 :
269 2 : DEFINE_DUNE_DAQ_MODULE(dunedaq::listrev::RandomDataListGenerator)
270 :
271 : // Local Variables:
272 : // c-basic-offset: 2
273 : // End:
|