Line data Source code
1 : /**
2 : * @file bookkeeper.cpp Bookkeeper class retrieving information from clients
3 : *
4 : * This is part of the DUNE DAQ , copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "snbmodules/bookkeeper.hpp"
10 :
11 : #include <set>
12 : #include <string>
13 : #include <utility>
14 : #include <vector>
15 :
16 : namespace dunedaq::snbmodules {
17 : // TODO Aug-14-2022 Leo Joly leo.vincent.andre.joly@cern.ch : Obsolete, is this needed anymore ?
18 : void
19 0 : Bookkeeper::create_new_transfer(const std::string& protocol,
20 : const std::string& src,
21 : const std::set<std::string>& dests,
22 : const std::set<std::filesystem::path>& files,
23 : const nlohmann::json& protocol_options)
24 : {
25 : // suppress warnings
26 0 : (void)protocol;
27 0 : (void)src;
28 0 : (void)dests;
29 0 : (void)files;
30 0 : (void)protocol_options;
31 :
32 : // TLOG() << "debug : creating new transfer with protocol " << protocol;
33 :
34 : // // Check protocol
35 : // std::optional<protocol_type::e_protocol_type> _protocol = protocol_type::string_to_protocols(protocol);
36 : // if (!_protocol.has_value())
37 : // {
38 : // TLOG() << "debug : invalid protocol !";
39 : // return;
40 : // }
41 :
42 : // // Check files and found metadata
43 : // std::set<TransferMetadata *> files_meta;
44 : // auto list = get_m_transfers().at(src);
45 : // int nb_files = 0;
46 :
47 : // for (std::filesystem::path file : files)
48 : // {
49 : // bool found = false;
50 : // for (auto filemeta : list)
51 : // {
52 : // if (filemeta.get_file_name() == file)
53 : // {
54 : // files_meta.emplace(&filemeta);
55 : // found = true;
56 : // nb_files++;
57 : // break;
58 : // }
59 : // }
60 : // if (!found)
61 : // {
62 : // TLOG() << "debug : file " << file << " not found ! (ignoring)";
63 : // }
64 : // }
65 :
66 : // if (nb_files == 0)
67 : // {
68 : // TLOG() << "debug : no file found ! exiting transfer creation.";
69 : // return;
70 : // }
71 :
72 : // // Create transfer
73 : // GroupMetadata transfer("transfer" + std::to_string(transfers.size()), _protocol.value(), files_meta);
74 : // transfers_link[transfer.get_group_id()] = src;
75 : // transfers.emplace(transfer);
76 :
77 : // // Notify clients
78 : // for (auto client : src)
79 : // {
80 : // std::string session_name = client + "_ses" + transfer.get_group_id();
81 :
82 : // TLOG() << "debug : notifying src client " << client;
83 : // send_notification(notification_type::e_notification_type::NEW_TRANSFER, get_bookkeeper_id(), session_name,
84 : // client, 1000, transfer.export_to_string());
85 : // }
86 0 : }
87 :
88 : // Only for stand alone application
89 : void
90 0 : Bookkeeper::input_action(char input)
91 : {
92 0 : switch (input) {
93 0 : case 'q': {
94 0 : TLOG() << "Exiting...";
95 0 : exit(0);
96 0 : break;
97 : }
98 0 : case 'd': {
99 0 : display_information();
100 0 : break;
101 : }
102 0 : case 'n': {
103 0 : TLOG() << "Creating new transfer ...";
104 0 : TLOG() << "Choose protocol in the list";
105 0 : for (int enum_i = protocol_type::e_protocol_type::BITTORRENT; enum_i != protocol_type::e_protocol_type::dummy;
106 : enum_i++) {
107 0 : TLOG() << enum_i << " - "
108 0 : << protocol_type::protocols_to_string(static_cast<protocol_type::e_protocol_type>(enum_i));
109 : }
110 0 : int protocol = -1;
111 0 : std::cin >> protocol;
112 : // Check input
113 0 : if (protocol < 0 || protocol > protocol_type::e_protocol_type::dummy) {
114 0 : TLOG() << "Invalid protocol";
115 0 : break;
116 : }
117 :
118 0 : TLOG() << "Choose clients (q when finished)";
119 0 : std::set<std::string> choosen_clients;
120 0 : while (true) {
121 0 : std::string client;
122 0 : std::cin >> client;
123 0 : if (client == "q") {
124 : break;
125 : }
126 : // Check input
127 0 : if (std::find(get_clients_conn().begin(), get_clients_conn().end(), client) == get_clients_conn().end()) {
128 0 : TLOG() << "Invalid client";
129 0 : break;
130 : }
131 :
132 0 : choosen_clients.emplace(client);
133 0 : }
134 0 : if (choosen_clients.empty()) {
135 0 : TLOG() << "No client selected";
136 0 : break;
137 : }
138 0 : if (choosen_clients.size() < 2) {
139 0 : TLOG() << "At least 2 clients are required";
140 0 : break;
141 : }
142 :
143 0 : TLOG() << "Choose file to transmit (q when finished)";
144 0 : std::set<std::shared_ptr<TransferMetadata>> choosen_files;
145 0 : while (true) {
146 0 : uint64_t initial_size = choosen_files.size();
147 0 : std::string file;
148 0 : std::cin >> file;
149 0 : if (file == "q") {
150 : break;
151 : }
152 :
153 0 : for (const std::string& client : choosen_clients) {
154 : // Check input
155 0 : auto& list = get_transfers().at(client);
156 0 : bool found = false;
157 0 : for (std::shared_ptr<TransferMetadata> filemeta : list) {
158 0 : if (filemeta->get_file_name() == file) {
159 0 : choosen_files.emplace(filemeta);
160 0 : found = true;
161 0 : break;
162 : }
163 0 : }
164 0 : if (found == true) {
165 : break;
166 : }
167 : }
168 0 : if (initial_size == choosen_files.size()) {
169 0 : TLOG() << "Invalid file";
170 0 : break;
171 : }
172 0 : }
173 0 : if (choosen_files.empty()) {
174 0 : TLOG() << "No file selected";
175 0 : break;
176 : }
177 :
178 : // create_new_transfer(protocol_type::protocols_to_string(protocol_type::string_to_protocols(protocol).value()),
179 : // choosen_clients, choosen_files);
180 : break;
181 0 : }
182 :
183 0 : case 's': {
184 0 : TLOG() << "Choose transfers to start ... ";
185 0 : std::set<std::string> choosen_transfers;
186 0 : while (true) {
187 0 : std::string input;
188 0 : std::cin >> input;
189 0 : if (input == "q") {
190 : break;
191 : }
192 : // Check input
193 0 : if (m_clients_per_grp_transfer.find(input) == m_clients_per_grp_transfer.end()) {
194 0 : TLOG() << "Invalid transfer";
195 0 : break;
196 : }
197 0 : choosen_transfers.emplace(input);
198 0 : }
199 0 : if (choosen_transfers.size() == 0) {
200 0 : TLOG() << "No transfer selected";
201 0 : break;
202 : }
203 :
204 0 : for (const auto& transfer : choosen_transfers) {
205 0 : start_transfers(transfer);
206 : }
207 0 : break;
208 0 : }
209 :
210 0 : default:
211 0 : TLOG() << "Unknown command";
212 0 : break;
213 : }
214 0 : }
215 :
216 : void
217 0 : Bookkeeper::start_transfers(const std::string& transfer_id)
218 : {
219 0 : TLOG() << "Starting transfer " << transfer_id;
220 :
221 0 : if (m_clients_per_grp_transfer.find(transfer_id) != m_clients_per_grp_transfer.end()) {
222 0 : for (const std::string& client : m_clients_per_grp_transfer[transfer_id]) {
223 0 : std::string session_name = client;
224 0 : session_name += "_ses";
225 0 : session_name += transfer_id;
226 0 : send_notification(
227 0 : notification_type::e_notification_type::START_TRANSFER, get_bookkeeper_id(), session_name, client);
228 0 : }
229 : } else {
230 0 : ers::warning(InvalidGroupTransferIDError(ERS_HERE, transfer_id, get_bookkeeper_id()));
231 : }
232 0 : }
233 :
234 : void
235 0 : Bookkeeper::do_work(std::atomic<bool>& running_flag)
236 : {
237 0 : TLOG() << "JAB " << __LINE__;
238 : // Just one request on startup, after that the clients will have to send by themself
239 0 : for (const std::string& client : get_clients_conn()) {
240 0 : TLOG() << "JAB " << __LINE__ << " " << client;
241 0 : request_connection_and_available_files(client);
242 : }
243 0 : TLOG() << "JAB " << __LINE__;
244 :
245 0 : auto time_point = std::chrono::high_resolution_clock::now();
246 :
247 0 : while (running_flag.load()) {
248 0 : TLOG() << "JAB " << __LINE__;
249 0 : lookups_connections();
250 0 : TLOG() << "JAB " << __LINE__ << " " << get_bookkeepers_conn().size();
251 0 : std::optional<NotificationData> msg = listen_for_notification(get_bookkeepers_conn().front());
252 0 : TLOG() << "JAB " << __LINE__;
253 0 : if (msg.has_value()) {
254 0 : TLOG() << "JAB " << __LINE__;
255 0 : action_on_receive_notification(msg.value());
256 : }
257 0 : TLOG() << "JAB " << __LINE__;
258 :
259 : // check alives clients and available files
260 0 : std::this_thread::sleep_for(std::chrono::milliseconds(100));
261 :
262 : // Auto update metadata every 2 seconds
263 0 : if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now() - time_point)
264 0 : .count() >= m_refresh_rate) {
265 0 : time_point = std::chrono::high_resolution_clock::now();
266 0 : request_update_metadata();
267 0 : display_information();
268 : }
269 0 : }
270 0 : }
271 :
272 : bool
273 0 : Bookkeeper::start()
274 : {
275 0 : auto time_point = std::chrono::high_resolution_clock::now();
276 :
277 : // Just one request on startup, after that the clients will have to send by themself
278 0 : for (const std::string& client : get_clients_conn()) {
279 0 : request_connection_and_available_files(client);
280 : }
281 :
282 0 : while (true) {
283 0 : std::optional<NotificationData> msg = listen_for_notification(get_bookkeepers_conn().front());
284 0 : if (msg.has_value()) {
285 0 : action_on_receive_notification(msg.value());
286 : }
287 :
288 0 : std::string input;
289 0 : getline(std::cin, input);
290 0 : if (input.empty() == false) {
291 0 : input_action(input[0]);
292 : }
293 :
294 : // check alives clients and available files
295 0 : for (const std::string& client : get_clients_conn()) {
296 0 : if (m_transfers.find(client) != m_transfers.end()) {
297 : // already known client
298 0 : continue;
299 : }
300 :
301 0 : request_connection_and_available_files(client);
302 : }
303 :
304 0 : std::this_thread::sleep_for(std::chrono::milliseconds(100));
305 :
306 : // Auto update metadata every 2 seconds
307 0 : if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now() - time_point)
308 0 : .count() >= 5) {
309 0 : time_point = std::chrono::high_resolution_clock::now();
310 0 : request_update_metadata();
311 0 : display_information();
312 : }
313 0 : }
314 : }
315 :
316 : void
317 0 : Bookkeeper::request_connection_and_available_files(const std::string& client)
318 : {
319 : // send connection request to client
320 0 : send_notification(notification_type::e_notification_type::CONNECTION_REQUEST,
321 0 : get_bookkeeper_id(),
322 : client,
323 : client,
324 0 : get_bookkeeper_id(),
325 : 1);
326 :
327 : // Listen to receive connection response and available files
328 : // auto msg = listen_for_notification(get_bookkeepers_conn().front(), client);
329 :
330 : // while (msg.has_value() && msg.value().m_notification !=
331 : // notification_type::notification_to_string(notification_type::e_notification_type::CONNECTION_REQUEST))
332 : // {
333 : // action_on_receive_notification(msg.value());
334 : // msg = listen_for_notification(get_bookkeepers_conn().front(), client);
335 : // }
336 0 : }
337 :
338 : void
339 0 : Bookkeeper::request_update_metadata(bool force)
340 : {
341 0 : for (const auto& [id, g] : get_grp_transfers()) {
342 : // Only request for dynamic status
343 0 : if (g.get_group_status() == status_type::e_status::DOWNLOADING ||
344 0 : g.get_group_status() == status_type::e_status::CHECKING ||
345 0 : g.get_group_status() == status_type::e_status::UPLOADING ||
346 0 : g.get_group_status() == status_type::e_status::HASHING || force) {
347 :
348 0 : for (const std::string& session : m_clients_per_grp_transfer[g.get_group_id()]) {
349 0 : send_notification(notification_type::e_notification_type::UPDATE_REQUEST,
350 0 : get_bookkeeper_id(),
351 : session,
352 0 : get_client_name_from_session_name(session));
353 : }
354 : }
355 : }
356 0 : }
357 :
358 : bool
359 0 : Bookkeeper::action_on_receive_notification(NotificationData notif)
360 : {
361 0 : TLOG() << "debug : Action on request " << notif.m_notification << " for " << notif.m_target_id;
362 :
363 0 : if (notif.m_target_id.find(get_bookkeeper_id()) == std::string::npos && notif.m_target_id != "all") {
364 0 : ers::warning(
365 0 : NotificationWrongDestinationError(ERS_HERE, get_bookkeeper_id(), notif.m_source_id, notif.m_target_id));
366 0 : return false;
367 : }
368 :
369 : // Use enum cast for converting string to enum, still working with older clients and user readable
370 0 : auto action = notification_type::string_to_notification(notif.m_notification);
371 :
372 0 : if (action.has_value() == false) {
373 0 : ers::warning(
374 0 : InvalidNotificationReceivedError(ERS_HERE, get_bookkeeper_id(), notif.m_source_id, notif.m_notification));
375 : }
376 :
377 0 : switch (action.value()) {
378 0 : case notification_type::e_notification_type::TRANSFER_METADATA: {
379 0 : if (notif.m_data == "end") {
380 : // Create entry in the map in case no files
381 0 : m_transfers[notif.m_source_id];
382 : break;
383 : }
384 :
385 : // Store it
386 0 : add_update_transfer(notif.m_source_id, notif.m_data);
387 : break;
388 : }
389 :
390 0 : case notification_type::e_notification_type::TRANSFER_ERROR:
391 0 : case notification_type::e_notification_type::GROUP_METADATA: {
392 : // Loading the data and cnovert to a proper transfer metadata object
393 0 : GroupMetadata group_meta(notif.m_data, false);
394 :
395 : // Store it
396 0 : m_clients_per_grp_transfer[group_meta.get_group_id()].insert(notif.m_source_id);
397 0 : add_update_grp_transfer(std::move(group_meta));
398 0 : break;
399 0 : }
400 :
401 0 : default:
402 0 : ers::warning(NotHandledNotificationError(ERS_HERE, get_bookkeeper_id(), notif.m_source_id, notif.m_notification));
403 : }
404 : return true;
405 : }
406 :
407 : void
408 0 : Bookkeeper::display_information()
409 : {
410 0 : std::ostream* output = nullptr;
411 0 : std::ostream* output_line_log = nullptr;
412 0 : std::string sep = ";";
413 :
414 0 : if (m_file_log_path != "") {
415 : // open file
416 0 : output = new std::ofstream();
417 0 : output_line_log = new std::ofstream();
418 0 : dynamic_cast<std::ofstream*>(output)->open(m_file_log_path + get_bookkeeper_id() + ".log", std::fstream::out);
419 0 : dynamic_cast<std::ofstream*>(output_line_log)
420 0 : ->open(m_file_log_path + get_bookkeeper_id() + "_line.csv", std::fstream::app | std::fstream::out);
421 : // clear file
422 0 : dynamic_cast<std::ofstream*>(output)->clear();
423 0 : TLOG() << "debug : output log wroten " << m_file_log_path << get_bookkeeper_id() << ".log\t" << m_file_log_path
424 0 : << get_bookkeeper_id() << "_line.csv";
425 :
426 : // if csv file empty, write header
427 0 : if (dynamic_cast<std::ofstream*>(output_line_log)->tellp() == 0) {
428 0 : *output_line_log << "time" << sep << "file_name" << sep << "file_full_path" << sep << "group_id" << sep
429 : << "src_ip" << sep << "size" << sep
430 :
431 : << "dest_ip" << sep << "start_time" << sep << "duration" << sep << "progress" << sep << "speed"
432 : << sep << "state" << sep
433 :
434 0 : << "end_time" << sep << "error" << sep << std::endl;
435 : }
436 : } else {
437 : output = &std::cout;
438 : }
439 :
440 0 : *output << "***** Bookkeeper " << get_bookkeeper_id() << " " + get_ip().get_ip_port() << " informations display *****"
441 0 : << std::endl;
442 : // *output << "q: quit, d : display info, n : new transfer, s : start transfer" << std::endl;
443 0 : *output << "Connected clients :" << std::endl;
444 :
445 0 : for (const auto& client : get_transfers()) {
446 0 : bool is_session = false;
447 : // If it's a session
448 0 : if (client.first.find("ses") != std::string::npos) {
449 0 : *output << "\t* Session " << client.first << " is active" << std::endl;
450 : is_session = true;
451 : } else {
452 0 : *output << "> Client " << client.first << " is connected" << std::endl;
453 : }
454 :
455 : // print for each file the status
456 0 : for (const auto& file : client.second) {
457 0 : if (m_file_log_path != "") {
458 0 : *output_line_log << std::chrono::duration_cast<std::chrono::seconds>(
459 0 : std::chrono::system_clock::now().time_since_epoch())
460 0 : .count()
461 0 : << sep << file->get_file_name() << sep << file->get_file_path() << sep << file->get_group_id()
462 0 : << sep << file->get_src().get_ip_port() << sep << file->get_size() << sep
463 :
464 0 : << file->get_dest().get_ip_port() << sep << file->get_start_time_str() << sep
465 0 : << file->get_total_duration_ms() << sep << file->get_progress() << sep
466 0 : << file->get_transmission_speed() << sep << status_type::status_to_string(file->get_status())
467 : << sep
468 :
469 0 : << file->get_end_time_str() << sep << file->get_error_code() << sep
470 :
471 0 : << std::endl;
472 : }
473 :
474 0 : if (is_session) {
475 0 : *output << "\t\t - ";
476 : } else {
477 0 : *output << "\t - ";
478 : }
479 :
480 0 : if (is_session)
481 0 : *output << file->get_file_name() << "\t" << file->get_size() << " bytes\tfrom " << file->get_src().get_ip_port()
482 : << "\t"
483 : // << file->get_dest().get_ip_port() << "\t"
484 0 : << status_type::status_to_string(file->get_status()) << "\t" << file->get_progress() << "%\t"
485 0 : << (file->get_transmission_speed() == 0 ? "-" : std::to_string(file->get_transmission_speed()))
486 0 : << "Bi/s\t" << file->get_start_time_str() << "\t" << file->get_total_duration_ms() << "ms\t"
487 0 : << file->get_end_time_str() << "\t" << file->get_error_code() << "\t" << std::endl;
488 : else
489 0 : *output << "Available file " << file->get_file_path() << "\t" << file->get_size() << " bytes\tfrom "
490 0 : << file->get_src().get_ip_port() << "\t" << std::endl;
491 : }
492 : }
493 :
494 0 : *output << std::endl << "Active Transfers :" << std::endl;
495 :
496 0 : *output << "Group ID\t" << "Protocol\t" << "Src\t" << "IP\t" << "Status\t" << std::endl;
497 :
498 0 : for (const auto& [id, g] : get_grp_transfers()) {
499 0 : *output << g.get_group_id() << "\t" << protocol_type::protocols_to_string(g.get_protocol()) << "\t"
500 0 : << g.get_source_id() << "\t" << g.get_source_ip().get_ip_port() << "\t"
501 0 : << status_type::status_to_string(g.get_group_status()) << "\t" << std::endl;
502 :
503 0 : for (const std::shared_ptr<TransferMetadata>& fmeta : g.get_transfers_meta()) {
504 0 : *output << "\t- " << fmeta->get_file_name() << "\t" << fmeta->get_src().get_ip_port() << " to "
505 0 : << fmeta->get_dest().get_ip_port() << "\t" << status_type::status_to_string(fmeta->get_status()) << "\t"
506 0 : << std::endl;
507 : }
508 :
509 0 : for (const std::string& f : g.get_expected_files()) {
510 0 : *output << "\t- " << f << "\t" << "Expected" << std::endl;
511 : }
512 : }
513 :
514 0 : if (m_file_log_path != "") {
515 0 : dynamic_cast<std::ofstream*>(output)->close();
516 0 : dynamic_cast<std::ofstream*>(output_line_log)->close();
517 : } else {
518 0 : output->flush();
519 : }
520 0 : }
521 :
522 : void
523 0 : Bookkeeper::add_update_transfer(const std::string& client_id, const std::string& data)
524 : {
525 : // Loading the data and convert to a proper transfer metadata object
526 0 : std::shared_ptr<TransferMetadata> file = std::make_shared<TransferMetadata>(data, false);
527 0 : std::string group_id_tmp = file->get_group_id();
528 :
529 0 : std::vector<std::shared_ptr<TransferMetadata>>& tr_vector = get_transfers()[client_id];
530 0 : for (std::shared_ptr<TransferMetadata>& tr : tr_vector) {
531 0 : if (*tr == *file) {
532 : // Already inserted, simply update the one already present
533 0 : tr->from_string(data);
534 :
535 : // Add available file information
536 : // file->set_group_id("");
537 : // m_transfers[client_id].push_back(file);
538 :
539 0 : return;
540 : }
541 : }
542 :
543 : // Check if transfer already exist in a group transfer
544 0 : if (group_id_tmp != "" && m_grp_transfers.find(group_id_tmp) != m_grp_transfers.end()) {
545 0 : m_grp_transfers.at(group_id_tmp).add_file(file);
546 : }
547 0 : m_transfers[client_id].push_back(file);
548 0 : }
549 :
550 : void
551 0 : Bookkeeper::add_update_grp_transfer(GroupMetadata grp_transfers)
552 : {
553 0 : std::string group_id_tmp = grp_transfers.get_group_id();
554 0 : if (m_grp_transfers.find(group_id_tmp) != m_grp_transfers.end()) {
555 : // Already inserted, copy old values
556 0 : grp_transfers.set_transfers_meta(std::move(m_grp_transfers.at(grp_transfers.get_group_id()).get_transfers_meta()));
557 0 : grp_transfers.set_expected_files(std::move(m_grp_transfers.at(grp_transfers.get_group_id()).get_expected_files()));
558 0 : m_grp_transfers.erase(group_id_tmp);
559 : }
560 0 : m_grp_transfers.insert({ group_id_tmp, std::move(grp_transfers) });
561 0 : }
562 : } // namespace dunedaq::snbmodules
|