DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::snbmodules::Bookkeeper Class Reference

#include <bookkeeper.hpp>

Inheritance diagram for dunedaq::snbmodules::Bookkeeper:
[legend]
Collaboration diagram for dunedaq::snbmodules::Bookkeeper:
[legend]

Public Member Functions

 Bookkeeper (const IPFormat &listening_ip, std::string bookkeeper_id, std::string file_log_path="", int refresh_rate=5, std::string connection_prefix="snbmodules", int timeout_send=10, int timeout_receive=100)
 Constructor with params.
 
 ~Bookkeeper ()
 Destructor.
 
bool start ()
 Start the bookkeeper, Only used for stand alone application.
 
void do_work (std::atomic< bool > &running_flag)
 Start the bookkeeper thread to receive notifications.
 
bool action_on_receive_notification (NotificationData notif) override
 Action to do when receiving a notification.
 
void input_action (char input)
 Do action depending on the input, Used for stand alone application TODO: remake.
 
void create_new_transfer (const std::string &protocol, const std::string &src, const std::set< std::string > &dests, const std::set< std::filesystem::path > &files, const nlohmann::json &protocol_options=nlohmann::json())
 
void display_information ()
 Display the information of the bookkeeper either on the normal log or on a specific file depending on the value of m_file_log_path.
 
void request_update_metadata (bool force=false)
 Request the update of the metadata from every known clients to the bookkeeper. Only get from group transfers that are in a dynamic state (downloading, checking, uploading...)
 
void set_bookkeeper_id (std::string bookkeeper_id)
 
void set_ip (const IPFormat &ip)
 
void add_update_transfer (const std::string &client_id, const std::string &data)
 
void add_update_grp_transfer (GroupMetadata grp_transfers)
 
std::string get_bookkeeper_id () const
 
IPFormat get_ip () const
 
std::map< std::string, GroupMetadata > & get_grp_transfers ()
 
const std::map< std::string, GroupMetadata > & get_grp_transfers () const
 
std::map< std::string, std::vector< std::shared_ptr< TransferMetadata > > > & get_transfers ()
 
const std::map< std::string, std::vector< std::shared_ptr< TransferMetadata > > > & get_transfers () const
 
- Public Member Functions inherited from dunedaq::snbmodules::NotificationInterface
 NotificationInterface (std::string connection_prefix="snbmodules", int timeout_send=10, int timeout_receive=100)
 
 NotificationInterface (std::vector< std::string > bk_conn, std::set< std::string > client_conn, const std::string &connection_prefix="snbmodules", int timeout_send=10, int timeout_receive=100)
 
virtual ~NotificationInterface ()=default
 
std::optional< NotificationDatalisten_for_notification (const std::string &id, const std::string &expected_from="", int timeout=-1, int tries=-1)
 Listen for a notification.
 
bool send_notification (const notification_type::e_notification_type &notif, const std::string &src, const std::string &dst, const std::string &id_conn, const std::string &data="", int tries=-1)
 Send a notification during m_timeout_send ms.
 
void lookups_connections ()
 Get the list of every connections, must have the prefix first in the name of the connection and the name of the connection must be in the format: prefix.*bookkeeper.* or prefix.*client.*.
 
const std::vector< std::string > & get_bookkeepers_conn () const
 Init the connection interface, Only used for standalone application.
 
const std::set< std::string > & get_clients_conn () const
 

Private Member Functions

void request_connection_and_available_files (const std::string &client)
 Send a notification to a clients id or connection to get available files.
 
void start_transfers (const std::string &transfer_id)
 Start a new transfer.
 
std::string get_client_name_from_session_name (const std::string &session_name) const
 Usefull convertion from session id to client id.
 

Private Attributes

std::string m_bookkeeper_id
 Unique identifier for the bookkeeper.
 
IPFormat m_ip
 IP address of the bookkeeper.
 
std::map< std::string, GroupMetadatam_grp_transfers
 map of group_id -> group_transfers
 
std::map< std::string, std::set< std::string > > m_clients_per_grp_transfer
 map of grp_transfer_id -> clients_id
 
