DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
bookkeeper.cpp
Go to the documentation of this file.
1
10
11#include <set>
12#include <string>
13#include <utility>
14#include <vector>
15
16namespace dunedaq::snbmodules {
17// TODO Aug-14-2022 Leo Joly leo.vincent.andre.joly@cern.ch : Obsolete, is this needed anymore ?
18void
19Bookkeeper::create_new_transfer(const std::string& protocol,
20 const std::string& src,
21 const std::set<std::string>& dests,
22 const std::set<std::filesystem::path>& files,
23 const nlohmann::json& protocol_options)
24{
25 // suppress warnings
26 (void)protocol;
27 (void)src;
28 (void)dests;
29 (void)files;
30 (void)protocol_options;
31
32 // TLOG() << "debug : creating new transfer with protocol " << protocol;
33
34 // // Check protocol
35 // std::optional<protocol_type::e_protocol_type> _protocol = protocol_type::string_to_protocols(protocol);
36 // if (!_protocol.has_value())
37 // {
38 // TLOG() << "debug : invalid protocol !";
39 // return;
40 // }
41
42 // // Check files and found metadata
43 // std::set<TransferMetadata *> files_meta;
44 // auto list = get_m_transfers().at(src);
45 // int nb_files = 0;
46
47 // for (std::filesystem::path file : files)
48 // {
49 // bool found = false;
50 // for (auto filemeta : list)
51 // {
52 // if (filemeta.get_file_name() == file)
53 // {
54 // files_meta.emplace(&filemeta);
55 // found = true;
56 // nb_files++;
57 // break;
58 // }
59 // }
60 // if (!found)
61 // {
62 // TLOG() << "debug : file " << file << " not found ! (ignoring)";
63 // }
64 // }
65
66 // if (nb_files == 0)
67 // {
68 // TLOG() << "debug : no file found ! exiting transfer creation.";
69 // return;
70 // }
71
72 // // Create transfer
73 // GroupMetadata transfer("transfer" + std::to_string(transfers.size()), _protocol.value(), files_meta);
74 // transfers_link[transfer.get_group_id()] = src;
75 // transfers.emplace(transfer);
76
77 // // Notify clients
78 // for (auto client : src)
79 // {
80 // std::string session_name = client + "_ses" + transfer.get_group_id();
81
82 // TLOG() << "debug : notifying src client " << client;
83 // send_notification(notification_type::e_notification_type::NEW_TRANSFER, get_bookkeeper_id(), session_name,
84 // client, 1000, transfer.export_to_string());
85 // }
86}
87
88// Only for stand alone application
// Dispatches one single-character console command. The visible cases are 'q' (quit), 'd',
// 'n' (interactively collect a protocol, clients and files) and 's' (start transfers);
// anything else logs "Unknown command".
// NOTE(review): this listing omits several lines of the function (its signature line, the
// body of case 'd', parts of the protocol listing loop and the validity test in case 's').
// The comments below describe only the statements that are visible.
89void
91{
92 switch (input) {
93 case 'q': {
94 TLOG() << "Exiting...";
 // exit() terminates the process, so the break below is never reached.
95 exit(0);
96 break;
97 }
98 case 'd': {
100 break;
101 }
102 case 'n': {
103 TLOG() << "Creating new transfer ...";
104 TLOG() << "Choose protocol in the list";
106 enum_i++) {
107 TLOG() << enum_i << " - "
109 }
 // -1 is outside the accepted range, so the check below rejects the value when the
 // extraction leaves it untouched.
110 int protocol = -1;
111 std::cin >> protocol;
112 // Check input
113 if (protocol < 0 || protocol > protocol_type::e_protocol_type::dummy) {
114 TLOG() << "Invalid protocol";
115 break;
116 }
117
118 TLOG() << "Choose clients (q when finished)";
119 std::set<std::string> choosen_clients;
 // Read whitespace-separated client names until "q". An unknown name only leaves this
 // loop; the clients accepted so far are kept and checked below.
120 while (true) {
121 std::string client;
122 std::cin >> client;
123 if (client == "q") {
124 break;
125 }
126 // Check input
 // NOTE(review): get_clients_conn() is declared as returning a std::set, so the member
 // find() would avoid this linear std::find scan.
127 if (std::find(get_clients_conn().begin(), get_clients_conn().end(), client) == get_clients_conn().end()) {
128 TLOG() << "Invalid client";
129 break;
130 }
131
132 choosen_clients.emplace(client);
133 }
134 if (choosen_clients.empty()) {
135 TLOG() << "No client selected";
136 break;
137 }
138 if (choosen_clients.size() < 2) {
139 TLOG() << "At least 2 clients are required";
140 break;
141 }
142
143 TLOG() << "Choose file to transmit (q when finished)";
144 std::set<std::shared_ptr<TransferMetadata>> choosen_files;
 // Read file names until "q". Each name is searched in the transfer lists of the selected
 // clients, in set order, and the first match is added to the selection.
145 while (true) {
 // Size before the lookup; an unchanged size afterwards means nothing was added.
146 uint64_t initial_size = choosen_files.size();
147 std::string file;
148 std::cin >> file;
149 if (file == "q") {
150 break;
151 }
152
153 for (const std::string& client : choosen_clients) {
154 // Check input
 // NOTE(review): map::at() throws std::out_of_range when the client has no entry in the
 // transfers map.
155 auto& list = get_transfers().at(client);
156 bool found = false;
157 for (std::shared_ptr<TransferMetadata> filemeta : list) {
158 if (filemeta->get_file_name() == file) {
159 choosen_files.emplace(filemeta);
160 found = true;
161 break;
162 }
163 }
164 if (found == true) {
165 break;
166 }
167 }
 // Also true when the entered file was already selected, because inserting the same
 // pointer again does not grow the set.
168 if (initial_size == choosen_files.size()) {
169 TLOG() << "Invalid file";
170 break;
171 }
172 }
173 if (choosen_files.empty()) {
174 TLOG() << "No file selected";
175 break;
176 }
177
 // The call that would create the transfer is disabled, so the selections gathered above
 // are currently discarded.
178 // create_new_transfer(protocol_type::protocols_to_string(protocol_type::string_to_protocols(protocol).value()),
179 // choosen_clients, choosen_files);
180 break;
181 }
182
183 case 's': {
184 TLOG() << "Choose transfers to start ... ";
185 std::set<std::string> choosen_transfers;
186 while (true) {
 // This local string shadows the `input` value the enclosing switch was evaluated on.
187 std::string input;
188 std::cin >> input;
189 if (input == "q") {
190 break;
191 }
192 // Check input
194 TLOG() << "Invalid transfer";
195 break;
196 }
197 choosen_transfers.emplace(input);
198 }
199 if (choosen_transfers.size() == 0) {
200 TLOG() << "No transfer selected";
201 break;
202 }
203
 // Start every selected transfer, in set order.
204 for (const auto& transfer : choosen_transfers) {
205 start_transfers(transfer);
206 }
207 break;
208 }
209
210 default:
211 TLOG() << "Unknown command";
212 break;
213 }
214}
215
// Starts the group transfer identified by transfer_id.
// For every client registered for that group in m_clients_per_grp_transfer, a session name
// "<client>_ses<transfer_id>" is built. An unknown transfer_id only raises an
// InvalidGroupTransferIDError warning.
// NOTE(review): the statement that uses session_name is on lines missing from this
// listing; presumably it notifies the client - confirm against the full source.
216void
217Bookkeeper::start_transfers(const std::string& transfer_id)
218{
219 TLOG() << "Starting transfer " << transfer_id;
220
221 if (m_clients_per_grp_transfer.find(transfer_id) != m_clients_per_grp_transfer.end()) {
222 for (const std::string& client : m_clients_per_grp_transfer[transfer_id]) {
 // Session name layout: <client id> + "_ses" + <group transfer id>.
223 std::string session_name = client;
224 session_name += "_ses";
225 session_name += transfer_id;
228 }
229 } else {
230 ers::warning(InvalidGroupTransferIDError(ERS_HERE, transfer_id, get_bookkeeper_id()));
231 }
232}
233
// Worker loop of the bookkeeper. It iterates once over the client connections, then loops
// while running_flag is true: listen for a notification on the first bookkeeper
// connection, sleep 100 ms, and enter the periodic branch once at least m_refresh_rate
// seconds have elapsed since the last one.
// NOTE(review): the statements inside the startup loop, the notification branch and the
// periodic branch are on lines missing from this listing.
// NOTE(review): the "JAB" TLOG lines look like leftover debugging traces; inside the loop
// they are emitted on every iteration.
234void
235Bookkeeper::do_work(std::atomic<bool>& running_flag)
236{
237 TLOG() << "JAB " << __LINE__;
238 // Just one request on startup, after that the clients will have to send by themself
239 for (const std::string& client : get_clients_conn()) {
240 TLOG() << "JAB " << __LINE__ << " " << client;
242 }
243 TLOG() << "JAB " << __LINE__;
244
 // Reference instant for the periodic branch at the bottom of the loop.
245 auto time_point = std::chrono::high_resolution_clock::now();
246
247 while (running_flag.load()) {
248 TLOG() << "JAB " << __LINE__;
250 TLOG() << "JAB " << __LINE__ << " " << get_bookkeepers_conn().size();
 // NOTE(review): front() requires get_bookkeepers_conn() to be non-empty.
251 std::optional<NotificationData> msg = listen_for_notification(get_bookkeepers_conn().front());
252 TLOG() << "JAB " << __LINE__;
253 if (msg.has_value()) {
254 TLOG() << "JAB " << __LINE__;
256 }
257 TLOG() << "JAB " << __LINE__;
258
259 // check alives clients and available files
260 std::this_thread::sleep_for(std::chrono::milliseconds(100));
261
262 // Periodic branch: entered every m_refresh_rate seconds
263 if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now() - time_point)
264 .count() >= m_refresh_rate) {
265 time_point = std::chrono::high_resolution_clock::now();
268 }
269 }
270}
271
// Interactive main loop (signature line not visible in this listing; presumably
// Bookkeeper::start(), the stand-alone entry point - confirm).
// Each iteration listens for a notification, reads one line from std::cin and forwards its
// first character to input_action(), walks the client connections skipping those already
// present in m_transfers, sleeps 100 ms, and enters the periodic branch every 5 seconds.
// NOTE(review): the loop condition is the constant true and no return statement is
// visible, although the function is declared to return bool.
// NOTE(review): several statements (startup loop body, notification handling, handling of
// unknown clients, periodic branch body) are on lines missing from this listing.
272bool
274{
 // Reference instant for the periodic branch at the bottom of the loop.
275 auto time_point = std::chrono::high_resolution_clock::now();
276
277 // Just one request on startup, after that the clients will have to send by themself
278 for (const std::string& client : get_clients_conn()) {
280 }
281
282 while (true) {
 // NOTE(review): front() requires get_bookkeepers_conn() to be non-empty.
283 std::optional<NotificationData> msg = listen_for_notification(get_bookkeepers_conn().front());
284 if (msg.has_value()) {
286 }
287
 // Read one command line; only its first character selects the action.
 // NOTE(review): getline on std::cin presumably blocks until a line is entered, which
 // would stall the rest of the loop - confirm this is intended.
288 std::string input;
289 getline(std::cin, input);
290 if (input.empty() == false) {
291 input_action(input[0]);
292 }
293
294 // check alives clients and available files
295 for (const std::string& client : get_clients_conn()) {
296 if (m_transfers.find(client) != m_transfers.end()) {
297 // already known client
298 continue;
299 }
300
302 }
303
304 std::this_thread::sleep_for(std::chrono::milliseconds(100));
305
306 // Periodic branch: entered every 5 seconds
307 if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now() - time_point)
308 .count() >= 5) {
309 time_point = std::chrono::high_resolution_clock::now();
312 }
313 }
314}
315
// Sends a connection request to `client` (signature line not visible in this listing;
// presumably Bookkeeper::request_connection_and_available_files(const std::string& client)
// - confirm).
// NOTE(review): the head of the call is on lines missing from this listing; only the
// arguments `client`, `client` and a final `1` are visible. The final 1 is presumably the
// `tries` argument of send_notification - confirm.
// The function does not wait for the answer: the listening loop below is disabled.
316void
318{
319 // send connection request to client
322 client,
323 client,
325 1);
326
327 // Listen to receive connection response and available files
328 // auto msg = listen_for_notification(get_bookkeepers_conn().front(), client);
329
330 // while (msg.has_value() && msg.value().m_notification !=
331 // notification_type::notification_to_string(notification_type::e_notification_type::CONNECTION_REQUEST))
332 // {
333 // action_on_receive_notification(msg.value());
334 // msg = listen_for_notification(get_bookkeepers_conn().front(), client);
335 // }
336}
337
// Walks every known group transfer and, for the groups whose status is DOWNLOADING,
// CHECKING, UPLOADING or HASHING (or for all groups when `force` is true), iterates over
// the sessions registered for that group in m_clients_per_grp_transfer.
// (Signature line not visible in this listing; presumably
// Bookkeeper::request_update_metadata(bool force) - confirm.)
// NOTE(review): the statement executed per session is on lines missing from this listing;
// presumably it sends an update request to the session - confirm.
338void
340{
341 for (const auto& [id, g] : get_grp_transfers()) {
342 // Only request for dynamic status
343 if (g.get_group_status() == status_type::e_status::DOWNLOADING ||
344 g.get_group_status() == status_type::e_status::CHECKING ||
345 g.get_group_status() == status_type::e_status::UPLOADING ||
346 g.get_group_status() == status_type::e_status::HASHING || force) {
347
 // NOTE(review): operator[] creates an empty entry when the group id has no registered
 // session yet.
348 for (const std::string& session : m_clients_per_grp_transfer[g.get_group_id()]) {
351 session,
353 }
354 }
355 }
356}
357
// Handles one received notification (signature line not visible in this listing;
// presumably Bookkeeper::action_on_receive_notification(NotificationData notif) - confirm).
// Returns false when the notification is addressed neither to this bookkeeper nor to
// "all"; returns true otherwise, including for notification types that are only reported
// as not handled.
// NOTE(review): the declaration of `action`, the case labels and some statements are on
// lines missing from this listing.
358bool
360{
361 TLOG() << "debug : Action on request " << notif.m_notification << " for " << notif.m_target_id;
362
 // Accept the notification when the target id contains this bookkeeper's id or is "all".
363 if (notif.m_target_id.find(get_bookkeeper_id()) == std::string::npos && notif.m_target_id != "all") {
366 return false;
367 }
368
369 // Use enum cast for converting string to enum, still working with older clients and user readable
371
372 if (action.has_value() == false) {
374 InvalidNotificationReceivedError(ERS_HERE, get_bookkeeper_id(), notif.m_source_id, notif.m_notification));
375 }
376
 // NOTE(review): no return is visible in the branch above, so an unrecognised notification
 // appears to fall through to action.value(), which throws std::bad_optional_access on an
 // empty optional - confirm against the full source.
