Line data Source code
1 : /**
2 : * @file bookkeeper.cpp Bookkeeper class retrieving information from clients
3 : *
4 : * This is part of the DUNE DAQ , copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "snbmodules/bookkeeper.hpp"
10 :
11 : #include <set>
12 : #include <string>
13 : #include <utility>
14 : #include <vector>
15 :
16 : namespace dunedaq::snbmodules {
17 : // TODO Aug-14-2022 Leo Joly leo.vincent.andre.joly@cern.ch : Obsolete, is this needed anymore ?
18 : void
19 0 : Bookkeeper::create_new_transfer(const std::string& protocol,
20 : const std::string& src,
21 : const std::set<std::string>& dests,
22 : const std::set<std::filesystem::path>& files,
23 : const nlohmann::json& protocol_options)
24 : {
25 : // suppress warnings
26 0 : (void)protocol;
27 0 : (void)src;
28 0 : (void)dests;
29 0 : (void)files;
30 0 : (void)protocol_options;
31 :
32 : // TLOG() << "debug : creating new transfer with protocol " << protocol;
33 :
34 : // // Check protocol
35 : // std::optional<protocol_type::e_protocol_type> _protocol = protocol_type::string_to_protocols(protocol);
36 : // if (!_protocol.has_value())
37 : // {
38 : // TLOG() << "debug : invalid protocol !";
39 : // return;
40 : // }
41 :
42 : // // Check files and found metadata
43 : // std::set<TransferMetadata *> files_meta;
44 : // auto list = get_m_transfers().at(src);
45 : // int nb_files = 0;
46 :
47 : // for (std::filesystem::path file : files)
48 : // {
49 : // bool found = false;
50 : // for (auto filemeta : list)
51 : // {
52 : // if (filemeta.get_file_name() == file)
53 : // {
54 : // files_meta.emplace(&filemeta);
55 : // found = true;
56 : // nb_files++;
57 : // break;
58 : // }
59 : // }
60 : // if (!found)
61 : // {
62 : // TLOG() << "debug : file " << file << " not found ! (ignoring)";
63 : // }
64 : // }
65 :
66 : // if (nb_files == 0)
67 : // {
68 : // TLOG() << "debug : no file found ! exiting transfer creation.";
69 : // return;
70 : // }
71 :
72 : // // Create transfer
73 : // GroupMetadata transfer("transfer" + std::to_string(transfers.size()), _protocol.value(), files_meta);
74 : // transfers_link[transfer.get_group_id()] = src;
75 : // transfers.emplace(transfer);
76 :
77 : // // Notify clients
78 : // for (auto client : src)
79 : // {
80 : // std::string session_name = client + "_ses" + transfer.get_group_id();
81 :
82 : // TLOG() << "debug : notifying src client " << client;
83 : // send_notification(notification_type::e_notification_type::NEW_TRANSFER, get_bookkeeper_id(), session_name,
84 : // client, 1000, transfer.export_to_string());
85 : // }
86 0 : }
87 :
88 : // Only for stand alone application
89 : void
90 0 : Bookkeeper::input_action(char input)
91 : {
92 0 : switch (input) {
93 0 : case 'q': {
94 0 : TLOG() << "Exiting...";
95 0 : exit(0);
96 0 : break;
97 : }
98 0 : case 'd': {
99 0 : display_information();
100 0 : break;
101 : }
102 0 : case 'n': {
103 0 : TLOG() << "Creating new transfer ...";
104 0 : TLOG() << "Choose protocol in the list";
105 0 : for (int enum_i = protocol_type::e_protocol_type::BITTORRENT; enum_i != protocol_type::e_protocol_type::dummy;
106 : enum_i++) {
107 0 : TLOG() << enum_i << " - "
108 0 : << protocol_type::protocols_to_string(static_cast<protocol_type::e_protocol_type>(enum_i));
109 : }
110 0 : int protocol = -1;
111 0 : std::cin >> protocol;
112 : // Check input
113 0 : if (protocol < 0 || protocol > protocol_type::e_protocol_type::dummy) {
114 0 : TLOG() << "Invalid protocol";
115 0 : break;
116 : }
117 :
118 0 : TLOG() << "Choose clients (q when finished)";
119 0 : std::set<std::string> choosen_clients;
120 0 : while (true) {
121 0 : std::string client;
122 0 : std::cin >> client;
123 0 : if (client == "q") {
124 : break;
125 : }
126 : // Check input
127 0 : if (std::find(get_clients_conn().begin(), get_clients_conn().end(), client) == get_clients_conn().end()) {
128 0 : TLOG() << "Invalid client";
129 0 : break;
130 : }
131 :
132 0 : choosen_clients.emplace(client);
133 0 : }
134 0 : if (choosen_clients.empty()) {
135 0 : TLOG() << "No client selected";
136 0 : break;
137 : }
138 0 : if (choosen_clients.size() < 2) {
139 0 : TLOG() << "At least 2 clients are required";
140 0 : break;
141 : }
142 :
143 0 : TLOG() << "Choose file to transmit (q when finished)";
144 0 : std::set<std::shared_ptr<TransferMetadata>> choosen_files;
145 0 : while (true) {
146 0 : uint64_t initial_size = choosen_files.size();
147 0 : std::string file;
148 0 : std::cin >> file;
149 0 : if (file == "q") {
150 : break;
151 : }
152 :
153 0 : for (const std::string& client : choosen_clients) {
154 : // Check input
155 0 : auto& list = get_transfers().at(client);
156 0 : bool found = false;
157 0 : for (std::shared_ptr<TransferMetadata> filemeta : list) {
158 0 : if (filemeta->get_file_name() == file) {
159 0 : choosen_files.emplace(filemeta);
160 0 : found = true;
161 0 : break;
162 : }
163 0 : }
164 0 : if (found == true) {
165 : break;
166 : }
167 : }
168 0 : if (initial_size == choosen_files.size()) {
169 0 : TLOG() << "Invalid file";
170 0 : break;
171 : }
172 0 : }
173 0 : if (choosen_files.empty()) {
174 0 : TLOG() << "No file selected";
175 0 : break;
176 : }
177 :
178 : // create_new_transfer(protocol_type::protocols_to_string(protocol_type::string_to_protocols(protocol).value()),
179 : // choosen_clients, choosen_files);
180 : break;
181 0 : }
182 :
183 0 : case 's': {
184 0 : TLOG() << "Choose transfers to start ... ";
185 0 : std::set<std::string> choosen_transfers;
186 0 : while (true) {
187 0 : std::string input;
188 0 : std::cin >> input;
189 0 : if (input == "q") {
190 : break;
191 : }
192 : // Check input
193 0 : if (m_clients_per_grp_transfer.find(input) == m_clients_per_grp_transfer.end()) {
194 0 : TLOG() << "Invalid transfer";
195 0 : break;
196 : }
197 0 : choosen_transfers.emplace(input);
198 0 : }
199 0 : if (choosen_transfers.size() == 0) {
200 0 : TLOG() << "No transfer selected";
201 0 : break;
202 : }
203 :
204 0 : for (const auto& transfer : choosen_transfers) {
205 0 : #pragma GCC diagnostic push
206 0 : #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
207 0 : start_transfers(transfer);
208 0 : #pragma GCC diagnostic pop
209 : }
210 0 : break;
211 0 : }
212 :
213 0 : default:
214 0 : TLOG() << "Unknown command";
215 0 : break;
216 : }
217 0 : }
218 :
219 : void
220 0 : Bookkeeper::start_transfers(const std::string& transfer_id)
221 : {
222 0 : TLOG() << "Starting transfer " << transfer_id;
223 :
224 0 : if (m_clients_per_grp_transfer.find(transfer_id) != m_clients_per_grp_transfer.end()) {
225 0 : for (const std::string& client : m_clients_per_grp_transfer[transfer_id]) {
226 0 : std::string session_name = client;
227 0 : session_name += "_ses";
228 0 : session_name += transfer_id;
229 0 : send_notification(
230 0 : notification_type::e_notification_type::START_TRANSFER, get_bookkeeper_id(), session_name, client);
231 0 : }
232 : } else {
233 0 : ers::warning(InvalidGroupTransferIDError(ERS_HERE, transfer_id, get_bookkeeper_id()));
234 : }
235 0 : }
236 :
237 : void
238 0 : Bookkeeper::do_work(std::atomic<bool>& running_flag)
239 : {
240 0 : TLOG() << "JAB " << __LINE__;
241 : // Just one request on startup, after that the clients will have to send by themself
242 0 : for (const std::string& client : get_clients_conn()) {
243 0 : TLOG() << "JAB " << __LINE__ << " " << client;
244 0 : request_connection_and_available_files(client);
245 : }
246 0 : TLOG() << "JAB " << __LINE__;
247 :
248 0 : auto time_point = std::chrono::high_resolution_clock::now();
249 :
250 0 : while (running_flag.load()) {
251 0 : TLOG() << "JAB " << __LINE__;
252 0 : lookups_connections();
253 0 : TLOG() << "JAB " << __LINE__ << " " << get_bookkeepers_conn().size();
254 0 : std::optional<NotificationData> msg = listen_for_notification(get_bookkeepers_conn().front());
255 0 : TLOG() << "JAB " << __LINE__;
256 0 : if (msg.has_value()) {
257 0 : TLOG() << "JAB " << __LINE__;
258 0 : action_on_receive_notification(msg.value());
259 : }
260 0 : TLOG() << "JAB " << __LINE__;
261 :
262 : // check alives clients and available files
263 0 : std::this_thread::sleep_for(std::chrono::milliseconds(100));
264 :
265 : // Auto update metadata every 2 seconds
266 0 : if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now() - time_point)
267 0 : .count() >= m_refresh_rate) {
268 0 : time_point = std::chrono::high_resolution_clock::now();
269 0 : request_update_metadata();
270 0 : display_information();
271 : }
272 0 : }
273 0 : }
274 :
275 : bool
276 0 : Bookkeeper::start()
277 : {
278 0 : auto time_point = std::chrono::high_resolution_clock::now();
279 :
280 : // Just one request on startup, after that the clients will have to send by themself
281 0 : for (const std::string& client : get_clients_conn()) {
282 0 : request_connection_and_available_files(client);
283 : }
284 :
285 0 : while (true) {
286 0 : std::optional<NotificationData> msg = listen_for_notification(get_bookkeepers_conn().front());
287 0 : if (msg.has_value()) {
288 0 : action_on_receive_notification(msg.value());
289 : }
290 :
291 0 : std::string input;
292 0 : getline(std::cin, input);
293 0 : if (input.empty() == false) {
294 0 : input_action(input[0]);
295 : }
296 :
297 : // check alives clients and available files
298 0 : for (const std::string& client : get_clients_conn()) {
299 0 : if (m_transfers.find(client) != m_transfers.end()) {
300 : // already known client
301 0 : continue;
302 : }
303 :
304 0 : request_connection_and_available_files(client);
305 : }
306 :
307 0 : std::this_thread::sleep_for(std::chrono::milliseconds(100));
308 :
309 : // Auto update metadata every 2 seconds
310 0 : if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now() - time_point)
311 0 : .count() >= 5) {
312 0 : time_point = std::chrono::high_resolution_clock::now();
313 0 : request_update_metadata();
314 0 : display_information();
315 : }
316 0 : }
317 : }
318 :
319 : void
320 0 : Bookkeeper::request_connection_and_available_files(const std::string& client)
321 : {
322 : // send connection request to client
323 0 : send_notification(notification_type::e_notification_type::CONNECTION_REQUEST,
324 0 : get_bookkeeper_id(),
325 : client,
326 : client,
327 0 : get_bookkeeper_id(),
328 : 1);
329 :
330 : // Listen to receive connection response and available files
331 : // auto msg = listen_for_notification(get_bookkeepers_conn().front(), client);
332 :
333 : // while (msg.has_value() && msg.value().m_notification !=
334 : // notification_type::notification_to_string(notification_type::e_notification_type::CONNECTION_REQUEST))
335 : // {
336 : // action_on_receive_notification(msg.value());
337 : // msg = listen_for_notification(get_bookkeepers_conn().front(), client);
338 : // }
339 0 : }
340 :
341 : void
342 0 : Bookkeeper::request_update_metadata(bool force)
343 : {
344 0 : for (const auto& [id, g] : get_grp_transfers()) {
345 : // Only request for dynamic status
346 0 : if (g.get_group_status() == status_type::e_status::DOWNLOADING ||
347 0 : g.get_group_status() == status_type::e_status::CHECKING ||
348 0 : g.get_group_status() == status_type::e_status::UPLOADING ||
349 0 : g.get_group_status() == status_type::e_status::HASHING || force) {
350 :
351 0 : for (const std::string& session : m_clients_per_grp_transfer[g.get_group_id()]) {
352 0 : send_notification(notification_type::e_notification_type::UPDATE_REQUEST,
353 0 : get_bookkeeper_id(),
354 : session,
355 0 : get_client_name_from_session_name(session));
356 : }
357 : }
358 : }
359 0 : }
360 :
361 : bool
362 0 : Bookkeeper::action_on_receive_notification(NotificationData notif)
363 : {
364 0 : TLOG() << "debug : Action on request " << notif.m_notification << " for " << notif.m_target_id;
365 :
366 0 : if (notif.m_target_id.find(get_bookkeeper_id()) == std::string::npos && notif.m_target_id != "all") {
367 0 : ers::warning(
368 0 : NotificationWrongDestinationError(ERS_HERE, get_bookkeeper_id(), notif.m_source_id, notif.m_target_id));
369 0 : return false;
370 : }
371 :
372 : // Use enum cast for converting string to enum, still working with older clients and user readable
373 0 : auto action = notification_type::string_to_notification(notif.m_notification);
374 :
375 0 : if (action.has_value() == false) {
376 0 : ers::warning(
377 0 : InvalidNotificationReceivedError(ERS_HERE, get_bookkeeper_id(), notif.m_source_id, notif.m_notification));
378 : }
379 :
380 0 : switch (action.value()) {
381 0 : case notification_type::e_notification_type::TRANSFER_METADATA: {
382 0 : if (notif.m_data == "end") {
383 : // Create entry in the map in case no files
384 0 : m_transfers[notif.m_source_id];
385 : break;
386 : }
387 :
388 : // Store it
389 0 : add_update_transfer(notif.m_source_id, notif.m_data);
390 : break;
391 : }
392 :
393 0 : case notification_type::e_notification_type::TRANSFER_ERROR:
394 0 : case notification_type::e_notification_type::GROUP_METADATA: {
395 : // Loading the data and cnovert to a proper transfer metadata object
396 0 : GroupMetadata group_meta(notif.m_data, false);
397 :
398 : // Store it
399 0 : m_clients_per_grp_transfer[group_meta.get_group_id()].insert(notif.m_source_id);
400 0 : add_update_grp_transfer(std::move(group_meta));
401 0 : break;
402 0 : }
403 :
404 0 : default:
405 0 : ers::warning(NotHandledNotificationError(ERS_HERE, get_bookkeeper_id(), notif.m_source_id, notif.m_notification));
406 : }
407 : return true;
408 : }
409 :
410 : void
411 0 : Bookkeeper::display_information()
412 : {
413 0 : std::ostream* output = nullptr;
414 0 : std::ostream* output_line_log = nullptr;
415 0 : std::string sep = ";";
416 :
417 0 : if (m_file_log_path != "") {
418 : // open file
419 0 : output = new std::ofstream();
420 0 : output_line_log = new std::ofstream();
421 0 : dynamic_cast<std::ofstream*>(output)->open(m_file_log_path + get_bookkeeper_id() + ".log", std::fstream::out);
422 0 : dynamic_cast<std::ofstream*>(output_line_log)
423 0 : ->open(m_file_log_path + get_bookkeeper_id() + "_line.csv", std::fstream::app | std::fstream::out);
424 : // clear file
425 0 : dynamic_cast<std::ofstream*>(output)->clear();
426 0 : TLOG() << "debug : output log wroten " << m_file_log_path << get_bookkeeper_id() << ".log\t" << m_file_log_path
427 0 : << get_bookkeeper_id() << "_line.csv";
428 :
429 : // if csv file empty, write header
430 0 : if (dynamic_cast<std::ofstream*>(output_line_log)->tellp() == 0) {
431 0 : *output_line_log << "time" << sep << "file_name" << sep << "file_full_path" << sep << "group_id" << sep
432 : << "src_ip" << sep << "size" << sep
433 :
434 : << "dest_ip" << sep << "start_time" << sep << "duration" << sep << "progress" << sep << "speed"
435 : << sep << "state" << sep
436 :
437 0 : << "end_time" << sep << "error" << sep << std::endl;
438 : }
439 : } else {
440 : output = &std::cout;
441 : }
442 :
443 0 : *output << "***** Bookkeeper " << get_bookkeeper_id() << " " + get_ip().get_ip_port() << " informations display *****"
444 0 : << std::endl;
445 : // *output << "q: quit, d : display info, n : new transfer, s : start transfer" << std::endl;
446 0 : *output << "Connected clients :" << std::endl;
447 :
448 0 : for (const auto& client : get_transfers()) {
449 0 : bool is_session = false;
450 : // If it's a session
451 0 : if (client.first.find("ses") != std::string::npos) {
452 0 : *output << "\t* Session " << client.first << " is active" << std::endl;
453 : is_session = true;
454 : } else {
455 0 : *output << "> Client " << client.first << " is connected" << std::endl;
456 : }
457 :
458 : // print for each file the status
459 0 : for (const auto& file : client.second) {
460 0 : if (m_file_log_path != "") {
461 0 : *output_line_log << std::chrono::duration_cast<std::chrono::seconds>(
462 0 : std::chrono::system_clock::now().time_since_epoch())
463 0 : .count()
464 0 : << sep << file->get_file_name() << sep << file->get_file_path() << sep << file->get_group_id()
465 0 : << sep << file->get_src().get_ip_port() << sep << file->get_size() << sep
466 :
467 0 : << file->get_dest().get_ip_port() << sep << file->get_start_time_str() << sep
468 0 : << file->get_total_duration_ms() << sep << file->get_progress() << sep
469 0 : << file->get_transmission_speed() << sep << status_type::status_to_string(file->get_status())
470 : << sep
471 :
472 0 : << file->get_end_time_str() << sep << file->get_error_code() << sep
473 :
474 0 : << std::endl;
475 : }
476 :
477 0 : if (is_session) {
478 0 : *output << "\t\t - ";
479 : } else {
480 0 : *output << "\t - ";
481 : }
482 :
483 0 : if (is_session)
484 0 : *output << file->get_file_name() << "\t" << file->get_size() << " bytes\tfrom " << file->get_src().get_ip_port()
485 : << "\t"
486 : // << file->get_dest().get_ip_port() << "\t"
487 0 : << status_type::status_to_string(file->get_status()) << "\t" << file->get_progress() << "%\t"
488 0 : << (file->get_transmission_speed() == 0 ? "-" : std::to_string(file->get_transmission_speed()))
489 0 : << "Bi/s\t" << file->get_start_time_str() << "\t" << file->get_total_duration_ms() << "ms\t"
490 0 : << file->get_end_time_str() << "\t" << file->get_error_code() << "\t" << std::endl;
491 : else
492 0 : *output << "Available file " << file->get_file_path() << "\t" << file->get_size() << " bytes\tfrom "
493 0 : << file->get_src().get_ip_port() << "\t" << std::endl;
494 : }
495 : }
496 :
497 0 : *output << std::endl << "Active Transfers :" << std::endl;
498 :
499 0 : *output << "Group ID\t" << "Protocol\t" << "Src\t" << "IP\t" << "Status\t" << std::endl;
500 :
501 0 : for (const auto& [id, g] : get_grp_transfers()) {
502 0 : *output << g.get_group_id() << "\t" << protocol_type::protocols_to_string(g.get_protocol()) << "\t"
503 0 : << g.get_source_id() << "\t" << g.get_source_ip().get_ip_port() << "\t"
504 0 : << status_type::status_to_string(g.get_group_status()) << "\t" << std::endl;
505 :
506 0 : for (const std::shared_ptr<TransferMetadata>& fmeta : g.get_transfers_meta()) {
507 0 : *output << "\t- " << fmeta->get_file_name() << "\t" << fmeta->get_src().get_ip_port() << " to "
508 0 : << fmeta->get_dest().get_ip_port() << "\t" << status_type::status_to_string(fmeta->get_status()) << "\t"
509 0 : << std::endl;
510 : }
511 :
512 0 : for (const std::string& f : g.get_expected_files()) {
513 0 : *output << "\t- " << f << "\t" << "Expected" << std::endl;
514 : }
515 : }
516 :
517 0 : if (m_file_log_path != "") {
518 0 : dynamic_cast<std::ofstream*>(output)->close();
519 0 : dynamic_cast<std::ofstream*>(output_line_log)->close();
520 : } else {
521 0 : output->flush();
522 : }
523 0 : }
524 :
525 : void
526 0 : Bookkeeper::add_update_transfer(const std::string& client_id, const std::string& data)
527 : {
528 : // Loading the data and convert to a proper transfer metadata object
529 0 : std::shared_ptr<TransferMetadata> file = std::make_shared<TransferMetadata>(data, false);
530 0 : std::string group_id_tmp = file->get_group_id();
531 :
532 0 : std::vector<std::shared_ptr<TransferMetadata>>& tr_vector = get_transfers()[client_id];
533 0 : for (std::shared_ptr<TransferMetadata>& tr : tr_vector) {
534 0 : if (*tr == *file) {
535 : // Already inserted, simply update the one already present
536 0 : tr->from_string(data);
537 :
538 : // Add available file information
539 : // file->set_group_id("");
540 : // m_transfers[client_id].push_back(file);
541 :
542 0 : return;
543 : }
544 : }
545 :
546 : // Check if transfer already exist in a group transfer
547 0 : if (group_id_tmp != "" && m_grp_transfers.find(group_id_tmp) != m_grp_transfers.end()) {
548 0 : m_grp_transfers.at(group_id_tmp).add_file(file);
549 : }
550 0 : m_transfers[client_id].push_back(file);
551 0 : }
552 :
553 : void
554 0 : Bookkeeper::add_update_grp_transfer(GroupMetadata grp_transfers)
555 : {
556 0 : std::string group_id_tmp = grp_transfers.get_group_id();
557 0 : if (m_grp_transfers.find(group_id_tmp) != m_grp_transfers.end()) {
558 : // Already inserted, copy old values
559 0 : grp_transfers.set_transfers_meta(m_grp_transfers.at(grp_transfers.get_group_id()).get_transfers_meta());
560 0 : grp_transfers.set_expected_files(m_grp_transfers.at(grp_transfers.get_group_id()).get_expected_files());
561 0 : m_grp_transfers.erase(group_id_tmp);
562 : }
563 0 : m_grp_transfers.insert({ group_id_tmp, std::move(grp_transfers) });
564 0 : }
565 : } // namespace dunedaq::snbmodules
|