std::map< std::string, std::vector< std::shared_ptr< TransferMetadata > > > m_transfers
 Map of files/current transfers, client_id -> set of transfers.
 
std::string m_file_log_path = ""
 should the information pannel of transfers be displayed on the normal log or a specific file
 
int m_refresh_rate = 5
 Refresh rate of the information pannel of transfers in seconds.
 

Detailed Description

Definition at line 31 of file bookkeeper.hpp.

Constructor & Destructor Documentation

◆ Bookkeeper()

dunedaq::snbmodules::Bookkeeper::Bookkeeper ( const IPFormat & listening_ip,
std::string bookkeeper_id,
std::string file_log_path = "",
int refresh_rate = 5,
std::string connection_prefix = "snbmodules",
int timeout_send = 10,
int timeout_receive = 100 )
inline

Constructor with params.

Definition at line 36 of file bookkeeper.hpp.

43 : NotificationInterface(std::move(connection_prefix), timeout_send, timeout_receive)
44 {
45 set_bookkeeper_id(std::move(bookkeeper_id));
46 set_ip(listening_ip);
47 m_file_log_path = std::move(file_log_path);
48 m_refresh_rate = refresh_rate;
50 }
void set_bookkeeper_id(std::string bookkeeper_id)
void set_ip(const IPFormat &ip)
void display_information()
Display the information of the bookkeeper either on the normal log or on a specific file depending on...
std::string m_file_log_path
should the information pannel of transfers be displayed on the normal log or a specific file
int m_refresh_rate
Refresh rate of the information pannel of transfers in seconds.
NotificationInterface(std::string connection_prefix="snbmodules", int timeout_send=10, int timeout_receive=100)

◆ ~Bookkeeper()

dunedaq::snbmodules::Bookkeeper::~Bookkeeper ( )
inline

Destructor.

Definition at line 53 of file bookkeeper.hpp.

53{}

Member Function Documentation

◆ action_on_receive_notification()

bool dunedaq::snbmodules::Bookkeeper::action_on_receive_notification ( NotificationData notif)
overridevirtual

Action to do when receiving a notification.

Parameters
notifNotification received
Returns
true if the notification was handled, false otherwise

Implements dunedaq::snbmodules::NotificationInterface.

Definition at line 359 of file bookkeeper.cpp.

360{
361 TLOG() << "debug : Action on request " << notif.m_notification << " for " << notif.m_target_id;
362
363 if (notif.m_target_id.find(get_bookkeeper_id()) == std::string::npos && notif.m_target_id != "all") {
365 NotificationWrongDestinationError(ERS_HERE, get_bookkeeper_id(), notif.m_source_id, notif.m_target_id));
366 return false;
367 }
368
369 // Use enum cast for converting string to enum, still working with older clients and user readable
370 auto action = notification_type::string_to_notification(notif.m_notification);
371
372 if (action.has_value() == false) {
374 InvalidNotificationReceivedError(ERS_HERE, get_bookkeeper_id(), notif.m_source_id, notif.m_notification));
375 }
376
377 switch (action.value()) {
379 if (notif.m_data == "end") {
380 // Create entry in the map in case no files
381 m_transfers[notif.m_source_id];
382 break;
383 }
384
385 // Store it
386 add_update_transfer(notif.m_source_id, notif.m_data);
387 break;
388 }
389
392 // Loading the data and cnovert to a proper transfer metadata object
393 GroupMetadata group_meta(notif.m_data, false);
394
395 // Store it
396 m_clients_per_grp_transfer[group_meta.get_group_id()].insert(notif.m_source_id);
397 add_update_grp_transfer(std::move(group_meta));
398 break;
399 }
400
401 default:
402 ers::warning(NotHandledNotificationError(ERS_HERE, get_bookkeeper_id(), notif.m_source_id, notif.m_notification));
403 }
404 return true;
405}
#define ERS_HERE
std::map< std::string, std::set< std::string > > m_clients_per_grp_transfer
map of grp_transfer_id -> clients_id
void add_update_grp_transfer(GroupMetadata grp_transfers)
void add_update_transfer(const std::string &client_id, const std::string &data)
std::map< std::string, std::vector< std::shared_ptr< TransferMetadata > > > m_transfers
Map of files/current transfers, client_id -> set of transfers.
std::string get_bookkeeper_id() const
#define TLOG(...)
Definition macro.hpp:22
void warning(const Issue &issue)
Definition ers.hpp:115
static std::optional< e_notification_type > string_to_notification(std::string s)

