DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
bookkeeper.cpp
Go to the documentation of this file.
1
10
11#include <set>
12#include <string>
13#include <utility>
14#include <vector>
15
16namespace dunedaq::snbmodules {
17// TODO Aug-14-2022 Leo Joly leo.vincent.andre.joly@cern.ch : Obsolete, is this needed anymore ?
18void
19Bookkeeper::create_new_transfer(const std::string& protocol,
20 const std::string& src,
21 const std::set<std::string>& dests,
22 const std::set<std::filesystem::path>& files,
23 const nlohmann::json& protocol_options)
24{
25 // suppress warnings
26 (void)protocol;
27 (void)src;
28 (void)dests;
29 (void)files;
30 (void)protocol_options;
31
32 // TLOG() << "debug : creating new transfer with protocol " << protocol;
33
34 // // Check protocol
35 // std::optional<protocol_type::e_protocol_type> _protocol = protocol_type::string_to_protocols(protocol);
36 // if (!_protocol.has_value())
37 // {
38 // TLOG() << "debug : invalid protocol !";
39 // return;
40 // }
41
42 // // Check files and found metadata
43 // std::set<TransferMetadata *> files_meta;
44 // auto list = get_m_transfers().at(src);
45 // int nb_files = 0;
46
47 // for (std::filesystem::path file : files)
48 // {
49 // bool found = false;
50 // for (auto filemeta : list)
51 // {
52 // if (filemeta.get_file_name() == file)
53 // {
54 // files_meta.emplace(&filemeta);
55 // found = true;
56 // nb_files++;
57 // break;
58 // }
59 // }
60 // if (!found)
61 // {
62 // TLOG() << "debug : file " << file << " not found ! (ignoring)";
63 // }
64 // }
65
66 // if (nb_files == 0)
67 // {
68 // TLOG() << "debug : no file found ! exiting transfer creation.";
69 // return;
70 // }
71
72 // // Create transfer
73 // GroupMetadata transfer("transfer" + std::to_string(transfers.size()), _protocol.value(), files_meta);
74 // transfers_link[transfer.get_group_id()] = src;
75 // transfers.emplace(transfer);
76
77 // // Notify clients
78 // for (auto client : src)
79 // {
80 // std::string session_name = client + "_ses" + transfer.get_group_id();
81
82 // TLOG() << "debug : notifying src client " << client;
83 // send_notification(notification_type::e_notification_type::NEW_TRANSFER, get_bookkeeper_id(), session_name,
84 // client, 1000, transfer.export_to_string());
85 // }
86}
87
88// Only for stand alone application
// Dispatches a single-character command read from the console.
// NOTE(review): this listing skips original lines 90 (the signature), 99, 105,
// 108 and 193, so the comments below describe only the statements that are
// visible here.
89void
91{
92 switch (input) {
// 'q': log a message and terminate the whole process with exit status 0.
93 case 'q': {
94 TLOG() << "Exiting...";
95 exit(0);
96 break;
97 }
// 'd': the statement of this case is not visible in this listing.
98 case 'd': {
100 break;
101 }
// 'n': interactively read a protocol index, a set of clients and a set of
// files from std::cin. The create_new_transfer call at the end of this case is
// commented out, so the collected values are currently discarded.
102 case 'n': {
103 TLOG() << "Creating new transfer ...";
104 TLOG() << "Choose protocol in the list";
106 enum_i++) {
107 TLOG() << enum_i << " - "
109 }
110 int protocol = -1;
111 std::cin >> protocol;
112 // Check input
113 if (protocol < 0 || protocol > protocol_type::e_protocol_type::dummy) {
114 TLOG() << "Invalid protocol";
115 break;
116 }
117
118 TLOG() << "Choose clients (q when finished)";
119 std::set<std::string> choosen_clients;
// Read client names until "q"; an unknown name stops the loop but keeps the
// names accepted so far.
120 while (true) {
121 std::string client;
122 std::cin >> client;
123 if (client == "q") {
124 break;
125 }
126 // Check input
// NOTE(review): get_clients_conn() is declared as returning a std::set, so a
// set lookup (find/count) would avoid this linear std::find scan.
127 if (std::find(get_clients_conn().begin(), get_clients_conn().end(), client) == get_clients_conn().end()) {
128 TLOG() << "Invalid client";
129 break;
130 }
131
132 choosen_clients.emplace(client);
133 }
134 if (choosen_clients.empty()) {
135 TLOG() << "No client selected";
136 break;
137 }
138 if (choosen_clients.size() < 2) {
139 TLOG() << "At least 2 clients are required";
140 break;
141 }
142
143 TLOG() << "Choose file to transmit (q when finished)";
144 std::set<std::shared_ptr<TransferMetadata>> choosen_files;
// Read file names until "q". A name is accepted when one of the chosen
// clients has metadata whose file name matches it.
145 while (true) {
// Remember the set size so a failed lookup (or a duplicate) can be detected.
146 uint64_t initial_size = choosen_files.size();
147 std::string file;
148 std::cin >> file;
149 if (file == "q") {
150 break;
151 }
152
153 for (const std::string& client : choosen_clients) {
154 // Check input
// map::at throws std::out_of_range when the client has no entry in the map.
155 auto& list = get_transfers().at(client);
156 bool found = false;
157 for (std::shared_ptr<TransferMetadata> filemeta : list) {
158 if (filemeta->get_file_name() == file) {
159 choosen_files.emplace(filemeta);
160 found = true;
161 break;
162 }
163 }
164 if (found == true) {
165 break;
166 }
167 }
// An unchanged size means nothing was inserted: either no client knows the
// file or the same metadata pointer was already selected.
168 if (initial_size == choosen_files.size()) {
169 TLOG() << "Invalid file";
170 break;
171 }
172 }
173 if (choosen_files.empty()) {
174 TLOG() << "No file selected";
175 break;
176 }
177
178 // create_new_transfer(protocol_type::protocols_to_string(protocol_type::string_to_protocols(protocol).value()),
179 // choosen_clients, choosen_files);
180 break;
181 }
182
// 's': read transfer ids from std::cin until "q", then call start_transfers
// once for every id collected.
183 case 's': {
184 TLOG() << "Choose transfers to start ... ";
185 std::set<std::string> choosen_transfers;
186 while (true) {
// NOTE(review): this local string shadows the function's `input` value that the
// enclosing switch is testing.
187 std::string input;
188 std::cin >> input;
189 if (input == "q") {
190 break;
191 }
192 // Check input
// The condition guarding the next three lines is not visible in this listing.
194 TLOG() << "Invalid transfer";
195 break;
196 }
197 choosen_transfers.emplace(input);
198 }
199 if (choosen_transfers.size() == 0) {
200 TLOG() << "No transfer selected";
201 break;
202 }
203
204 for (const auto& transfer : choosen_transfers) {
// The pragmas silence -Wdeprecated-declarations around the start_transfers call.
205#pragma GCC diagnostic push
206#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
207 start_transfers(transfer);
208#pragma GCC diagnostic pop
209 }
210 break;
211 }
212
// Any other character is reported as an unknown command.
213 default:
214 TLOG() << "Unknown command";
215 break;
216 }
217}
218
// Starts the group transfer identified by transfer_id.
// Looks the id up in m_clients_per_grp_transfer; for every client registered
// under it a session name "<client>_ses<transfer_id>" is built. An unknown id
// is reported through ers::warning with InvalidGroupTransferIDError.
// NOTE(review): original lines 229-230 are missing from this listing;
// presumably they use session_name to notify the client — confirm against the
// real source.
219void
220Bookkeeper::start_transfers(const std::string& transfer_id)
221{
222 TLOG() << "Starting transfer " << transfer_id;
223
224 if (m_clients_per_grp_transfer.find(transfer_id) != m_clients_per_grp_transfer.end()) {
225 for (const std::string& client : m_clients_per_grp_transfer[transfer_id]) {
// Session name = client id, then the literal "_ses", then the transfer id.
226 std::string session_name = client;
227 session_name += "_ses";
228 session_name += transfer_id;
231 }
232 } else {
233 ers::warning(InvalidGroupTransferIDError(ERS_HERE, transfer_id, get_bookkeeper_id()));
234 }
235}
236
// Worker loop of the bookkeeper.
// While running_flag is true it listens for a notification on the first
// bookkeeper connection, sleeps 100 ms, and enters a periodic branch once at
// least m_refresh_rate seconds have elapsed since the last refresh.
// The TLOG "JAB" lines are progress traces that print the current source line.
// NOTE(review): original lines 244, 252, 258, 269 and 270 are missing from
// this listing (the bodies of the startup loop, of the has_value branch and of
// the periodic branch), so what those branches do is not visible here.
237void
238Bookkeeper::do_work(std::atomic<bool>& running_flag)
239{
240 TLOG() << "JAB " << __LINE__;
241 // Just one request on startup, after that the clients will have to send by themself
242 for (const std::string& client : get_clients_conn()) {
243 TLOG() << "JAB " << __LINE__ << " " << client;
245 }
246 TLOG() << "JAB " << __LINE__;
247
// Reference point for the periodic refresh below.
248 auto time_point = std::chrono::high_resolution_clock::now();
249
250 while (running_flag.load()) {
251 TLOG() << "JAB " << __LINE__;
253 TLOG() << "JAB " << __LINE__ << " " << get_bookkeepers_conn().size();
// NOTE(review): front() on an empty vector is undefined behaviour; confirm that
// the bookkeeper connection list can never be empty at this point.
254 std::optional<NotificationData> msg = listen_for_notification(get_bookkeepers_conn().front());
255 TLOG() << "JAB " << __LINE__;
256 if (msg.has_value()) {
257 TLOG() << "JAB " << __LINE__;
259 }
260 TLOG() << "JAB " << __LINE__;
261
262 // check alives clients and available files
263 std::this_thread::sleep_for(std::chrono::milliseconds(100));
264
265 // Periodic refresh: runs once at least m_refresh_rate seconds have elapsed
266 if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now() - time_point)
267 .count() >= m_refresh_rate) {
// Restart the interval measurement.
268 time_point = std::chrono::high_resolution_clock::now();
271 }
272 }
273}
274
// Interactive main loop; the Doxygen summary describes it as used only by the
// stand-alone application.
// Each iteration listens for a notification on the first bookkeeper
// connection, reads one line from std::cin and forwards its first character to
// input_action, walks the client connections, sleeps 100 ms, and enters a
// periodic branch every 5 seconds.
// NOTE(review): original lines 276 (the signature), 282, 288, 304, 313 and 314
// are missing from this listing. The visible loop is `while (true)` with no
// break or return, so no bool value is returned by the visible code.
275bool
277{
// Reference point for the periodic branch at the bottom of the loop.
278 auto time_point = std::chrono::high_resolution_clock::now();
279
280 // Just one request on startup, after that the clients will have to send by themself
281 for (const std::string& client : get_clients_conn()) {
283 }
284
285 while (true) {
286 std::optional<NotificationData> msg = listen_for_notification(get_bookkeepers_conn().front());
287 if (msg.has_value()) {
289 }
290
// getline waits for a full line on std::cin, so every iteration of this loop
// waits for user input before going on.
291 std::string input;
292 getline(std::cin, input);
293 if (input.empty() == false) {
// Only the first character of the line selects the command.
294 input_action(input[0]);
295 }
296
297 // check alives clients and available files
298 for (const std::string& client : get_clients_conn()) {
299 if (m_transfers.find(client) != m_transfers.end()) {
300 // already known client
301 continue;
302 }
303
// The statement executed for a not-yet-known client is not visible here.
305 }
306
307 std::this_thread::sleep_for(std::chrono::milliseconds(100));
308
309 // Periodic branch: runs once at least 5 seconds have elapsed
310 if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now() - time_point)
311 .count() >= 5) {
312 time_point = std::chrono::high_resolution_clock::now();
315 }
316 }
317}
318
// Sends a connection request to one client; the Doxygen summary says the goal
// is to obtain that client's available files.
// NOTE(review): original lines 320 (the signature), 323, 324 and 327 are
// missing from this listing, so only three arguments of the call are visible:
// `client` twice and a trailing 1. Which parameters they bind to cannot be
// confirmed from here.
// The commented-out block below used to listen for the response and feed each
// received message to action_on_receive_notification.
319void
321{
322 // send connection request to client
325 client,
326 client,
328 1);
329
330 // Listen to receive connection response and available files
331 // auto msg = listen_for_notification(get_bookkeepers_conn().front(), client);
332
333 // while (msg.has_value() && msg.value().m_notification !=
334 // notification_type::notification_to_string(notification_type::e_notification_type::CONNECTION_REQUEST))
335 // {
336 // action_on_receive_notification(msg.value());
337 // msg = listen_for_notification(get_bookkeepers_conn().front(), client);
338 // }
339}
340
// Walks every group transfer and, for each one whose status is DOWNLOADING,
// CHECKING, UPLOADING or HASHING (or for all of them when `force` is true),
// iterates over the sessions registered for that group.
// NOTE(review): original lines 342 (the signature), 352, 353 and 355 are
// missing from this listing; presumably they send a metadata-update request to
// each session — confirm against the real source.
341void
343{
344 for (const auto& [id, g] : get_grp_transfers()) {
345 // Only request for dynamic status
346 if (g.get_group_status() == status_type::e_status::DOWNLOADING ||
347 g.get_group_status() == status_type::e_status::CHECKING ||
348 g.get_group_status() == status_type::e_status::UPLOADING ||
349 g.get_group_status() == status_type::e_status::HASHING || force) {
350
// operator[] on the map inserts an empty set when the group id has no entry yet.
351 for (const std::string& session : m_clients_per_grp_transfer[g.get_group_id()]) {
354 session,
356 }
357 }
358 }
359}
360
// Handles one received notification.
// Returns false when the notification is addressed neither to this bookkeeper
// nor to "all"; returns true after the switch on the notification type.
// NOTE(review): original lines 362 (the signature), 367, 368, 373, 376, 381,
// 384, 389, 393 and 394 are missing from this listing, including the line that
// defines `action` and the `case` labels of the switch.
361bool
363{
364 TLOG() << "debug : Action on request " << notif.m_notification << " for " << notif.m_target_id;
365
// Skip notifications whose target id neither contains this bookkeeper's id nor
// equals "all".
366 if (notif.m_target_id.find(get_bookkeeper_id()) == std::string::npos && notif.m_target_id != "all") {
369 return false;
370 }
371
372 // Use enum cast for converting string to enum, still working with older clients and user readable
374
// NOTE(review): no return is visible in this branch. Unless the omitted line
// leaves the function, action.value() below throws std::bad_optional_access
// for an unrecognised notification.
375 if (action.has_value() == false) {
377 InvalidNotificationReceivedError(ERS_HERE, get_bookkeeper_id(), notif.m_source_id, notif.m_notification));
378 }
379
380 switch (action.value()) {
// First visible case (its label is missing): the payload "end" is handled
// separately from any other payload.
382 if (notif.m_data == "end") {
383 // Create entry in the map in case no files
385 break;
386 }
387
388 // Store it
390 break;
391 }
392
// Second visible case (its label is missing): the payload describes a group
// transfer.
395 // Loading the data and convert to a proper group metadata object
396 GroupMetadata group_meta(notif.m_data, false);
397
398 // Store it
// Record the sender under the group id, then insert or refresh the stored
// group metadata.
399 m_clients_per_grp_transfer[group_meta.get_group_id()].insert(notif.m_source_id);
400 add_update_grp_transfer(std::move(group_meta));
401 break;
402 }
403
// Every other notification type is reported as not handled.
404 default:
405 ers::warning(NotHandledNotificationError(ERS_HERE, get_bookkeeper_id(), notif.m_source_id, notif.m_notification));
406 }
407 return true;
408}
409
410void
412{
413 std::ostream* output = nullptr;
414 std::ostream* output_line_log = nullptr;
415 std::string sep = ";";
416
417 if (m_file_log_path != "") {
418 // open file
419 output = new std::ofstream();
420 output_line_log = new std::ofstream();
421 dynamic_cast<std::ofstream*>(output)->open(m_file_log_path + get_bookkeeper_id() + ".log", std::fstream::out);
422 dynamic_cast<std::ofstream*>(output_line_log)
423 ->open(m_file_log_path + get_bookkeeper_id() + "_line.csv", std::fstream::app | std::fstream::out);
424 // clear file
425 dynamic_cast<std::ofstream*>(output)->clear();
426 TLOG() << "debug : output log wroten " << m_file_log_path << get_bookkeeper_id() << ".log\t" << m_file_log_path
427 << get_bookkeeper_id() << "_line.csv";
428
429 // if csv file empty, write header
430 if (dynamic_cast<std::ofstream*>(output_line_log)->tellp() == 0) {
431 *output_line_log << "time" << sep << "file_name" << sep << "file_full_path" << sep << "group_id" << sep
432 << "src_ip" << sep << "size" << sep
433
434 << "dest_ip" << sep << "start_time" << sep << "duration" << sep << "progress" << sep << "speed"
435 << sep << "state" << sep
436
437 << "end_time" << sep << "error" << sep << std::endl;
438 }
439 } else {
440 output = &std::cout;
441 }
442
443 *output << "***** Bookkeeper " << get_bookkeeper_id() << " " + get_ip().get_ip_port() << " informations display *****"
444 << std::endl;
445 // *output << "q: quit, d : display info, n : new transfer, s : start transfer" << std::endl;
446 *output << "Connected clients :" << std::endl;
447
448 for (const auto& client : get_transfers()) {
449 bool is_session = false;
450 // If it's a session
451 if (client.first.find("ses") != std::string::npos) {
452 *output << "\t* Session " << client.first << " is active" << std::endl;
453 is_session = true;
454 } else {
455 *output << "> Client " << client.first << " is connected" << std::endl;
456 }
457
458 // print for each file the status
459 for (const auto& file : client.second) {
460 if (m_file_log_path != "") {
461 *output_line_log << std::chrono::duration_cast<std::chrono::seconds>(
462 std::chrono::system_clock::now().time_since_epoch())
463 .count()
464 << sep << file->get_file_name() << sep << file->get_file_path() << sep << file->get_group_id()
465 << sep << file->get_src().get_ip_port() << sep << file->get_size() << sep
466
467 << file->get_dest().get_ip_port() << sep << file->get_start_time_str() << sep
468 << file->get_total_duration_ms() << sep << file->get_progress() << sep
469 << file->get_transmission_speed() << sep << status_type::status_to_string(file->get_status())
470 << sep
471
472 << file->get_end_time_str() << sep << file->get_error_code() << sep
473
474 << std::endl;
475 }
476
477 if (is_session) {
478 *output << "\t\t - ";
479 } else {
480 *output << "\t - ";
481 }
482
483 if (is_session)
484 *output << file->get_file_name() << "\t" << file->get_size() << " bytes\tfrom " << file->get_src().get_ip_port()
485 << "\t"
486 // << file->get_dest().get_ip_port() << "\t"
487 << status_type::status_to_string(file->get_status()) << "\t" << file->get_progress() << "%\t"
488 << (file->get_transmission_speed() == 0 ? "-" : std::to_string(file->get_transmission_speed()))
489 << "Bi/s\t" << file->get_start_time_str() << "\t" << file->get_total_duration_ms() << "ms\t"
490 << file->get_end_time_str() << "\t" << file->get_error_code() << "\t" << std::endl;
491 else
492 *output << "Available file " << file->get_file_path() << "\t" << file->get_size() << " bytes\tfrom "
493 << file->get_src().get_ip_port() << "\t" << std::endl;
494 }
495 }
496
497 *output << std::endl << "Active Transfers :" << std::endl;
498
499 *output << "Group ID\t" << "Protocol\t" << "Src\t" << "IP\t" << "Status\t" << std::endl;
500
501 for (const auto& [id, g] : get_grp_transfers()) {
502 *output << g.get_group_id() << "\t" << protocol_type::protocols_to_string(g.get_protocol()) << "\t"
503 << g.get_source_id() << "\t" << g.get_source_ip().get_ip_port() << "\t"
504 << status_type::status_to_string(g.get_group_status()) << "\t" << std::endl;
505
506 for (const std::shared_ptr<TransferMetadata>& fmeta : g.get_transfers_meta()) {
507 *output << "\t- " << fmeta->get_file_name() << "\t" << fmeta->get_src().get_ip_port() << " to "
508 << fmeta->get_dest().get_ip_port() << "\t" << status_type::status_to_string(fmeta->get_status()) << "\t"
509 << std::endl;
510 }
511
512 for (const std::string& f : g.get_expected_files()) {
513 *output << "\t- " << f << "\t" << "Expected" << std::endl;
514 }
515 }
516
517 if (m_file_log_path != "") {
518 dynamic_cast<std::ofstream*>(output)->close();
519 dynamic_cast<std::ofstream*>(output_line_log)->close();
520 } else {
521 output->flush();
522 }
523}
524
525void
526Bookkeeper::add_update_transfer(const std::string& client_id, const std::string& data)
527{
528 // Loading the data and convert to a proper transfer metadata object
529 std::shared_ptr<TransferMetadata> file = std::make_shared<TransferMetadata>(data, false);
530 std::string group_id_tmp = file->get_group_id();
531
532 std::vector<std::shared_ptr<TransferMetadata>>& tr_vector = get_transfers()[client_id];
533 for (std::shared_ptr<TransferMetadata>& tr : tr_vector) {
534 if (*tr == *file) {
535 // Already inserted, simply update the one already present
536 tr->from_string(data);
537
538 // Add available file information
539 // file->set_group_id("");
540 // m_transfers[client_id].push_back(file);
541
542 return;
543 }
544 }
545
546 // Check if transfer already exist in a group transfer
547 if (group_id_tmp != "" && m_grp_transfers.find(group_id_tmp) != m_grp_transfers.end()) {
548 m_grp_transfers.at(group_id_tmp).add_file(file);
549 }
550 m_transfers[client_id].push_back(file);
551}
552
553void
555{
556 std::string group_id_tmp = grp_transfers.get_group_id();
557 if (m_grp_transfers.find(group_id_tmp) != m_grp_transfers.end()) {
558 // Already inserted, copy old values
559 grp_transfers.set_transfers_meta(m_grp_transfers.at(grp_transfers.get_group_id()).get_transfers_meta());
560 grp_transfers.set_expected_files(m_grp_transfers.at(grp_transfers.get_group_id()).get_expected_files());
561 m_grp_transfers.erase(group_id_tmp);
562 }
563 m_grp_transfers.insert({ group_id_tmp, std::move(grp_transfers) });
564}
565} // namespace dunedaq::snbmodules
#define ERS_HERE
void do_work(std::atomic< bool > &running_flag)
Start the bookkeeper thread to receive notifications.
void request_update_metadata(bool force=false)
Request the update of the metadata from every known client to the bookkeeper. Only get from group tr...
void start_transfers(const std::string &transfer_id)
Start a new transfer.
std::map< std::string, GroupMetadata > m_grp_transfers
map of group_id -> group_transfers
std::map< std::string, std::set< std::string > > m_clients_per_grp_transfer
map of grp_transfer_id -> clients_id
bool action_on_receive_notification(NotificationData notif) override
Action to do when receiving a notification.
bool start()
Start the bookkeeper, Only used for stand alone application.
void request_connection_and_available_files(const std::string &client)
Send a notification to a client ID or connection to get its available files.
void display_information()
Display the information of the bookkeeper either on the normal log or on a specific file depending on...
void create_new_transfer(const std::string &protocol, const std::string &src, const std::set< std::string > &dests, const std::set< std::filesystem::path > &files, const nlohmann::json &protocol_options=nlohmann::json())
void add_update_grp_transfer(GroupMetadata grp_transfers)
void add_update_transfer(const std::string &client_id, const std::string &data)
std::map< std::string, GroupMetadata > & get_grp_transfers()
std::map< std::string, std::vector< std::shared_ptr< TransferMetadata > > > m_transfers
Map of files/current transfers, client_id -> set of transfers.
std::string m_file_log_path
Should the information panel of transfers be displayed on the normal log or in a specific file
int m_refresh_rate
Refresh rate of the information panel of transfers in seconds.
std::map< std::string, std::vector< std::shared_ptr< TransferMetadata > > > & get_transfers()
std::string get_client_name_from_session_name(const std::string &session_name) const
Useful conversion from session id to client id.
void input_action(char input)
Do action depending on the input, Used for stand alone application TODO: remake.
std::string get_bookkeeper_id() const
void set_expected_files(std::set< std::string > expected_files)
void set_transfers_meta(std::vector< std::shared_ptr< TransferMetadata > > files_meta)
std::string get_ip_port() const
Get the IP address and the port in the format "ip:port".
Definition ip_format.hpp:57
std::optional< NotificationData > listen_for_notification(const std::string &id, const std::string &expected_from="", int timeout=-1, int tries=-1)
Listen for a notification.
void lookups_connections()
Get the list of every connections, must have the prefix first in the name of the connection and the n...
bool send_notification(const notification_type::e_notification_type &notif, const std::string &src, const std::string &dst, const std::string &id_conn, const std::string &data="", int tries=-1)
Send a notification during m_timeout_send ms.
const std::vector< std::string > & get_bookkeepers_conn() const
Init the connection interface, Only used for standalone application.
const std::set< std::string > & get_clients_conn() const
#define TLOG(...)
Definition macro.hpp:22
void warning(const Issue &issue)
Definition ers.hpp:115
NotificationData class, represent a notification.
std::string m_data
Data of the notification, can be empty.
std::string m_notification
Notification type.
static std::optional< e_notification_type > string_to_notification(std::string s)
e_protocol_type
Different type of protocols available for communication.
static std::string protocols_to_string(e_protocol_type e)
static std::string status_to_string(e_status e)