Line data Source code
1 : /**
2 : * @file ReversedListValidator.cpp ReversedListValidator class
3 : * implementation
4 : *
5 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
6 : * Licensing/copyright details are in the COPYING file that you should have
7 : * received with this code.
8 : */
9 :
10 : #include "ReversedListValidator.hpp"
11 : #include "CommonIssues.hpp"
12 :
13 : #include "listrev/dal/RandomDataListGenerator.hpp"
14 : #include "listrev/dal/RandomListGeneratorSet.hpp"
15 : #include "listrev/dal/ReversedListValidator.hpp"
16 : #include "listrev/opmon/list_rev_info.pb.h"
17 :
18 : #include "appfwk/ConfigurationManager.hpp"
19 : #include "confmodel/Connection.hpp"
20 : #include "iomanager/IOManager.hpp"
21 : #include "logging/Logging.hpp"
22 :
23 : #include <chrono>
24 : #include <functional>
25 : #include <list>
26 : #include <memory>
27 : #include <string>
28 : #include <thread>
29 : #include <utility>
30 : #include <vector>
31 :
32 : /**
33 : * @brief Name used by TRACE TLOG calls from this source file
34 : */
35 : #define TRACE_NAME "ReversedListValidator" // NOLINT
36 : #define TLVL_ENTER_EXIT_METHODS 10 // NOLINT
37 : #define TLVL_LIST_VALIDATION 15 // NOLINT
38 : #define TLVL_REQUEST_SENDING 16 // NOLINT
39 : #define TLVL_PROCESS_LIST 17 // NOLINT
40 :
41 : namespace dunedaq::listrev {
42 :
43 4 : ReversedListValidator::ReversedListValidator(const std::string& name)
44 : : DAQModule(name)
45 4 : , m_work_thread(std::bind(&ReversedListValidator::do_work, this, std::placeholders::_1))
46 : {
47 4 : register_command("start", &ReversedListValidator::do_start);
48 4 : register_command("stop", &ReversedListValidator::do_stop);
49 4 : }
50 :
51 : void
52 4 : ReversedListValidator::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
53 : {
54 4 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
55 :
56 4 : auto mdal = mcfg->get_dal<dal::ReversedListValidator>(get_name());
57 4 : for (auto con : mdal->get_inputs()) {
58 4 : if (con->get_data_type() == datatype_to_string<ReversedList>()) {
59 4 : m_list_connection = con->UID();
60 : break;
61 : }
62 : }
63 12 : for (auto con : mdal->get_outputs()) {
64 8 : if (con->get_data_type() == datatype_to_string<CreateList>()) {
65 4 : m_create_connection = con->UID();
66 : }
67 8 : if (con->get_data_type() == datatype_to_string<RequestList>()) {
68 4 : m_num_reversers++;
69 4 : m_reveserIds.push_back(con->UID());
70 : }
71 : }
72 :
73 8 : for (auto gen : mdal->get_generatorSet()->get_generators()) {
74 4 : m_generatorIds.push_back(gen->get_generator_id());
75 : }
76 4 : m_num_generators = m_generatorIds.size();
77 :
78 : // these are just tests to check if the connections are ok
79 4 : auto iom = iomanager::IOManager::get();
80 4 : iom->get_receiver<ReversedList>(m_list_connection);
81 4 : iom->get_sender<CreateList>(m_create_connection);
82 :
83 4 : m_send_timeout = std::chrono::milliseconds(mdal->get_send_timeout_ms());
84 4 : m_request_timeout = std::chrono::milliseconds(mdal->get_request_timeout_ms());
85 4 : m_max_outstanding_requests = mdal->get_max_outstanding_requests();
86 :
87 4 : m_list_creator =
88 4 : ListCreator(m_create_connection, m_send_timeout, mdal->get_min_list_size(), mdal->get_max_list_size());
89 :
90 4 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
91 4 : }
92 :
93 : void
94 4 : ReversedListValidator::generate_opmon_data()
95 : {
96 4 : opmon::ReversedListValidatorInfo fcr;
97 :
98 8 : fcr.set_total_requests(m_requests_total.load());
99 4 : fcr.set_new_requests(m_new_requests.exchange(0));
100 8 : fcr.set_total_lists(m_total_lists.load());
101 4 : fcr.set_new_lists(m_new_lists.exchange(0));
102 8 : fcr.set_total_valid_pairs(m_total_valid_pairs.load());
103 4 : fcr.set_valid_list_pairs(m_valid_list_pairs.exchange(0));
104 8 : fcr.set_total_invalid_pairs(m_total_invalid_pairs.load());
105 4 : fcr.set_invalid_list_pairs(m_invalid_list_pairs.exchange(0));
106 :
107 4 : publish(std::move(fcr));
108 4 : }
109 :
110 : void
111 4 : ReversedListValidator::do_start(const CommandData_t& /*args*/)
112 : {
113 4 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
114 4 : m_next_id = 0;
115 4 : m_work_thread.start_working_thread();
116 8 : get_iomanager()->add_callback<ReversedList>(
117 8 : m_list_connection, std::bind(&ReversedListValidator::process_list, this, std::placeholders::_1));
118 8 : TLOG() << get_name() << " successfully started";
119 4 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
120 4 : }
121 :
122 : void
123 4 : ReversedListValidator::do_stop(const CommandData_t& /*args*/)
124 : {
125 4 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
126 4 : m_work_thread.stop_working_thread();
127 4 : std::this_thread::sleep_for(std::chrono::milliseconds(100));
128 :
129 4 : std::chrono::milliseconds stop_timeout(5000);
130 4 : auto stop_wait = std::chrono::steady_clock::now();
131 4 : size_t outstanding_wait = 1;
132 1988 : while (outstanding_wait > 0 && std::chrono::duration_cast<std::chrono::milliseconds>(
133 1988 : std::chrono::steady_clock::now() - stop_wait) < stop_timeout) {
134 1984 : std::this_thread::sleep_for(std::chrono::milliseconds(10));
135 1984 : std::lock_guard<std::mutex> lk(m_outstanding_id_mutex);
136 1984 : outstanding_wait = m_outstanding_ids.size();
137 1984 : }
138 :
139 8 : TLOG() << get_name() << " Removing callback, there are " << outstanding_wait << " requests left outstanding.";
140 :
141 4 : get_iomanager()->remove_callback<ReversedList>(m_list_connection);
142 8 : TLOG() << get_name() << " successfully stopped";
143 :
144 4 : std::ostringstream oss_summ;
145 8 : oss_summ << ": Exiting do_stop() method, received " << m_total_lists.load() << " reversed list messages, "
146 12 : << "compared " << m_total_valid_pairs.load() + m_total_invalid_pairs.load()
147 8 : << " reversed lists to their original data, and found " << m_total_invalid_pairs.load() << " mismatches. ";
148 4 : ers::info(ProgressUpdate(ERS_HERE, get_name(), oss_summ.str()));
149 :
150 4 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
151 4 : }
152 :
/**
 * @brief Write a std::vector<int> to a stream as a brace-enclosed,
 *        comma-separated list, e.g. "{1, 2, 3}" (an empty vector gives "{}")
 * @param t ostream Instance
 * @param ints Vector to format
 * @return ostream Instance
 */
std::ostream&
operator<<(std::ostream& t, std::vector<int> ints)
{
  t << "{";
  for (std::size_t idx = 0; idx < ints.size(); ++idx) {
    // The separator goes in front of every element except the first one.
    if (idx != 0) {
      t << ", ";
    }
    t << ints[idx];
  }
  t << "}";
  return t;
}
172 :
173 : void
174 4 : ReversedListValidator::do_work(std::atomic<bool>& running_flag)
175 : {
176 4 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_work() method";
177 4 : m_request_start = std::chrono::steady_clock::now();
178 :
179 1736500 : while (running_flag.load()) {
180 1736492 : TLOG_DEBUG(TLVL_LIST_VALIDATION) << get_name() << ": Locking out id list";
181 1736492 : std::lock_guard<std::mutex> lk(m_outstanding_id_mutex);
182 :
183 1736492 : TLOG_DEBUG(TLVL_LIST_VALIDATION) << get_name() << ": Sending new requests";
184 3473021 : auto next_req_time = [&]() {
185 1736529 : auto ms = 1000.0 / m_request_rate_hz;
186 1736529 : auto off = ms * m_next_id;
187 1736529 : return m_request_start + std::chrono::milliseconds(static_cast<int>(off));
188 1736492 : };
189 :
190 1736529 : while (m_outstanding_ids.size() < m_max_outstanding_requests &&
191 1736529 : std::chrono::steady_clock::now() > next_req_time()) {
192 37 : auto size = m_list_creator.send_create(++m_next_id); // NOLINT(runtime/increment_decrement)
193 37 : m_outstanding_ids[m_next_id] = OutstandingList(size);
194 37 : send_request(m_next_id);
195 37 : ++m_requests_total;
196 37 : ++m_new_requests;
197 : }
198 :
199 1736492 : TLOG_DEBUG(TLVL_LIST_VALIDATION) << get_name() << ": End of do_work loop";
200 1736492 : }
201 :
202 4 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_work() method";
203 4 : }
204 :
205 : void
206 3 : ReversedListValidator::process_list(const ReversedList& list)
207 : {
208 3 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering process_list() method";
209 :
210 3 : ++m_total_lists;
211 3 : ++m_new_lists;
212 :
213 3 : size_t requested_size = 0;
214 3 : if (m_outstanding_ids.count(list.list_id)) {
215 3 : requested_size = m_outstanding_ids.at(list.list_id).size;
216 : }
217 :
218 3 : std::ostringstream oss_prog;
219 3 : oss_prog << "Validating list set #" << list.list_id << " with requested size " << requested_size << " from reverser " << list.reverser_id
220 3 : << ". ";
221 3 : TLOG_DEBUG() << ProgressUpdate(ERS_HERE, get_name(), oss_prog.str());
222 :
223 3 : if (list.lists.size() != m_num_generators) {
224 0 : ers::error(MissingListError(ERS_HERE, get_name(), list.list_id, m_num_generators, list.lists.size()));
225 : }
226 :
227 6 : for (auto& list_data : list.lists) {
228 :
229 3 : std::ostringstream oss_prog;
230 3 : oss_prog << "Validating list #" << list.list_id << " from generator " << list_data.original.generator_id
231 3 : << ", original contents " << list_data.original.list << " and reversed contents "
232 3 : << list_data.reversed.list << ". ";
233 3 : TLOG_DEBUG() << ProgressUpdate(ERS_HERE, get_name(), oss_prog.str());
234 :
235 3 : if (list_data.original.list.size() != requested_size || list_data.reversed.list.size() != requested_size) {
236 1 : ers::error(ListSizeError(ERS_HERE, get_name(), list.list_id, requested_size, list_data.original.list.size(), list_data.reversed.list.size()));
237 1 : ++m_invalid_list_pairs;
238 1 : ++m_total_invalid_pairs;
239 1 : continue;
240 : }
241 :
242 2 : TLOG_DEBUG(TLVL_LIST_VALIDATION)
243 2 : << get_name() << ": Re-reversing the reversed list so that it can be compared to the original list";
244 2 : auto reversed = list_data.reversed.list;
245 2 : std::reverse(reversed.begin(), reversed.end());
246 :
247 2 : TLOG_DEBUG(TLVL_LIST_VALIDATION) << get_name() << ": Comparing the doubly-reversed list with the original list";
248 2 : if (reversed != list_data.original.list) {
249 1 : std::ostringstream oss_rev;
250 1 : oss_rev << reversed;
251 1 : std::ostringstream oss_orig;
252 1 : oss_orig << list_data.original.list;
253 1 : ers::error(DataMismatchError(ERS_HERE, get_name(), list.list_id, oss_rev.str(), oss_orig.str()));
254 1 : ++m_invalid_list_pairs;
255 1 : ++m_total_invalid_pairs;
256 1 : } else {
257 1 : ++m_valid_list_pairs;
258 1 : ++m_total_valid_pairs;
259 : }
260 3 : }
261 :
262 3 : std::lock_guard<std::mutex> lk(m_outstanding_id_mutex);
263 3 : m_outstanding_ids.erase(list.list_id);
264 :
265 3 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting process_list() method";
266 3 : }
267 :
268 : void
269 37 : ReversedListValidator::send_request(int id)
270 : {
271 37 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering send_request() method";
272 :
273 37 : auto reverser_id = id % m_num_reversers;
274 :
275 37 : RequestList req;
276 37 : req.list_id = id;
277 37 : req.destination = m_list_connection;
278 :
279 37 : get_iomanager()->get_sender<RequestList>(m_reveserIds[reverser_id])->send(std::move(req), m_send_timeout);
280 :
281 37 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting send_request() method";
282 37 : }
283 :
284 : } // namespace dunedaq::listrev
285 :
286 4 : DEFINE_DUNE_DAQ_MODULE(dunedaq::listrev::ReversedListValidator)
287 :
288 : // Local Variables:
289 : // c-basic-offset: 2
290 : // End:
|