◆ add_update_grp_transfer()

void dunedaq::snbmodules::Bookkeeper::add_update_grp_transfer ( GroupMetadata grp_transfers)

Definition at line 551 of file bookkeeper.cpp.

552{
553 std::string group_id_tmp = grp_transfers.get_group_id();
554 if (m_grp_transfers.find(group_id_tmp) != m_grp_transfers.end()) {
555 // Already inserted, copy old values
556 grp_transfers.set_transfers_meta(std::move(m_grp_transfers.at(grp_transfers.get_group_id()).get_transfers_meta()));
557 grp_transfers.set_expected_files(std::move(m_grp_transfers.at(grp_transfers.get_group_id()).get_expected_files()));
558 m_grp_transfers.erase(group_id_tmp);
559 }
560 m_grp_transfers.insert({ group_id_tmp, std::move(grp_transfers) });
561}
std::map< std::string, GroupMetadata > m_grp_transfers
map of group_id -> group_transfers

◆ add_update_transfer()

void dunedaq::snbmodules::Bookkeeper::add_update_transfer ( const std::string & client_id,
const std::string & data )

Definition at line 523 of file bookkeeper.cpp.

524{
525 // Loading the data and convert to a proper transfer metadata object
526 std::shared_ptr<TransferMetadata> file = std::make_shared<TransferMetadata>(data, false);
527 std::string group_id_tmp = file->get_group_id();
528
529 std::vector<std::shared_ptr<TransferMetadata>>& tr_vector = get_transfers()[client_id];
530 for (std::shared_ptr<TransferMetadata>& tr : tr_vector) {
531 if (*tr == *file) {
532 // Already inserted, simply update the one already present
533 tr->from_string(data);
534
535 // Add available file information
536 // file->set_group_id("");
537 // m_transfers[client_id].push_back(file);
538
539 return;
540 }
541 }
542
543 // Check if transfer already exist in a group transfer
544 if (group_id_tmp != "" && m_grp_transfers.find(group_id_tmp) != m_grp_transfers.end()) {
545 m_grp_transfers.at(group_id_tmp).add_file(file);
546 }
547 m_transfers[client_id].push_back(file);
548}
std::map< std::string, std::vector< std::shared_ptr< TransferMetadata > > > & get_transfers()

◆ create_new_transfer()

void dunedaq::snbmodules::Bookkeeper::create_new_transfer ( const std::string & protocol,
const std::string & src,
const std::set< std::string > & dests,
const std::set< std::filesystem::path > & files,
const nlohmann::json & protocol_options = nlohmann::json() )

Definition at line 19 of file bookkeeper.cpp.

24{
25 // suppress warnings
26 (void)protocol;
27 (void)src;
28 (void)dests;
29 (void)files;
30 (void)protocol_options;
31
32 // TLOG() << "debug : creating new transfer with protocol " << protocol;
33
34 // // Check protocol
35 // std::optional<protocol_type::e_protocol_type> _protocol = protocol_type::string_to_protocols(protocol);
36 // if (!_protocol.has_value())
37 // {
38 // TLOG() << "debug : invalid protocol !";
39 // return;
40 // }
41
42 // // Check files and found metadata
43 // std::set<TransferMetadata *> files_meta;
44 // auto list = get_m_transfers().at(src);
45 // int nb_files = 0;
46
47 // for (std::filesystem::path file : files)
48 // {
49 // bool found = false;
50 // for (auto filemeta : list)
51 // {
52 // if (filemeta.get_file_name() == file)
53 // {
54 // files_meta.emplace(&filemeta);
55 // found = true;
56 // nb_files++;
57 // break;
58 // }
59 // }
60 // if (!found)
61 // {
62 // TLOG() << "debug : file " << file << " not found ! (ignoring)";
63 // }
64 // }
65
66 // if (nb_files == 0)
67 // {
68 // TLOG() << "debug : no file found ! exiting transfer creation.";
69 // return;
70 // }
71
72 // // Create transfer
73 // GroupMetadata transfer("transfer" + std::to_string(transfers.size()), _protocol.value(), files_meta);
74 // transfers_link[transfer.get_group_id()] = src;
75 // transfers.emplace(transfer);
76
77 // // Notify clients
78 // for (auto client : src)
79 // {
80 // std::string session_name = client + "_ses" + transfer.get_group_id();
81
82 // TLOG() << "debug : notifying src client " << client;
83 // send_notification(notification_type::e_notification_type::NEW_TRANSFER, get_bookkeeper_id(), session_name,
84 // client, 1000, transfer.export_to_string());
85 // }
86}