377 switch (action.value()) {
379 if (notif.m_data == "end") {
380 // Create entry in the map in case no files
382 break;
383 }
384
385 // Store it
387 break;
388 }
389
392 // Loading the data and convert to a proper transfer metadata object
393 GroupMetadata group_meta(notif.m_data, false);
394
395 // Store it
 // Register the sender as a client of this group before handing the metadata over.
396 m_clients_per_grp_transfer[group_meta.get_group_id()].insert(notif.m_source_id);
397 add_update_grp_transfer(std::move(group_meta));
398 break;
399 }
400
401 default:
402 ers::warning(NotHandledNotificationError(ERS_HERE, get_bookkeeper_id(), notif.m_source_id, notif.m_notification));
403 }
404 return true;
405}
406
407void
409{
410 std::ostream* output = nullptr;
411 std::ostream* output_line_log = nullptr;
412 std::string sep = ";";
413
414 if (m_file_log_path != "") {
415 // open file
416 output = new std::ofstream();
417 output_line_log = new std::ofstream();
418 dynamic_cast<std::ofstream*>(output)->open(m_file_log_path + get_bookkeeper_id() + ".log", std::fstream::out);
419 dynamic_cast<std::ofstream*>(output_line_log)
420 ->open(m_file_log_path + get_bookkeeper_id() + "_line.csv", std::fstream::app | std::fstream::out);
421 // clear file
422 dynamic_cast<std::ofstream*>(output)->clear();
423 TLOG() << "debug : output log wroten " << m_file_log_path << get_bookkeeper_id() << ".log\t" << m_file_log_path
424 << get_bookkeeper_id() << "_line.csv";
425
426 // if csv file empty, write header
427 if (dynamic_cast<std::ofstream*>(output_line_log)->tellp() == 0) {
428 *output_line_log << "time" << sep << "file_name" << sep << "file_full_path" << sep << "group_id" << sep
429 << "src_ip" << sep << "size" << sep
430
431 << "dest_ip" << sep << "start_time" << sep << "duration" << sep << "progress" << sep << "speed"
432 << sep << "state" << sep
433
434 << "end_time" << sep << "error" << sep << std::endl;
435 }
436 } else {
437 output = &std::cout;
438 }
439
440 *output << "***** Bookkeeper " << get_bookkeeper_id() << " " + get_ip().get_ip_port() << " informations display *****"
441 << std::endl;
442 // *output << "q: quit, d : display info, n : new transfer, s : start transfer" << std::endl;
443 *output << "Connected clients :" << std::endl;
444
445 for (const auto& client : get_transfers()) {
446 bool is_session = false;
447 // If it's a session
448 if (client.first.find("ses") != std::string::npos) {
449 *output << "\t* Session " << client.first << " is active" << std::endl;
450 is_session = true;
451 } else {
452 *output << "> Client " << client.first << " is connected" << std::endl;
453 }
454
455 // print for each file the status
456 for (const auto& file : client.second) {
457 if (m_file_log_path != "") {
458 *output_line_log << std::chrono::duration_cast<std::chrono::seconds>(
459 std::chrono::system_clock::now().time_since_epoch())
460 .count()
461 << sep << file->get_file_name() << sep << file->get_file_path() << sep << file->get_group_id()
462 << sep << file->get_src().get_ip_port() << sep << file->get_size() << sep
463
464 << file->get_dest().get_ip_port() << sep << file->get_start_time_str() << sep
465 << file->get_total_duration_ms() << sep << file->get_progress() << sep
466 << file->get_transmission_speed() << sep << status_type::status_to_string(file->get_status())
467 << sep
468
469 << file->get_end_time_str() << sep << file->get_error_code() << sep
470
471 << std::endl;
472 }
473
474 if (is_session) {
475 *output << "\t\t - ";
476 } else {
477 *output << "\t - ";
478 }
479
480 if (is_session)
481 *output << file->get_file_name() << "\t" << file->get_size() << " bytes\tfrom " << file->get_src().get_ip_port()
482 << "\t"
483 // << file->get_dest().get_ip_port() << "\t"
484 << status_type::status_to_string(file->get_status()) << "\t" << file->get_progress() << "%\t"
485 << (file->get_transmission_speed() == 0 ? "-" : std::to_string(file->get_transmission_speed()))
486 << "Bi/s\t" << file->get_start_time_str() << "\t" << file->get_total_duration_ms() << "ms\t"
487 << file->get_end_time_str() << "\t" << file->get_error_code() << "\t" << std::endl;
488 else
489 *output << "Available file " << file->get_file_path() << "\t" << file->get_size() << " bytes\tfrom "
490 << file->get_src().get_ip_port() << "\t" << std::endl;
491 }
492 }
493
494 *output << std::endl << "Active Transfers :" << std::endl;
495
496 *output << "Group ID\t" << "Protocol\t" << "Src\t" << "IP\t" << "Status\t" << std::endl;
497
498 for (const auto& [id, g] : get_grp_transfers()) {
499 *output << g.get_group_id() << "\t" << protocol_type::protocols_to_string(g.get_protocol()) << "\t"
500 << g.get_source_id() << "\t" << g.get_source_ip().get_ip_port() << "\t"
501 << status_type::status_to_string(g.get_group_status()) << "\t" << std::endl;
502
503 for (const std::shared_ptr<TransferMetadata>& fmeta : g.get_transfers_meta()) {
504 *output << "\t- " << fmeta->get_file_name() << "\t" << fmeta->get_src().get_ip_port() << " to "
505 << fmeta->get_dest().get_ip_port() << "\t" << status_type::status_to_string(fmeta->get_status()) << "\t"
506 << std::endl;
507 }
508
509 for (const std::string& f : g.get_expected_files()) {
510 *output << "\t- " << f << "\t" << "Expected" << std::endl;
511 }
512 }
513
514 if (m_file_log_path != "") {
515 dynamic_cast<std::ofstream*>(output)->close();
516 dynamic_cast<std::ofstream*>(output_line_log)->close();
517 } else {
518 output->flush();
519 }
520}
521
522void
523Bookkeeper::add_update_transfer(const std::string& client_id, const std::string& data)
524{
525 // Loading the data and convert to a proper transfer metadata object
526 std::shared_ptr<TransferMetadata> file = std::make_shared<TransferMetadata>(data, false);
527 std::string group_id_tmp = file->get_group_id();
528
529 std::vector<std::shared_ptr<TransferMetadata>>& tr_vector = get_transfers()[client_id];
530 for (std::shared_ptr<TransferMetadata>& tr : tr_vector) {
531 if (*tr == *file) {
532 // Already inserted, simply update the one already present
533 tr->from_string(data);
534
535 // Add available file information
536 // file->set_group_id("");
537 // m_transfers[client_id].push_back(file);
538
539 return;
540 }
541 }
542
543 // Check if transfer already exist in a group transfer
544 if (group_id_tmp != "" && m_grp_transfers.find(group_id_tmp) != m_grp_transfers.end()) {
545 m_grp_transfers.at(group_id_tmp).add_file(file);
546 }
547 m_transfers[client_id].push_back(file);
548}
549
550void
552{
553 std::string group_id_tmp = grp_transfers.get_group_id();
554 if (m_grp_transfers.find(group_id_tmp) != m_grp_transfers.end()) {
555 // Already inserted, copy old values
556 grp_transfers.set_transfers_meta(std::move(m_grp_transfers.at(grp_transfers.get_group_id()).get_transfers_meta()));
557 grp_transfers.set_expected_files(std::move(m_grp_transfers.at(grp_transfers.get_group_id()).get_expected_files()));
558 m_grp_transfers.erase(group_id_tmp);
559 }
560 m_grp_transfers.insert({ group_id_tmp, std::move(grp_transfers) });
561}
562} // namespace dunedaq::snbmodules
#define ERS_HERE
void do_work(std::atomic< bool > &running_flag)
Start the bookkeeper thread to receive notifications.
void request_update_metadata(bool force=false)
Request the update of the metadata from every known clients to the bookkeeper. Only get from group tr...
void start_transfers(const std::string &transfer_id)
Start a new transfer.
std::map< std::string, GroupMetadata > m_grp_transfers
map of group_id -> group_transfers
std::map< std::string, std::set< std::string > > m_clients_per_grp_transfer
map of grp_transfer_id -> clients_id
bool action_on_receive_notification(NotificationData notif) override
Action to do when receiving a notification.
bool start()
Start the bookkeeper, Only used for stand alone application.
void request_connection_and_available_files(const std::string &client)
Send a notification to a client id or connection to get available files.
void display_information()
Display the information of the bookkeeper either on the normal log or on a specific file depending on...
void create_new_transfer(const std::string &protocol, const std::string &src, const std::set< std::string > &dests, const std::set< std::filesystem::path > &files, const nlohmann::json &protocol_options=nlohmann::json())
void add_update_grp_transfer(GroupMetadata grp_transfers)
void add_update_transfer(const std::string &client_id, const std::string &data)
std::map< std::string, GroupMetadata > & get_grp_transfers()
std::map< std::string, std::vector< std::shared_ptr< TransferMetadata > > > m_transfers
Map of files/current transfers, client_id -> set of transfers.
std::string m_file_log_path
should the information panel of transfers be displayed on the normal log or a specific file
int m_refresh_rate
Refresh rate of the information panel of transfers in seconds.
std::map< std::string, std::vector< std::shared_ptr< TransferMetadata > > > & get_transfers()
std::string get_client_name_from_session_name(const std::string &session_name) const
Useful conversion from session id to client id.
void input_action(char input)
Do action depending on the input, Used for stand alone application TODO: remake.
std::string get_bookkeeper_id() const
void set_expected_files(std::set< std::string > expected_files)
void set_transfers_meta(std::vector< std::shared_ptr< TransferMetadata > > files_meta)
std::string get_ip_port() const
Get the IP address and the port in the format "ip:port".
Definition ip_format.hpp:57
std::optional< NotificationData > listen_for_notification(const std::string &id, const std::string &expected_from="", int timeout=-1, int tries=-1)
Listen for a notification.
void lookups_connections()
Get the list of every connections, must have the prefix first in the name of the connection and the n...
bool send_notification(const notification_type::e_notification_type &notif, const std::string &src, const std::string &dst, const std::string &id_conn, const std::string &data="", int tries=-1)
Send a notification during m_timeout_send ms.
const std::vector< std::string > & get_bookkeepers_conn() const
Init the connection interface, Only used for standalone application.
const std::set< std::string > & get_clients_conn() const
#define TLOG(...)
Definition macro.hpp:22
void warning(const Issue &issue)
Definition ers.hpp:115
NotificationData class, represent a notification.
std::string m_data
Data of the notification, can be empty.
std::string m_notification
Notification type.
static std::optional< e_notification_type > string_to_notification(std::string s)
e_protocol_type
Different type of protocols available for communication.
static std::string protocols_to_string(e_protocol_type e)
static std::string status_to_string(e_status e)