Line data Source code
1 : /**
2 : * @file ListReverser.cpp ListReverser class
3 : * implementation
4 : *
5 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
6 : * Licensing/copyright details are in the COPYING file that you should have
7 : * received with this code.
8 : */
9 :
10 : #include "listrev/dal/ListReverser.hpp"
11 : #include "listrev/dal/RandomDataListGenerator.hpp"
12 : #include "listrev/dal/RandomListGeneratorSet.hpp"
13 :
14 : #include "listrev/opmon/list_rev_info.pb.h"
15 :
16 : #include "CommonIssues.hpp"
17 : #include "ListReverser.hpp"
18 :
19 : #include "appfwk/ConfigurationManager.hpp"
20 : #include "confmodel/Connection.hpp"
21 :
22 : #include "iomanager/IOManager.hpp"
23 : #include "logging/Logging.hpp"
24 :
25 : #include <chrono>
26 : #include <memory>
27 : #include <string>
28 : #include <thread>
29 : #include <utility>
30 : #include <vector>
31 :
32 : /**
33 : * @brief Name used by TRACE TLOG calls from this source file
34 : */
35 : #define TRACE_NAME "ListReverser" // NOLINT
36 : #define TLVL_ENTER_EXIT_METHODS 10 // NOLINT
37 : #define TLVL_LIST_REVERSAL 15 // NOLINT
38 : #define TLVL_REQUEST_SENDING 16 // NOLINT
39 : #define TLVL_CONFIGURE 17 // NOLINT
40 :
41 : namespace dunedaq::listrev {
42 :
43 3 : ListReverser::ListReverser(const std::string& name)
44 3 : : DAQModule(name)
45 : {
46 3 : register_command("start", &ListReverser::do_start);
47 3 : register_command("stop", &ListReverser::do_stop);
48 3 : }
49 :
50 : void
51 3 : ListReverser::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
52 : {
53 3 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
54 3 : auto mdal = mcfg->get_dal<dal::ListReverser>(get_name());
55 9 : for (auto con : mdal->get_inputs()) {
56 6 : if (con->get_data_type() == datatype_to_string<IntList>()) {
57 3 : m_list_connection = con->UID();
58 : }
59 6 : if (con->get_data_type() == datatype_to_string<RequestList>()) {
60 3 : m_requests = con->UID();
61 : }
62 : }
63 :
64 3 : try {
65 3 : get_iom_receiver<IntList>(m_list_connection);
66 0 : } catch (const ers::Issue& excpt) {
67 0 : throw InvalidQueueFatalError(ERS_HERE, get_name(), "input", excpt);
68 0 : }
69 3 : try {
70 3 : get_iom_receiver<RequestList>(m_requests);
71 0 : } catch (const ers::Issue& excpt) {
72 0 : throw InvalidQueueFatalError(ERS_HERE, get_name(), "output", excpt);
73 0 : }
74 :
75 9 : for (auto con : mdal->get_outputs()) {
76 6 : if (con->get_data_type() == datatype_to_string<RequestList>()) {
77 3 : m_generator_connections.push_back(con->UID());
78 : }
79 : }
80 :
81 3 : m_send_timeout = std::chrono::milliseconds(mdal->get_send_timeout_ms());
82 3 : m_request_timeout = std::chrono::milliseconds(mdal->get_request_timeout_ms());
83 3 : m_reverser_id = mdal->get_reverser_id();
84 :
85 3 : TLOG_DEBUG(TLVL_CONFIGURE) << "ListReverser " << m_reverser_id << " configured with " << "send timeout "
86 0 : << mdal->get_send_timeout_ms() << " ms," << " request timeout "
87 0 : << mdal->get_request_timeout_ms() << "ms, " << " and " << m_generator_connections.size()
88 3 : << " generators.";
89 :
90 3 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
91 3 : }
92 :
93 : void
94 3 : ListReverser::generate_opmon_data()
95 : {
96 3 : opmon::ListReverserInfo fcr;
97 :
98 3 : fcr.set_requests_received(m_requests_received.exchange(0));
99 3 : fcr.set_requests_sent(m_requests_sent.exchange(0));
100 3 : fcr.set_lists_received(m_lists_received.exchange(0));
101 3 : fcr.set_lists_sent(m_lists_sent.exchange(0));
102 6 : fcr.set_total_requests_received(m_total_requests_received.load());
103 6 : fcr.set_total_requests_sent(m_total_requests_sent.load());
104 6 : fcr.set_total_lists_received(m_total_lists_received.load());
105 6 : fcr.set_total_lists_sent(m_total_lists_sent.load());
106 :
107 3 : publish(std::move(fcr));
108 3 : }
109 :
110 : void
111 3 : ListReverser::do_start(const CommandData_t& /*startobj*/)
112 : {
113 3 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
114 6 : get_iomanager()->add_callback<IntList>(m_list_connection,
115 6 : std::bind(&ListReverser::process_list, this, std::placeholders::_1));
116 6 : get_iomanager()->add_callback<RequestList>(
117 6 : m_requests, std::bind(&ListReverser::process_list_request, this, std::placeholders::_1));
118 :
119 6 : TLOG() << get_name() << " successfully started";
120 3 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
121 3 : }
122 :
123 : void
124 3 : ListReverser::do_stop(const CommandData_t& /*stopobj*/)
125 : {
126 3 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
127 :
128 3 : std::this_thread::sleep_for(std::chrono::milliseconds(100));
129 3 : std::chrono::milliseconds stop_timeout(5000);
130 3 : auto stop_wait = std::chrono::steady_clock::now();
131 6 : size_t outstanding_wait = (m_total_requests_received.load() - m_total_lists_sent.load()) + // Requests from validator
132 6 : (m_total_requests_sent.load() - m_total_lists_received.load()); // Requests to generator
133 499 : while (outstanding_wait > 0 && std::chrono::duration_cast<std::chrono::milliseconds>(
134 497 : std::chrono::steady_clock::now() - stop_wait) < stop_timeout) {
135 496 : std::this_thread::sleep_for(std::chrono::milliseconds(10));
136 992 : outstanding_wait = (m_total_requests_received.load() - m_total_lists_sent.load()) + // Requests from validator
137 1488 : (m_total_requests_sent.load() - m_total_lists_received.load()); // Requests to generator
138 : }
139 :
140 3 : get_iomanager()->remove_callback<RequestList>(m_requests);
141 3 : get_iomanager()->remove_callback<IntList>(m_list_connection);
142 6 : TLOG() << get_name() << " successfully stopped";
143 :
144 3 : std::ostringstream oss_summ;
145 6 : oss_summ << ": Exiting do_stop() method, received " << m_total_requests_received.load() << " request messages, "
146 9 : << "sent " << m_total_requests_sent.load() << ", received " << m_total_lists_received.load()
147 6 : << " lists, and sent " << m_total_lists_sent.load() << " reversed list messages";
148 3 : ers::info(ProgressUpdate(ERS_HERE, get_name(), oss_summ.str()));
149 :
150 3 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
151 3 : }
152 :
153 : void
154 2 : ListReverser::process_list_request(const RequestList& request)
155 : {
156 2 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering process_list_request() method";
157 2 : {
158 2 : std::lock_guard<std::mutex> lk(m_map_mutex);
159 2 : if (!m_pending_lists.count(request.list_id)) {
160 2 : m_pending_lists[request.list_id] = PendingList(request.destination, request.list_id, m_reverser_id);
161 2 : ++m_requests_received;
162 2 : ++m_total_requests_received;
163 : }
164 2 : }
165 :
166 4 : for (auto const& gen_conn : m_generator_connections) {
167 2 : TLOG_DEBUG(TLVL_REQUEST_SENDING) << "Sending request for " << request.list_id << " with destination "
168 2 : << m_list_connection << " to " << gen_conn;
169 2 : RequestList req(request.list_id, m_list_connection);
170 2 : get_iomanager()->get_sender<RequestList>(gen_conn)->send(std::move(req), m_send_timeout);
171 2 : ++m_requests_sent;
172 2 : ++m_total_requests_sent;
173 2 : }
174 2 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting process_list_request() method";
175 2 : }
176 :
/**
 * @brief Format a std::vector<int> to a stream
 *
 * Elements are written between braces, separated by ", "; an empty vector
 * is written as "{}".
 *
 * @param t ostream Instance
 * @param ints Vector to format (taken by const reference so that logging it does not copy it)
 * @return ostream Instance
 */
std::ostream&
operator<<(std::ostream& t, const std::vector<int>& ints)
{
  t << "{";
  // The separator is empty before the first element and ", " afterwards.
  const char* separator = "";
  for (const int value : ints) {
    t << separator << value;
    separator = ", ";
  }
  return t << "}";
}
196 :
197 : void
198 1 : ListReverser::process_list(const IntList& list)
199 : {
200 1 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering process_list() method";
201 :
202 1 : std::lock_guard<std::mutex> lk(m_map_mutex);
203 1 : ++m_lists_received;
204 1 : ++m_total_lists_received;
205 1 : TLOG_DEBUG(TLVL_LIST_REVERSAL) << get_name() << ": Received list #" << list.list_id << " from " << list.generator_id
206 1 : << ". It has size " << list.list.size() << ". Reversing its contents";
207 :
208 1 : if (m_pending_lists.count(list.list_id) == 0) {
209 :
210 0 : std::ostringstream oss_warn;
211 0 : oss_warn << "send " << list.list_id << " to \"" << m_pending_lists[list.list_id].requestor
212 0 : << "\" (late list receive)";
213 0 : ers::warning(dunedaq::iomanager::TimeoutExpired(ERS_HERE, get_name(), oss_warn.str(), m_send_timeout.count()));
214 0 : return;
215 0 : }
216 :
217 1 : auto workingVector = list.list;
218 1 : std::reverse(workingVector.begin(), workingVector.end());
219 1 : IntList wrapped(list.list_id, m_reverser_id, workingVector);
220 :
221 1 : ReversedList::Data this_data;
222 1 : this_data.original = list;
223 1 : this_data.reversed = wrapped;
224 :
225 1 : m_pending_lists[list.list_id].list.lists.push_back(this_data);
226 :
227 1 : std::ostringstream oss_prog;
228 2 : oss_prog << "Reversed list #" << list.list_id << " from " << list.generator_id << ", new contents " << workingVector
229 1 : << " and size " << workingVector.size() << ". ";
230 1 : TLOG_DEBUG() << ProgressUpdate(ERS_HERE, get_name(), oss_prog.str());
231 :
232 1 : if (m_pending_lists[list.list_id].list.lists.size() >= m_generator_connections.size() ||
233 1 : std::chrono::duration_cast<std::chrono::milliseconds>(
234 1 : std::chrono::steady_clock::now() - m_pending_lists[list.list_id].start_time) > m_request_timeout) {
235 :
236 : bool successfullyWasSent = false;
237 : int failCount = 0;
238 2 : while (!successfullyWasSent && failCount < 100) {
239 1 : TLOG_DEBUG(TLVL_LIST_REVERSAL) << get_name() << ": Sending the reversed lists " << list.list_id;
240 1 : try {
241 2 : get_iomanager()
242 2 : ->get_sender<ReversedList>(m_pending_lists[list.list_id].requestor)
243 1 : ->send(std::move(m_pending_lists[list.list_id].list), m_send_timeout);
244 1 : successfullyWasSent = true;
245 1 : ++m_lists_sent;
246 1 : ++m_total_lists_sent;
247 1 : m_pending_lists.erase(list.list_id);
248 0 : } catch (const dunedaq::iomanager::TimeoutExpired& excpt) {
249 0 : std::ostringstream oss_warn;
250 0 : oss_warn << "send " << list.list_id << " to \"" << m_pending_lists[list.list_id].requestor << "\"";
251 0 : ers::warning(dunedaq::iomanager::TimeoutExpired(ERS_HERE, get_name(), oss_warn.str(), m_send_timeout.count()));
252 0 : ++failCount;
253 0 : }
254 : }
255 : }
256 :
257 1 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting process_list() method";
258 1 : }
259 :
260 : } // namespace dunedaq::listrev
261 :
262 3 : DEFINE_DUNE_DAQ_MODULE(dunedaq::listrev::ListReverser)
263 :
264 : // Local Variables:
265 : // c-basic-offset: 2
266 : // End:
|