◆ display_information()

void dunedaq::snbmodules::Bookkeeper::display_information ( )

Display the information of the bookkeeper either on the normal log or on a specific file depending on the value of m_file_log_path.

Definition at line 408 of file bookkeeper.cpp.

409{
410 std::ostream* output = nullptr;
411 std::ostream* output_line_log = nullptr;
412 std::string sep = ";";
413
414 if (m_file_log_path != "") {
415 // open file
416 output = new std::ofstream();
417 output_line_log = new std::ofstream();
418 dynamic_cast<std::ofstream*>(output)->open(m_file_log_path + get_bookkeeper_id() + ".log", std::fstream::out);
419 dynamic_cast<std::ofstream*>(output_line_log)
420 ->open(m_file_log_path + get_bookkeeper_id() + "_line.csv", std::fstream::app | std::fstream::out);
421 // clear file
422 dynamic_cast<std::ofstream*>(output)->clear();
423 TLOG() << "debug : output log wroten " << m_file_log_path << get_bookkeeper_id() << ".log\t" << m_file_log_path
424 << get_bookkeeper_id() << "_line.csv";
425
426 // if csv file empty, write header
427 if (dynamic_cast<std::ofstream*>(output_line_log)->tellp() == 0) {
428 *output_line_log << "time" << sep << "file_name" << sep << "file_full_path" << sep << "group_id" << sep
429 << "src_ip" << sep << "size" << sep
430
431 << "dest_ip" << sep << "start_time" << sep << "duration" << sep << "progress" << sep << "speed"
432 << sep << "state" << sep
433
434 << "end_time" << sep << "error" << sep << std::endl;
435 }
436 } else {
437 output = &std::cout;
438 }
439
440 *output << "***** Bookkeeper " << get_bookkeeper_id() << " " + get_ip().get_ip_port() << " informations display *****"
441 << std::endl;
442 // *output << "q: quit, d : display info, n : new transfer, s : start transfer" << std::endl;
443 *output << "Connected clients :" << std::endl;
444
445 for (const auto& client : get_transfers()) {
446 bool is_session = false;
447 // If it's a session
448 if (client.first.find("ses") != std::string::npos) {
449 *output << "\t* Session " << client.first << " is active" << std::endl;
450 is_session = true;
451 } else {
452 *output << "> Client " << client.first << " is connected" << std::endl;
453 }
454
455 // print for each file the status
456 for (const auto& file : client.second) {
457 if (m_file_log_path != "") {
458 *output_line_log << std::chrono::duration_cast<std::chrono::seconds>(
459 std::chrono::system_clock::now().time_since_epoch())
460 .count()
461 << sep << file->get_file_name() << sep << file->get_file_path() << sep << file->get_group_id()
462 << sep << file->get_src().get_ip_port() << sep << file->get_size() << sep
463
464 << file->get_dest().get_ip_port() << sep << file->get_start_time_str() << sep
465 << file->get_total_duration_ms() << sep << file->get_progress() << sep
466 << file->get_transmission_speed() << sep << status_type::status_to_string(file->get_status())
467 << sep
468
469 << file->get_end_time_str() << sep << file->get_error_code() << sep
470
471 << std::endl;
472 }
473
474 if (is_session) {
475 *output << "\t\t - ";
476 } else {
477 *output << "\t - ";
478 }
479
480 if (is_session)
481 *output << file->get_file_name() << "\t" << file->get_size() << " bytes\tfrom " << file->get_src().get_ip_port()
482 << "\t"
483 // << file->get_dest().get_ip_port() << "\t"
484 << status_type::status_to_string(file->get_status()) << "\t" << file->get_progress() << "%\t"
485 << (file->get_transmission_speed() == 0 ? "-" : std::to_string(file->get_transmission_speed()))
486 << "Bi/s\t" << file->get_start_time_str() << "\t" << file->get_total_duration_ms() << "ms\t"
487 << file->get_end_time_str() << "\t" << file->get_error_code() << "\t" << std::endl;
488 else
489 *output << "Available file " << file->get_file_path() << "\t" << file->get_size() << " bytes\tfrom "
490 << file->get_src().get_ip_port() << "\t" << std::endl;
491 }
492 }
493
494 *output << std::endl << "Active Transfers :" << std::endl;
495
496 *output << "Group ID\t" << "Protocol\t" << "Src\t" << "IP\t" << "Status\t" << std::endl;
497
498 for (const auto& [id, g] : get_grp_transfers()) {
499 *output << g.get_group_id() << "\t" << protocol_type::protocols_to_string(g.get_protocol()) << "\t"
500 << g.get_source_id() << "\t" << g.get_source_ip().get_ip_port() << "\t"
501 << status_type::status_to_string(g.get_group_status()) << "\t" << std::endl;
502
503 for (const std::shared_ptr<TransferMetadata>& fmeta : g.get_transfers_meta()) {
504 *output << "\t- " << fmeta->get_file_name() << "\t" << fmeta->get_src().get_ip_port() << " to "
505 << fmeta->get_dest().get_ip_port() << "\t" << status_type::status_to_string(fmeta->get_status()) << "\t"
506 << std::endl;
507 }
508
509 for (const std::string& f : g.get_expected_files()) {
510 *output << "\t- " << f << "\t" << "Expected" << std::endl;
511 }
512 }
513
514 if (m_file_log_path != "") {
515 dynamic_cast<std::ofstream*>(output)->close();
516 dynamic_cast<std::ofstream*>(output_line_log)->close();
517 } else {
518 output->flush();
519 }
520}
std::map< std::string, GroupMetadata > & get_grp_transfers()
std::string get_ip_port() const
Get the IP address and the port in the format "ip:port".
Definition ip_format.hpp:57
static std::string protocols_to_string(e_protocol_type e)
static std::string status_to_string(e_status e)

