DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
transfer_client.cpp
Go to the documentation of this file.
1
10
11#include <memory>
12#include <set>
13#include <string>
14#include <utility>
15#include <vector>
16
17namespace dunedaq::snbmodules {
19 const std::string& client_id,
20 const std::filesystem::path& listening_dir,
21 const std::string& connection_prefix /*= "snbmodules"*/,
22 int timeout_send /*= 10*/,
23 int timeout_receive /*= 100*/)
24 : NotificationInterface(connection_prefix, timeout_send, timeout_receive)
25 , m_listening_ip(listening_ip)
26 , m_client_id(client_id)
27{
28 TLOG() << "DAB " << __LINE__ << " " << listening_dir;
29 // remove all occurrences of ./ in the file path
30 std::string file_path_str = listening_dir.string();
31 std::string x = "./";
32
33 size_t pos = 0;
34 while (true) {
35 TLOG() << "DAB " << __LINE__ << " " << file_path_str;
36 pos = file_path_str.find(x, pos);
37 if (pos == std::string::npos) {
38 break;
39 }
40
41 file_path_str.replace(pos, x.length(), "");
42 }
43 TLOG() << "DAB " << __LINE__ << " " << file_path_str;
44 if (file_path_str.length() == 0) {
45 file_path_str = "blah";
46 }
47 TLOG() << "DAB " << __LINE__ << " " << file_path_str;
48 m_listening_dir = std::filesystem::absolute(file_path_str);
49 TLOG() << "DAB " << __LINE__ << " " << file_path_str;
50 std::filesystem::create_directories(m_listening_dir);
51}
52
54
55bool
57{
58
59 while (true) {
60 // When starting, client wait for notification from bookkeeper.
61 std::optional<NotificationData> msg = listen_for_notification(get_my_conn(), "", timeout);
62 if (msg.has_value()) {
64 } else {
65 TLOG() << "debug : no notification received, timeout";
66 return false;
67 }
68
69 // print status of sessions
70 for (const auto& session : m_sessions) {
71 TLOG() << session.to_string();
72 }
73 }
74
75 return true;
76}
77
78bool
79TransferClient::do_work(std::atomic<bool>& running_flag)
80{
81
82 while (running_flag.load()) {
83 if (get_bookkeepers_conn().size() == 0) {
84 TLOG() << "debug : no bookkeeper connected, looking for connections";
86 }
87 // When starting, client wait for notification from bookkeeper.
88 std::optional<NotificationData> msg = listen_for_notification(get_my_conn());
89 if (msg.has_value()) {
91
92 // print status of sessions
93 for (const auto& session : m_sessions) {
94 TLOG() << session.to_string();
95 }
96 }
97 }
98
99 return true;
100}
101
102void
103TransferClient::create_new_transfer(const std::string& transfer_id,
104 const std::string& protocol,
105 const std::set<std::string>& dest_clients,
106 const std::set<std::filesystem::path>& files,
107 const nlohmann::json& protocol_options /*= nlohmann::json()*/)
108{
109
110 std::string session_name = generate_session_id(transfer_id);
111
112 // Checking if transfer already exists
113 auto ses = get_session(session_name);
114 if (ses != nullptr) {
115 TLOG() << "debug : transfer " << transfer_id << " already exists !";
116 return;
117 }
118
119 TLOG() << "debug : creating new transfer with protocol " << protocol;
120
121 std::optional<protocol_type::e_protocol_type> _protocol = protocol_type::string_to_protocols(protocol);
122 if (!_protocol.has_value()) {
123 ers::error(InvalidProtocolError(ERS_HERE, get_client_id(), protocol));
124 return;
125 }
126 // Initialize transfer
127
128 GroupMetadata group_transfer(transfer_id, session_name, m_listening_ip, _protocol.value(), protocol_options);
129
130 for (const auto& file : files) {
131 // Check if file exists
132 if (!std::filesystem::exists(file)) {
133 ers::warning(FileForTransferNotExistError(ERS_HERE, get_client_id(), file));
134 continue;
135 } else {
136 group_transfer.add_expected_file(file);
137 group_transfer.add_file(std::move(create_metadata_from_file(file)));
138 }
139 }
140
141 if (group_transfer.get_transfers_meta().empty()) {
142 ers::error(FileForTransferNotExistError(ERS_HERE, get_client_id(), "All files"));
143 return;
144 }
145
146 // Sending to bookkeepers to update preparing state
147 for (auto& bk : get_bookkeepers_conn()) {
149 notification_type::e_notification_type::GROUP_METADATA, session_name, bk, bk, group_transfer.export_to_string());
150 }
151
152 // Create local session, can take time depending on protocol
153 auto& s = create_session(std::move(group_transfer),
155 session_name,
156 get_listening_dir().append(transfer_id),
158 dest_clients);
159
160 // Notify clients and sending group metadata
161 for (const auto& client : dest_clients) {
162 std::string new_session_name = generate_session_id(transfer_id, client);
163 TLOG() << "debug : notifying client " << client;
166 new_session_name,
167 client,
168 s.get_transfer_options().export_to_string());
169
170 // Send transfer metadata
171 for (auto file : s.get_transfer_options().get_transfers_meta()) {
174 new_session_name,
175 client,
176 file->export_to_string());
177 }
178 }
179}
180
181void
182TransferClient::start_transfer(const std::string& transfer_id)
183{
184
185 TransferSession* session = get_session(transfer_id);
186
187 if (session == nullptr) {
188 ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), transfer_id));
189 return;
190 }
191
192 session->start_all();
193}
194
195void
196TransferClient::pause_transfer(const std::string& transfer_id)
197{
198
199 TransferSession* session = get_session(transfer_id);
200
201 if (session == nullptr) {
202 ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), transfer_id));
203 return;
204 }
205
206 session->pause_all();
207}
208void
209TransferClient::resume_transfer(const std::string& transfer_id)
210{
211
212 TransferSession* session = get_session(transfer_id);
213
214 if (session == nullptr) {
215 ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), transfer_id));
216 return;
217 }
218
219 session->resume_all();
220}
221
222void
223TransferClient::cancel_transfer(const std::string& transfer_id)
224{
225
226 TransferSession* session = get_session(transfer_id);
227
228 if (session == nullptr) {
229 ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), transfer_id));
230 return;
231 }
232
233 session->cancel_all();
234}
235
237TransferClient::get_session(std::string transfer_id)
238{
239 if (transfer_id.find("ses") == std::string::npos) {
240 transfer_id = generate_session_id(transfer_id);
241 }
242
243 for (auto& s : m_sessions) {
244 if (s.get_session_id() == transfer_id) {
245 return &s;
246 }
247 }
248 return nullptr;
249}
250
251// TODO ip useless ?
254 e_session_type type,
255 std::string id,
256 const std::filesystem::path& work_dir,
257 IPFormat ip /*= IPFormat()*/,
258 const std::set<std::string>& dest_clients /*= std::set<std::string>()*/)
259{
260 if (ip.is_default()) {
261 ip = get_ip();
262 }
263
264 TransferSession new_session(
265 std::move(transfer_options), type, id, ip, work_dir, get_bookkeepers_conn(), get_clients_conn());
266
267 m_sessions.emplace_back(std::move(new_session));
268 TLOG() << "debug : session created " << TransferSession::session_type_to_string(type);
269 m_sessions.back().set_target_clients(dest_clients);
270
271 return m_sessions.back();
272}
273
274void
275TransferClient::share_available_files(const std::set<std::filesystem::path>& to_share, const std::string& dest)
276{
277
278 for (const std::filesystem::path& f : to_share) {
279 TLOG() << "debug : Sharing " << f.filename();
280 std::shared_ptr<TransferMetadata> fmeta = create_metadata_from_file(f);
281 m_available_files.insert({ f.string(), fmeta });
284 dest,
285 dest,
286 fmeta->export_to_string());
287 }
288}
289
290bool
292{
293 TLOG() << "debug : Action on request " << notif.m_notification << " for " << notif.m_target_id;
294
295 if (notif.m_target_id.find(get_client_id()) == std::string::npos && notif.m_target_id != "all") {
297 return false;
298 }
299
300 // Use enum cast for converting string to enum, still working with older clients and user readable
302
303 if (action.has_value() == false) {
304 ers::warning(InvalidNotificationReceivedError(ERS_HERE, get_client_id(), notif.m_source_id, notif.m_notification));
305 }
306
307 switch (action.value()) {
308
310 TLOG() << "debug : receive connection request, sending available files";
311 std::set<std::filesystem::path> to_share;
312 scan_available_files(to_share, true);
313 share_available_files(to_share, notif.m_source_id);
316 notif.m_source_id,
317 notif.m_source_id,
318 "end");
319 break;
320 }
321
323 GroupMetadata metadata(notif.m_data, false);
325
326 // If file is available, create a downloader session
327 for (const auto& f : metadata.get_expected_files()) {
328 if (m_available_files.find(f) != m_available_files.end()) {
329 type = Uploader;
330 break;
331 }
332 }
333
334 TLOG() << "debug : creating session " << notif.m_target_id << " type "
336 std::string group_id_tmp = metadata.get_group_id();
337 create_session(metadata, type, notif.m_target_id, get_listening_dir().append(group_id_tmp));
338 break;
339 }
340
342 auto fmeta = std::make_shared<TransferMetadata>(notif.m_data, false);
343 auto& sessions_ref = get_sessions();
344
345 // print sessions names
346 for (auto& s : sessions_ref) {
347 TLOG() << "debug : session " << s.get_session_id();
348 if (s.get_session_id().find(notif.m_target_id) != std::string::npos) {
349 TLOG() << "debug : session found";
350 fmeta->set_dest(s.get_ip());
351 s.add_file(fmeta);
352 return true;
353 }
354 }
355
356 ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), notif.m_target_id));
357
358 break;
359 }
360
362 TLOG() << "debug : starting transfer " << notif.m_target_id;
364 if (ses != nullptr) {
365
366 if (notif.m_data == "") {
367 ses->start_all();
368 } else {
370 }
371 } else {
372 ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), notif.m_target_id));
373 }
374
375 break;
376 }
377
379 TLOG() << "debug : pausing transfer " << notif.m_target_id;
381 if (ses != nullptr) {
382
383 if (notif.m_data == "") {
384 ses->pause_all();
385 } else {
387 }
388 } else {
389 ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), notif.m_target_id));
390 }
391
392 break;
393 }
394
396 TLOG() << "debug : resuming transfer " << notif.m_target_id;
398 if (ses != nullptr) {
399
400 if (notif.m_data == "") {
401 ses->resume_all();
402 } else {
404 }
405 } else {
406 ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), notif.m_target_id));
407 }
408
409 break;
410 }
411
413 TLOG() << "debug : cancelling transfer " << notif.m_target_id;
415 if (ses != nullptr) {
416
417 if (notif.m_data == "") {
418 ses->cancel_all();
419 } else {
421 }
422 } else {
423 ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), notif.m_target_id));
424 }
425
426 break;
427 }
428
430 TLOG() << "debug : updating grp transfer for " << notif.m_target_id;
432 if (ses != nullptr) {
433
434 if (notif.m_data == "") {
436 } else {
439 }
440 } else {
441 ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), notif.m_target_id));
442 }
443
444 break;
445 }
446
447 default:
448 ers::warning(NotHandledNotificationError(ERS_HERE, get_client_id(), notif.m_source_id, notif.m_notification));
449 }
450 return true;
451}
452
453// TransferSession &TransferClient::add_session(TransferSession session)
454// {
455// // std::string ses_id = session.get_session_id();
456// // m_sessions[ses_id] = std::move(session);
457// // return m_sessions.at(ses_id);
458// }
459
460void
461TransferClient::remove_session(const std::string& session_id)
462{
463 auto s = get_session(session_id);
464 if (s == nullptr) {
465 ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), session_id));
466 return;
467 }
468 for (size_t i = 0; i < m_sessions.size(); i++) {
469 if (s->get_session_id() == session_id) {
470 m_sessions.erase(m_sessions.begin() + i);
471 return;
472 }
473 }
474}
475
476std::string
477TransferClient::generate_session_id(const std::string& transferid, const std::string& dest_id /*= ""*/)
478{
479 std::string id = "";
480
481 id += dest_id == "" ? get_client_id() : dest_id;
482 id += "_ses";
483 id += transferid;
484
485 return id;
486}
487
488void
489TransferClient::scan_available_files(std::set<std::filesystem::path>& previous_scan,
490 bool nested,
491 std::filesystem::path folder) // NOLINT
492{
493 if (folder.empty()) {
494 folder = get_listening_dir();
495 }
496
497 TLOG() << "debug : scanning files in " << folder;
498
499 // First scan group metadata files
500 for (const auto& entry : std::filesystem::directory_iterator(folder)) {
501 if (entry.is_regular_file() && entry.path().extension() == GroupMetadata::m_file_extension) {
502 if (previous_scan.insert(entry.path()).second) {
503 TLOG() << "debug : found new file " << entry.path();
504
505 GroupMetadata metadata = GroupMetadata(entry.path());
506 bool already_active = false;
507
508 for (const auto& s : get_sessions()) {
509 if (s.get_transfer_options() == metadata) {
510 already_active = true;
511 break;
512 }
513 }
514
515 if (!already_active) {
516 std::string group_id_tmp = metadata.get_group_id();
517 create_session(metadata,
518 Uploader,
519 generate_session_id(group_id_tmp),
520 get_listening_dir().append("ses" + std::to_string(m_sessions.size())));
521 }
522 }
523 }
524 }
525
526 for (const auto& entry : std::filesystem::directory_iterator(folder)) {
527 if (entry.is_regular_file() && entry.path().extension() != GroupMetadata::m_file_extension) {
528 if (previous_scan.insert(entry.path()).second) {
529 TLOG() << "debug : found new file " << entry.path();
530 if (entry.path().extension() == TransferMetadata::m_file_extension) {
531 std::shared_ptr<TransferMetadata> metadata = std::make_shared<TransferMetadata>(entry.path());
532
533 bool wanted = false;
534 for (auto& s : get_sessions()) {
535 auto expect_ref = s.get_transfer_options().get_expected_files();
536 if (expect_ref.find(metadata->get_file_name()) != expect_ref.end()) {
537 s.add_file(metadata);
538 wanted = true;
539 break;
540 }
541 }
542 if (!wanted) {
543 // If not wanted yet (or never), delete the file from the scan list to find it again later
544 TLOG() << "debug : " << entry.path() << " not wanted yet, deleting from scan list";
545 previous_scan.erase(entry.path());
546 metadata.reset();
547 }
548 }
549 }
550 } else if (entry.is_directory()) {
551 TLOG() << "debug : found directory " << entry.path();
552 if (nested) {
553 scan_available_files(previous_scan, nested, entry.path()); // NOLINT
554 }
555 }
556 }
557}
558
559std::shared_ptr<TransferMetadata>
560TransferClient::create_metadata_from_file(const std::filesystem::path& src)
561{
562 return std::make_shared<TransferMetadata>(src, std::filesystem::file_size(src), get_ip());
563}
564
565std::string
567{
568 TLOG() << "BBB " << __LINE__ << " my_conn=\"" << m_my_conn << "\"";
569 if (m_my_conn.empty()) {
570 for (const std::string& c : get_clients_conn()) {
571 std::string str1 = c;
572 std::string str2 = get_client_id();
573
574 if (str1.starts_with("snbmodules_")) {
575 str1 = str1.substr(11);
576 }
577 if (str2.starts_with("snbmodules_")) {
578 str2 = str2.substr(11);
579 }
580
581 if (str1.starts_with("snb-sample-config-")) {
582 str1 = str1.substr(18);
583 }
584 if (str2.starts_with("snb-sample-config-")) {
585 str2 = str2.substr(18);
586 }
587
588 TLOG() << "BBB " << __LINE__ << " c=\"" << c << "\" client_id=" << get_client_id() << " " << str1 << " " << str2;
589 // if (c.find(get_client_id()) != std::string::npos)
590 if (str1 == str2) {
591 m_my_conn = c;
592 break;
593 }
594 }
595
596 if (m_my_conn.empty()) {
597 ers::warning(ConnectionNotFoundError(ERS_HERE, get_client_id()));
598 }
599 }
600 return m_my_conn;
601}
602} // namespace dunedaq::snbmodules
#define ERS_HERE
std::string export_to_string() override
Export metadata to string (json format)
void add_expected_file(const std::filesystem::path &file)
TransferMetadata & add_file(std::shared_ptr< TransferMetadata > meta)
const std::set< std::string > & get_expected_files() const
TransferMetadata & get_transfer_meta_from_file_path(const std::string &file_path)
std::vector< std::shared_ptr< TransferMetadata > > & get_transfers_meta()
static const std::string m_file_extension
Class that represents an IP address and a port TODO: should be replaced by something better ?
Definition ip_format.hpp:26
bool is_default() const
Check if the IP address and the port are set to the default values.
Definition ip_format.hpp:61
std::optional< NotificationData > listen_for_notification(const std::string &id, const std::string &expected_from="", int timeout=-1, int tries=-1)
Listen for a notification.
void lookups_connections()
Get the list of every connections, must have the prefix first in the name of the connection and the n...
bool send_notification(const notification_type::e_notification_type &notif, const std::string &src, const std::string &dst, const std::string &id_conn, const std::string &data="", int tries=-1)
Send a notification during m_timeout_send ms.
const std::vector< std::string > & get_bookkeepers_conn() const
Init the connection interface, Only used for standalone application.
const std::set< std::string > & get_clients_conn() const
void create_new_transfer(const std::string &transfer_id, const std::string &protocol, const std::set< std::string > &dest_clients, const std::set< std::filesystem::path > &files, const nlohmann::json &protocol_options=nlohmann::json())
Create a new transfer.
void cancel_transfer(const std::string &transfer_id)
TransferSession & create_session(GroupMetadata transfer_options, e_session_type type, std::string id, const std::filesystem::path &work_dir, IPFormat ip=IPFormat(), const std::set< std::string > &dest_clients=std::set< std::string >())
Create a new session; you can specify the IP address of the session, forcing it to use a specific network...
void start_transfer(const std::string &transfer_id)
Start, pause, resume or cancel a transfer.
TransferClient(const IPFormat &listening_ip, const std::string &client_id, const std::filesystem::path &listening_dir, const std::string &connection_prefix="snbmodules", int timeout_send=10, int timeout_receive=100)
TransferClient constructor.
TransferSession * get_session(std::string transfer_id)
std::filesystem::path m_listening_dir
Listening directory, directory where the client will listen for incoming files and files to share.
void remove_session(const std::string &session_id)
Remove a session from the client.
std::map< std::string, std::shared_ptr< TransferMetadata > > m_available_files
Map of available files (key = file path, value = file metadata)
std::filesystem::path get_listening_dir() const
void scan_available_files(std::set< std::filesystem::path > &previous_scan, bool nested=false, std::filesystem::path folder=std::filesystem::path())
Scan available files in the listening directory.
bool start(int timeout)
Start the client in the current thread.
std::string m_my_conn
Connection uuid of the client, retrieved using the notification interface and calling get_my_conn()
std::vector< TransferSession > m_sessions
List of active sessions
void share_available_files(const std::set< std::filesystem::path > &to_share, const std::string &dest)
Share available files (in m_listening_dir)
void pause_transfer(const std::string &transfer_id)
std::shared_ptr< TransferMetadata > create_metadata_from_file(const std::filesystem::path &src)
Create a metadata from a file.
std::vector< TransferSession > & get_sessions()
std::string generate_session_id(const std::string &transferid, const std::string &dest="")
Function to generate session ID.
void resume_transfer(const std::string &transfer_id)
IPFormat m_listening_ip
IP address of the client.
bool do_work(std::atomic< bool > &running_flag)
Start function to use in a thread.
bool action_on_receive_notification(NotificationData notif) override
Action to do when the client receive a notification.
TransferSession class contained in a client, is a wrapper for a transfer. extend notification interfa...
bool resume_file(TransferMetadata &f_meta, bool is_multiple=false)
bool cancel_file(TransferMetadata &f_meta, bool is_multiple=false)
bool update_metadata_to_bookkeeper(TransferMetadata &f_meta)
const GroupMetadata & get_transfer_options() const
bool start_file(TransferMetadata &f_meta)
Start the session by downloading or uploading files depending on the type of session TODO : separate ...
static std::string session_type_to_string(e_session_type e)
bool pause_file(TransferMetadata &f_meta, bool is_multiple=false)
#define TLOG(...)
Definition macro.hpp:22
e_session_type
Different type of session.
@ Downloader
TransferSession used to download files from uploaders client.
@ Uploader
TransferSession used to upload files to downloaders client.
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size
void warning(const Issue &issue)
Definition ers.hpp:115
void error(const Issue &issue)
Definition ers.hpp:81
NotificationData class, represent a notification.
std::string m_data
Data of the notification, can be empty.
std::string m_notification
Notification type.
static std::optional< e_notification_type > string_to_notification(std::string s)
static std::optional< e_protocol_type > string_to_protocols(std::string s)