◆ do_work()

void dunedaq::snbmodules::Bookkeeper::do_work ( std::atomic< bool > & running_flag)

Start the bookkeeper thread to receive notifications.

Parameters
running_flagstop Flag to stop the thread

Definition at line 235 of file bookkeeper.cpp.

236{
237 TLOG() << "JAB " << __LINE__;
238 // Just one request on startup, after that the clients will have to send by themself
239 for (const std::string& client : get_clients_conn()) {
240 TLOG() << "JAB " << __LINE__ << " " << client;
242 }
243 TLOG() << "JAB " << __LINE__;
244
245 auto time_point = std::chrono::high_resolution_clock::now();
246
247 while (running_flag.load()) {
248 TLOG() << "JAB " << __LINE__;
250 TLOG() << "JAB " << __LINE__ << " " << get_bookkeepers_conn().size();
251 std::optional<NotificationData> msg = listen_for_notification(get_bookkeepers_conn().front());
252 TLOG() << "JAB " << __LINE__;
253 if (msg.has_value()) {
254 TLOG() << "JAB " << __LINE__;
256 }
257 TLOG() << "JAB " << __LINE__;
258
259 // check alives clients and available files
260 std::this_thread::sleep_for(std::chrono::milliseconds(100));
261
262 // Auto update metadata every 2 seconds
263 if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now() - time_point)
264 .count() >= m_refresh_rate) {
265 time_point = std::chrono::high_resolution_clock::now();
268 }
269 }
270}
void request_update_metadata(bool force=false)
Request the update of the metadata from every known clients to the bookkeeper. Only get from group tr...
bool action_on_receive_notification(NotificationData notif) override
Action to do when receiving a notification.
void request_connection_and_available_files(const std::string &client)
Send a notification to a clients id or connection to get available files.
std::optional< NotificationData > listen_for_notification(const std::string &id, const std::string &expected_from="", int timeout=-1, int tries=-1)
Listen for a notification.
void lookups_connections()
Get the list of every connections, must have the prefix first in the name of the connection and the n...
const std::vector< std::string > & get_bookkeepers_conn() const
Init the connection interface, Only used for standalone application.
const std::set< std::string > & get_clients_conn() const

◆ get_bookkeeper_id()

std::string dunedaq::snbmodules::Bookkeeper::get_bookkeeper_id ( ) const
inline

Definition at line 92 of file bookkeeper.hpp.

92{ return m_bookkeeper_id; }
std::string m_bookkeeper_id
Unique identifier for the bookkeeper.

◆ get_client_name_from_session_name()

std::string dunedaq::snbmodules::Bookkeeper::get_client_name_from_session_name ( const std::string & session_name) const
inlineprivate

Usefull convertion from session id to client id.

Parameters
session_namesession id
Returns
client id

Definition at line 134 of file bookkeeper.hpp.

135 {
136 return session_name.substr(0, session_name.find("_"));
137 }

◆ get_grp_transfers() [1/2]

std::map< std::string, GroupMetadata > & dunedaq::snbmodules::Bookkeeper::get_grp_transfers ( )
inline

Definition at line 94 of file bookkeeper.hpp.

94{ return m_grp_transfers; }

◆ get_grp_transfers() [2/2]

const std::map< std::string, GroupMetadata > & dunedaq::snbmodules::Bookkeeper::get_grp_transfers ( ) const
inline

Definition at line 95 of file bookkeeper.hpp.

95{ return m_grp_transfers; }

◆ get_ip()

IPFormat dunedaq::snbmodules::Bookkeeper::get_ip ( ) const
inline

Definition at line 93 of file bookkeeper.hpp.

93{ return m_ip; }
IPFormat m_ip
IP address of the bookkeeper.

◆ get_transfers() [1/2]

std::map< std::string, std::vector< std::shared_ptr< TransferMetadata > > > & dunedaq::snbmodules::Bookkeeper::get_transfers ( )
inline

Definition at line 96 of file bookkeeper.hpp.

96{ return m_transfers; }

◆ get_transfers() [2/2]

const std::map< std::string, std::vector< std::shared_ptr< TransferMetadata > > > & dunedaq::snbmodules::Bookkeeper::get_transfers ( ) const
inline

Definition at line 97 of file bookkeeper.hpp.

98 {
99 return m_transfers;
100 }

◆ input_action()

void dunedaq::snbmodules::Bookkeeper::input_action ( char input)

Do action depending on the input, Used for stand alone application TODO: remake.

Parameters
inputInput from the user

Definition at line 90 of file bookkeeper.cpp.

91{
92 switch (input) {
93 case 'q': {
94 TLOG() << "Exiting...";
95 exit(0);
96 break;
97 }
98 case 'd': {
100 break;
101 }
102 case 'n': {
103 TLOG() << "Creating new transfer ...";
104 TLOG() << "Choose protocol in the list";
106 enum_i++) {
107 TLOG() << enum_i << " - "
109 }
110 int protocol = -1;
111 std::cin >> protocol;
112 // Check input
113 if (protocol < 0 || protocol > protocol_type::e_protocol_type::dummy) {
114 TLOG() << "Invalid protocol";
115 break;
116 }
117
118 TLOG() << "Choose clients (q when finished)";
119 std::set<std::string> choosen_clients;
120 while (true) {
121 std::string client;
122 std::cin >> client;
123 if (client == "q") {
124 break;
125 }
126 // Check input
127 if (std::find(get_clients_conn().begin(), get_clients_conn().end(), client) == get_clients_conn().end()) {
128 TLOG() << "Invalid client";
129 break;
130 }
131
132 choosen_clients.emplace(client);
133 }
134 if (choosen_clients.empty()) {
135 TLOG() << "No client selected";
136 break;
137 }
138 if (choosen_clients.size() < 2) {
139 TLOG() << "At least 2 clients are required";
140 break;
141 }
142
143 TLOG() << "Choose file to transmit (q when finished)";
144 std::set<std::shared_ptr<TransferMetadata>> choosen_files;
145 while (true) {
146 uint64_t initial_size = choosen_files.size();
147 std::string file;
148 std::cin >> file;
149 if (file == "q") {
150 break;
151 }
152
153 for (const std::string& client : choosen_clients) {
154 // Check input
155 auto& list = get_transfers().at(client);
156 bool found = false;
157 for (std::shared_ptr<TransferMetadata> filemeta : list) {
158 if (filemeta->get_file_name() == file) {
159 choosen_files.emplace(filemeta);
160 found = true;
161 break;
162 }
163 }
164 if (found == true) {
165 break;
166 }
167 }
168 if (initial_size == choosen_files.size()) {
169 TLOG() << "Invalid file";
170 break;
171 }
172 }
173 if (choosen_files.empty()) {
174 TLOG() << "No file selected";
175 break;
176 }
177
178 // create_new_transfer(protocol_type::protocols_to_string(protocol_type::string_to_protocols(protocol).value()),
179 // choosen_clients, choosen_files);
180 break;
181 }
182
183 case 's': {
184 TLOG() << "Choose transfers to start ... ";
185 std::set<std::string> choosen_transfers;
186 while (true) {
187 std::string input;
188 std::cin >> input;
189 if (input == "q") {
190 break;
191 }
192 // Check input
194 TLOG() << "Invalid transfer";
195 break;
196 }
197 choosen_transfers.emplace(input);
198 }
199 if (choosen_transfers.size() == 0) {
200 TLOG() << "No transfer selected";
201 break;
202 }
203
204 for (const auto& transfer : choosen_transfers) {
205 start_transfers(transfer);
206 }
207 break;
208 }
209
210 default:
211 TLOG() << "Unknown command";
212 break;
213 }
214}
void start_transfers(const std::string &transfer_id)
Start a new transfer.
e_protocol_type
Different type of protocols available for communication.

◆ request_connection_and_available_files()

void dunedaq::snbmodules::Bookkeeper::request_connection_and_available_files ( const std::string & client)
private

Send a notification to a clients id or connection to get available files.

Parameters
clientclient id or connection name

Definition at line 317 of file bookkeeper.cpp.

318{
319 // send connection request to client
322 client,
323 client,
325 1);
326
327 // Listen to receive connection response and available files
328 // auto msg = listen_for_notification(get_bookkeepers_conn().front(), client);
329
330 // while (msg.has_value() && msg.value().m_notification !=
331 // notification_type::notification_to_string(notification_type::e_notification_type::CONNECTION_REQUEST))
332 // {
333 // action_on_receive_notification(msg.value());
334 // msg = listen_for_notification(get_bookkeepers_conn().front(), client);
335 // }
336}
bool send_notification(const notification_type::e_notification_type &notif, const std::string &src, const std::string &dst, const std::string &id_conn, const std::string &data="", int tries=-1)
Send a notification during m_timeout_send ms.

◆ request_update_metadata()

void dunedaq::snbmodules::Bookkeeper::request_update_metadata ( bool force = false)

Request the update of the metadata from every known clients to the bookkeeper. Only get from group transfers that are in a dynamic state (downloading, checking, uploading...)

Parameters
forceForce the update of the metadata even if the transfer is not in a dynamic state

Definition at line 339 of file bookkeeper.cpp.

340{
341 for (const auto& [id, g] : get_grp_transfers()) {
342 // Only request for dynamic status
343 if (g.get_group_status() == status_type::e_status::DOWNLOADING ||
344 g.get_group_status() == status_type::e_status::CHECKING ||
345 g.get_group_status() == status_type::e_status::UPLOADING ||
346 g.get_group_status() == status_type::e_status::HASHING || force) {
347
348 for (const std::string& session : m_clients_per_grp_transfer[g.get_group_id()]) {
351 session,
353 }
354 }
355 }
356}
std::string get_client_name_from_session_name(const std::string &session_name) const
Usefull convertion from session id to client id.

◆ set_bookkeeper_id()

void dunedaq::snbmodules::Bookkeeper::set_bookkeeper_id ( std::string bookkeeper_id)
inline

Definition at line 86 of file bookkeeper.hpp.

86{ m_bookkeeper_id = std::move(bookkeeper_id); }

◆ set_ip()

void dunedaq::snbmodules::Bookkeeper::set_ip ( const IPFormat & ip)
inline

Definition at line 87 of file bookkeeper.hpp.

87{ m_ip = ip; }

◆ start()

bool dunedaq::snbmodules::Bookkeeper::start ( )

Start the bookkeeper, Only used for stand alone application.

Returns
False if the bookkeeper did not stopped correctly

Definition at line 273 of file bookkeeper.cpp.

274{
275 auto time_point = std::chrono::high_resolution_clock::now();
276
277 // Just one request on startup, after that the clients will have to send by themself
278 for (const std::string& client : get_clients_conn()) {
280 }
281
282 while (true) {
283 std::optional<NotificationData> msg = listen_for_notification(get_bookkeepers_conn().front());
284 if (msg.has_value()) {
286 }
287
288 std::string input;
289 getline(std::cin, input);
290 if (input.empty() == false) {
291 input_action(input[0]);
292 }
293
294 // check alives clients and available files
295 for (const std::string& client : get_clients_conn()) {
296 if (m_transfers.find(client) != m_transfers.end()) {
297 // already known client
298 continue;
299 }
300
302 }
303
304 std::this_thread::sleep_for(std::chrono::milliseconds(100));
305
306 // Auto update metadata every 2 seconds
307 if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now() - time_point)
308 .count() >= 5) {
309 time_point = std::chrono::high_resolution_clock::now();
312 }
313 }
314}
void input_action(char input)
Do action depending on the input, Used for stand alone application TODO: remake.

◆ start_transfers()

void dunedaq::snbmodules::Bookkeeper::start_transfers ( const std::string & transfer_id)
private

Start a new transfer.

Definition at line 217 of file bookkeeper.cpp.

218{
219 TLOG() << "Starting transfer " << transfer_id;
220
221 if (m_clients_per_grp_transfer.find(transfer_id) != m_clients_per_grp_transfer.end()) {
222 for (const std::string& client : m_clients_per_grp_transfer[transfer_id]) {
223 std::string session_name = client;
224 session_name += "_ses";
225 session_name += transfer_id;
228 }
229 } else {
230 ers::warning(InvalidGroupTransferIDError(ERS_HERE, transfer_id, get_bookkeeper_id()));
231 }
232}

Member Data Documentation

◆ m_bookkeeper_id

std::string dunedaq::snbmodules::Bookkeeper::m_bookkeeper_id
private

Unique identifier for the bookkeeper.

Definition at line 104 of file bookkeeper.hpp.

◆ m_clients_per_grp_transfer

std::map<std::string, std::set<std::string> > dunedaq::snbmodules::Bookkeeper::m_clients_per_grp_transfer
private

map of grp_transfer_id -> clients_id

Definition at line 113 of file bookkeeper.hpp.

◆ m_file_log_path

std::string dunedaq::snbmodules::Bookkeeper::m_file_log_path = ""
private

should the information pannel of transfers be displayed on the normal log or a specific file

Definition at line 119 of file bookkeeper.hpp.

◆ m_grp_transfers

std::map<std::string, GroupMetadata> dunedaq::snbmodules::Bookkeeper::m_grp_transfers
private

map of group_id -> group_transfers

Definition at line 110 of file bookkeeper.hpp.

◆ m_ip

IPFormat dunedaq::snbmodules::Bookkeeper::m_ip
private

IP address of the bookkeeper.

Definition at line 107 of file bookkeeper.hpp.

◆ m_refresh_rate

int dunedaq::snbmodules::Bookkeeper::m_refresh_rate = 5
private

Refresh rate of the information pannel of transfers in seconds.

Definition at line 122 of file bookkeeper.hpp.

◆ m_transfers

std::map<std::string, std::vector<std::shared_ptr<TransferMetadata> > > dunedaq::snbmodules::Bookkeeper::m_transfers
private

Map of files/current transfers, client_id -> set of transfers.

Definition at line 116 of file bookkeeper.hpp.


The documentation for this class was generated from the following files: