Line data Source code
1 : /**
2 : * @file transfer_client.hpp TransferClient class, everything a client can do containing one session for each transfer
3 : *
4 : * This is part of the DUNE DAQ , copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "snbmodules/transfer_client.hpp"
10 :
11 : #include <memory>
12 : #include <set>
13 : #include <string>
14 : #include <utility>
15 : #include <vector>
16 :
17 : namespace dunedaq::snbmodules {
18 0 : TransferClient::TransferClient(const IPFormat& listening_ip,
19 : const std::string& client_id,
20 : const std::filesystem::path& listening_dir,
21 : const std::string& connection_prefix /*= "snbmodules"*/,
22 : int timeout_send /*= 10*/,
23 0 : int timeout_receive /*= 100*/)
24 : : NotificationInterface(connection_prefix, timeout_send, timeout_receive)
25 0 : , m_listening_ip(listening_ip)
26 0 : , m_client_id(client_id)
27 : {
28 0 : TLOG() << "DAB " << __LINE__ << " " << listening_dir;
29 : // remove all occurences of ./ in the file path
30 0 : std::string file_path_str = listening_dir.string();
31 0 : std::string x = "./";
32 :
33 0 : size_t pos = 0;
34 0 : while (true) {
35 0 : TLOG() << "DAB " << __LINE__ << " " << file_path_str;
36 0 : pos = file_path_str.find(x, pos);
37 0 : if (pos == std::string::npos) {
38 : break;
39 : }
40 :
41 0 : file_path_str.replace(pos, x.length(), "");
42 : }
43 0 : TLOG() << "DAB " << __LINE__ << " " << file_path_str;
44 0 : if (file_path_str.length() == 0) {
45 0 : file_path_str = "blah";
46 : }
47 0 : TLOG() << "DAB " << __LINE__ << " " << file_path_str;
48 0 : m_listening_dir = std::filesystem::absolute(file_path_str);
49 0 : TLOG() << "DAB " << __LINE__ << " " << file_path_str;
50 0 : std::filesystem::create_directories(m_listening_dir);
51 0 : }
52 :
53 0 : TransferClient::~TransferClient() {}
54 :
55 : bool
56 0 : TransferClient::start(int timeout)
57 : {
58 :
59 0 : while (true) {
60 : // When starting, client wait for notification from bookkeeper.
61 0 : std::optional<NotificationData> msg = listen_for_notification(get_my_conn(), "", timeout);
62 0 : if (msg.has_value()) {
63 0 : action_on_receive_notification(msg.value());
64 : } else {
65 0 : TLOG() << "debug : no notification received, timeout";
66 0 : return false;
67 : }
68 :
69 : // print status of sessions
70 0 : for (const auto& session : m_sessions) {
71 0 : TLOG() << session.to_string();
72 : }
73 0 : }
74 :
75 : return true;
76 : }
77 :
78 : bool
79 0 : TransferClient::do_work(std::atomic<bool>& running_flag)
80 : {
81 :
82 0 : while (running_flag.load()) {
83 0 : if (get_bookkeepers_conn().size() == 0) {
84 0 : TLOG() << "debug : no bookkeeper connected, looking for connections";
85 0 : lookups_connections();
86 : }
87 : // When starting, client wait for notification from bookkeeper.
88 0 : std::optional<NotificationData> msg = listen_for_notification(get_my_conn());
89 0 : if (msg.has_value()) {
90 0 : action_on_receive_notification(msg.value());
91 :
92 : // print status of sessions
93 0 : for (const auto& session : m_sessions) {
94 0 : TLOG() << session.to_string();
95 : }
96 : }
97 0 : }
98 :
99 0 : return true;
100 : }
101 :
102 : void
103 0 : TransferClient::create_new_transfer(const std::string& transfer_id,
104 : const std::string& protocol,
105 : const std::set<std::string>& dest_clients,
106 : const std::set<std::filesystem::path>& files,
107 : const nlohmann::json& protocol_options /*= nlohmann::json()*/)
108 : {
109 :
110 0 : std::string session_name = generate_session_id(transfer_id);
111 :
112 : // Checking if transfer already exists
113 0 : auto ses = get_session(session_name);
114 0 : if (ses != nullptr) {
115 0 : TLOG() << "debug : transfer " << transfer_id << " already exists !";
116 0 : return;
117 : }
118 :
119 0 : TLOG() << "debug : creating new transfer with protocol " << protocol;
120 :
121 0 : std::optional<protocol_type::e_protocol_type> _protocol = protocol_type::string_to_protocols(protocol);
122 0 : if (!_protocol.has_value()) {
123 0 : ers::error(InvalidProtocolError(ERS_HERE, get_client_id(), protocol));
124 0 : return;
125 : }
126 : // Initialize transfer
127 :
128 0 : GroupMetadata group_transfer(transfer_id, session_name, m_listening_ip, _protocol.value(), protocol_options);
129 :
130 0 : for (const auto& file : files) {
131 : // Check if file exists
132 0 : if (!std::filesystem::exists(file)) {
133 0 : ers::warning(FileForTransferNotExistError(ERS_HERE, get_client_id(), file));
134 0 : continue;
135 : } else {
136 0 : group_transfer.add_expected_file(file);
137 0 : group_transfer.add_file(std::move(create_metadata_from_file(file)));
138 : }
139 : }
140 :
141 0 : if (group_transfer.get_transfers_meta().empty()) {
142 0 : ers::error(FileForTransferNotExistError(ERS_HERE, get_client_id(), "All files"));
143 0 : return;
144 : }
145 :
146 : // Sending to bookkeepers to update preparing state
147 0 : for (auto& bk : get_bookkeepers_conn()) {
148 0 : send_notification(
149 0 : notification_type::e_notification_type::GROUP_METADATA, session_name, bk, bk, group_transfer.export_to_string());
150 : }
151 :
152 : // Create local session, can take time depending on protocol
153 0 : auto& s = create_session(std::move(group_transfer),
154 : e_session_type::Uploader,
155 : session_name,
156 0 : get_listening_dir().append(transfer_id),
157 : m_listening_ip,
158 : dest_clients);
159 :
160 : // Notify clients and sending group metadata
161 0 : for (const auto& client : dest_clients) {
162 0 : std::string new_session_name = generate_session_id(transfer_id, client);
163 0 : TLOG() << "debug : notifying client " << client;
164 0 : send_notification(notification_type::e_notification_type::NEW_TRANSFER,
165 0 : get_client_id(),
166 : new_session_name,
167 : client,
168 0 : s.get_transfer_options().export_to_string());
169 :
170 : // Send transfer metadata
171 0 : for (auto file : s.get_transfer_options().get_transfers_meta()) {
172 0 : send_notification(notification_type::e_notification_type::TRANSFER_METADATA,
173 0 : get_client_id(),
174 : new_session_name,
175 : client,
176 0 : file->export_to_string());
177 0 : }
178 0 : }
179 0 : }
180 :
181 : void
182 0 : TransferClient::start_transfer(const std::string& transfer_id)
183 : {
184 :
185 0 : TransferSession* session = get_session(transfer_id);
186 :
187 0 : if (session == nullptr) {
188 0 : ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), transfer_id));
189 0 : return;
190 : }
191 :
192 0 : session->start_all();
193 : }
194 :
195 : void
196 0 : TransferClient::pause_transfer(const std::string& transfer_id)
197 : {
198 :
199 0 : TransferSession* session = get_session(transfer_id);
200 :
201 0 : if (session == nullptr) {
202 0 : ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), transfer_id));
203 0 : return;
204 : }
205 :
206 0 : session->pause_all();
207 : }
208 : void
209 0 : TransferClient::resume_transfer(const std::string& transfer_id)
210 : {
211 :
212 0 : TransferSession* session = get_session(transfer_id);
213 :
214 0 : if (session == nullptr) {
215 0 : ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), transfer_id));
216 0 : return;
217 : }
218 :
219 0 : session->resume_all();
220 : }
221 :
222 : void
223 0 : TransferClient::cancel_transfer(const std::string& transfer_id)
224 : {
225 :
226 0 : TransferSession* session = get_session(transfer_id);
227 :
228 0 : if (session == nullptr) {
229 0 : ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), transfer_id));
230 0 : return;
231 : }
232 :
233 0 : session->cancel_all();
234 : }
235 :
236 : TransferSession*
237 0 : TransferClient::get_session(std::string transfer_id)
238 : {
239 0 : if (transfer_id.find("ses") == std::string::npos) {
240 0 : transfer_id = generate_session_id(transfer_id);
241 : }
242 :
243 0 : for (auto& s : m_sessions) {
244 0 : if (s.get_session_id() == transfer_id) {
245 0 : return &s;
246 : }
247 : }
248 0 : return nullptr;
249 : }
250 :
251 : // TODO ip useless ?
252 : TransferSession&
253 0 : TransferClient::create_session(GroupMetadata transfer_options,
254 : e_session_type type,
255 : std::string id,
256 : const std::filesystem::path& work_dir,
257 : IPFormat ip /*= IPFormat()*/,
258 : const std::set<std::string>& dest_clients /*= std::set<std::string>()*/)
259 : {
260 0 : if (ip.is_default()) {
261 0 : ip = get_ip();
262 : }
263 :
264 0 : TransferSession new_session(
265 0 : std::move(transfer_options), type, id, ip, work_dir, get_bookkeepers_conn(), get_clients_conn());
266 :
267 0 : m_sessions.emplace_back(std::move(new_session));
268 0 : TLOG() << "debug : session created " << TransferSession::session_type_to_string(type);
269 0 : m_sessions.back().set_target_clients(dest_clients);
270 :
271 0 : return m_sessions.back();
272 0 : }
273 :
274 : void
275 0 : TransferClient::share_available_files(const std::set<std::filesystem::path>& to_share, const std::string& dest)
276 : {
277 :
278 0 : for (const std::filesystem::path& f : to_share) {
279 0 : TLOG() << "debug : Sharing " << f.filename();
280 0 : std::shared_ptr<TransferMetadata> fmeta = create_metadata_from_file(f);
281 0 : m_available_files.insert({ f.string(), fmeta });
282 0 : send_notification(notification_type::e_notification_type::TRANSFER_METADATA,
283 0 : get_client_id(),
284 : dest,
285 : dest,
286 0 : fmeta->export_to_string());
287 0 : }
288 0 : }
289 :
290 : bool
291 0 : TransferClient::action_on_receive_notification(NotificationData notif)
292 : {
293 0 : TLOG() << "debug : Action on request " << notif.m_notification << " for " << notif.m_target_id;
294 :
295 0 : if (notif.m_target_id.find(get_client_id()) == std::string::npos && notif.m_target_id != "all") {
296 0 : ers::warning(NotificationWrongDestinationError(ERS_HERE, get_client_id(), notif.m_source_id, notif.m_target_id));
297 0 : return false;
298 : }
299 :
300 : // Use enum cast for converting string to enum, still working with older clients and user readable
301 0 : auto action = notification_type::string_to_notification(notif.m_notification);
302 :
303 0 : if (action.has_value() == false) {
304 0 : ers::warning(InvalidNotificationReceivedError(ERS_HERE, get_client_id(), notif.m_source_id, notif.m_notification));
305 : }
306 :
307 0 : switch (action.value()) {
308 :
309 0 : case notification_type::e_notification_type::CONNECTION_REQUEST: {
310 0 : TLOG() << "debug : receive connection request, sending available files";
311 0 : std::set<std::filesystem::path> to_share;
312 0 : scan_available_files(to_share, true);
313 0 : share_available_files(to_share, notif.m_source_id);
314 0 : send_notification(notification_type::e_notification_type::TRANSFER_METADATA,
315 0 : get_client_id(),
316 : notif.m_source_id,
317 : notif.m_source_id,
318 : "end");
319 0 : break;
320 0 : }
321 :
322 0 : case notification_type::e_notification_type::NEW_TRANSFER: {
323 0 : GroupMetadata metadata(notif.m_data, false);
324 0 : e_session_type type = Downloader;
325 :
326 : // If file is available, create a downloader session
327 0 : for (const auto& f : metadata.get_expected_files()) {
328 0 : if (m_available_files.find(f) != m_available_files.end()) {
329 : type = Uploader;
330 : break;
331 : }
332 : }
333 :
334 0 : TLOG() << "debug : creating session " << notif.m_target_id << " type "
335 0 : << TransferSession::session_type_to_string(type);
336 0 : std::string group_id_tmp = metadata.get_group_id();
337 0 : create_session(metadata, type, notif.m_target_id, get_listening_dir().append(group_id_tmp));
338 0 : break;
339 0 : }
340 :
341 0 : case notification_type::e_notification_type::TRANSFER_METADATA: {
342 0 : auto fmeta = std::make_shared<TransferMetadata>(notif.m_data, false);
343 0 : auto& sessions_ref = get_sessions();
344 :
345 : // print sessions names
346 0 : for (auto& s : sessions_ref) {
347 0 : TLOG() << "debug : session " << s.get_session_id();
348 0 : if (s.get_session_id().find(notif.m_target_id) != std::string::npos) {
349 0 : TLOG() << "debug : session found";
350 0 : fmeta->set_dest(s.get_ip());
351 0 : s.add_file(fmeta);
352 0 : return true;
353 : }
354 : }
355 :
356 0 : ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), notif.m_target_id));
357 :
358 0 : break;
359 0 : }
360 :
361 0 : case notification_type::e_notification_type::START_TRANSFER: {
362 0 : TLOG() << "debug : starting transfer " << notif.m_target_id;
363 0 : TransferSession* ses = get_session(notif.m_target_id);
364 0 : if (ses != nullptr) {
365 :
366 0 : if (notif.m_data == "") {
367 0 : ses->start_all();
368 : } else {
369 0 : ses->start_file(ses->get_transfer_options().get_transfer_meta_from_file_path(notif.m_data));
370 : }
371 : } else {
372 0 : ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), notif.m_target_id));
373 : }
374 :
375 : break;
376 : }
377 :
378 0 : case notification_type::e_notification_type::PAUSE_TRANSFER: {
379 0 : TLOG() << "debug : pausing transfer " << notif.m_target_id;
380 0 : TransferSession* ses = get_session(notif.m_target_id);
381 0 : if (ses != nullptr) {
382 :
383 0 : if (notif.m_data == "") {
384 0 : ses->pause_all();
385 : } else {
386 0 : ses->pause_file(ses->get_transfer_options().get_transfer_meta_from_file_path(notif.m_data));
387 : }
388 : } else {
389 0 : ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), notif.m_target_id));
390 : }
391 :
392 : break;
393 : }
394 :
395 0 : case notification_type::e_notification_type::RESUME_TRANSFER: {
396 0 : TLOG() << "debug : resuming transfer " << notif.m_target_id;
397 0 : TransferSession* ses = get_session(notif.m_target_id);
398 0 : if (ses != nullptr) {
399 :
400 0 : if (notif.m_data == "") {
401 0 : ses->resume_all();
402 : } else {
403 0 : ses->resume_file(ses->get_transfer_options().get_transfer_meta_from_file_path(notif.m_data));
404 : }
405 : } else {
406 0 : ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), notif.m_target_id));
407 : }
408 :
409 : break;
410 : }
411 :
412 0 : case notification_type::e_notification_type::CANCEL_TRANSFER: {
413 0 : TLOG() << "debug : cancelling transfer " << notif.m_target_id;
414 0 : TransferSession* ses = get_session(notif.m_target_id);
415 0 : if (ses != nullptr) {
416 :
417 0 : if (notif.m_data == "") {
418 0 : ses->cancel_all();
419 : } else {
420 0 : ses->cancel_file(ses->get_transfer_options().get_transfer_meta_from_file_path(notif.m_data));
421 : }
422 : } else {
423 0 : ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), notif.m_target_id));
424 : }
425 :
426 : break;
427 : }
428 :
429 0 : case notification_type::e_notification_type::UPDATE_REQUEST: {
430 0 : TLOG() << "debug : updating grp transfer for " << notif.m_target_id;
431 0 : TransferSession* ses = get_session(notif.m_target_id);
432 0 : if (ses != nullptr) {
433 :
434 0 : if (notif.m_data == "") {
435 0 : ses->update_metadatas_to_bookkeeper();
436 : } else {
437 0 : ses->update_metadata_to_bookkeeper(
438 0 : ses->get_transfer_options().get_transfer_meta_from_file_path(notif.m_data));
439 : }
440 : } else {
441 0 : ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), notif.m_target_id));
442 : }
443 :
444 : break;
445 : }
446 :
447 0 : default:
448 0 : ers::warning(NotHandledNotificationError(ERS_HERE, get_client_id(), notif.m_source_id, notif.m_notification));
449 : }
450 : return true;
451 : }
452 :
453 : // TransferSession &TransferClient::add_session(TransferSession session)
454 : // {
455 : // // std::string ses_id = session.get_session_id();
456 : // // m_sessions[ses_id] = std::move(session);
457 : // // return m_sessions.at(ses_id);
458 : // }
459 :
460 : void
461 0 : TransferClient::remove_session(const std::string& session_id)
462 : {
463 0 : auto s = get_session(session_id);
464 0 : if (s == nullptr) {
465 0 : ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), session_id));
466 0 : return;
467 : }
468 0 : for (size_t i = 0; i < m_sessions.size(); i++) {
469 0 : if (s->get_session_id() == session_id) {
470 0 : m_sessions.erase(m_sessions.begin() + i);
471 0 : return;
472 : }
473 : }
474 : }
475 :
476 : std::string
477 0 : TransferClient::generate_session_id(const std::string& transferid, const std::string& dest_id /*= ""*/)
478 : {
479 0 : std::string id = "";
480 :
481 0 : id += dest_id == "" ? get_client_id() : dest_id;
482 0 : id += "_ses";
483 0 : id += transferid;
484 :
485 0 : return id;
486 0 : }
487 :
488 : void
489 0 : TransferClient::scan_available_files(std::set<std::filesystem::path>& previous_scan,
490 : bool nested,
491 : std::filesystem::path folder) // NOLINT
492 : {
493 0 : if (folder.empty()) {
494 0 : folder = get_listening_dir();
495 : }
496 :
497 0 : TLOG() << "debug : scanning files in " << folder;
498 :
499 : // First scan group metadata files
500 0 : for (const auto& entry : std::filesystem::directory_iterator(folder)) {
501 0 : if (entry.is_regular_file() && entry.path().extension() == GroupMetadata::m_file_extension) {
502 0 : if (previous_scan.insert(entry.path()).second) {
503 0 : TLOG() << "debug : found new file " << entry.path();
504 :
505 0 : GroupMetadata metadata = GroupMetadata(entry.path());
506 0 : bool already_active = false;
507 :
508 0 : for (const auto& s : get_sessions()) {
509 0 : if (s.get_transfer_options() == metadata) {
510 : already_active = true;
511 : break;
512 : }
513 : }
514 :
515 0 : if (!already_active) {
516 0 : std::string group_id_tmp = metadata.get_group_id();
517 0 : create_session(metadata,
518 : Uploader,
519 0 : generate_session_id(group_id_tmp),
520 0 : get_listening_dir().append("ses" + std::to_string(m_sessions.size())));
521 0 : }
522 0 : }
523 : }
524 0 : }
525 :
526 0 : for (const auto& entry : std::filesystem::directory_iterator(folder)) {
527 0 : if (entry.is_regular_file() && entry.path().extension() != GroupMetadata::m_file_extension) {
528 0 : if (previous_scan.insert(entry.path()).second) {
529 0 : TLOG() << "debug : found new file " << entry.path();
530 0 : if (entry.path().extension() == TransferMetadata::m_file_extension) {
531 0 : std::shared_ptr<TransferMetadata> metadata = std::make_shared<TransferMetadata>(entry.path());
532 :
533 0 : bool wanted = false;
534 0 : for (auto& s : get_sessions()) {
535 0 : auto expect_ref = s.get_transfer_options().get_expected_files();
536 0 : if (expect_ref.find(metadata->get_file_name()) != expect_ref.end()) {
537 0 : s.add_file(metadata);
538 0 : wanted = true;
539 0 : break;
540 : }
541 0 : }
542 0 : if (!wanted) {
543 : // If not wanted yet (or never), delete the file from the scan list to find it again later
544 0 : TLOG() << "debug : " << entry.path() << " not wanted yet, deleting from scan list";
545 0 : previous_scan.erase(entry.path());
546 0 : metadata.reset();
547 : }
548 0 : }
549 : }
550 0 : } else if (entry.is_directory()) {
551 0 : TLOG() << "debug : found directory " << entry.path();
552 0 : if (nested) {
553 0 : scan_available_files(previous_scan, nested, entry.path()); // NOLINT
554 : }
555 : }
556 0 : }
557 0 : }
558 :
559 : std::shared_ptr<TransferMetadata>
560 0 : TransferClient::create_metadata_from_file(const std::filesystem::path& src)
561 : {
562 0 : return std::make_shared<TransferMetadata>(src, std::filesystem::file_size(src), get_ip());
563 : }
564 :
565 : std::string
566 0 : TransferClient::get_my_conn()
567 : {
568 0 : TLOG() << "BBB " << __LINE__ << " my_conn=\"" << m_my_conn << "\"";
569 0 : if (m_my_conn.empty()) {
570 0 : for (const std::string& c : get_clients_conn()) {
571 0 : std::string str1 = c;
572 0 : std::string str2 = get_client_id();
573 :
574 0 : if (str1.starts_with("snbmodules_")) {
575 0 : str1 = str1.substr(11);
576 : }
577 0 : if (str2.starts_with("snbmodules_")) {
578 0 : str2 = str2.substr(11);
579 : }
580 :
581 0 : if (str1.starts_with("snb-sample-config-")) {
582 0 : str1 = str1.substr(18);
583 : }
584 0 : if (str2.starts_with("snb-sample-config-")) {
585 0 : str2 = str2.substr(18);
586 : }
587 :
588 0 : TLOG() << "BBB " << __LINE__ << " c=\"" << c << "\" client_id=" << get_client_id() << " " << str1 << " " << str2;
589 : // if (c.find(get_client_id()) != std::string::npos)
590 0 : if (str1 == str2) {
591 0 : m_my_conn = c;
592 0 : break;
593 : }
594 0 : }
595 :
596 0 : if (m_my_conn.empty()) {
597 0 : ers::warning(ConnectionNotFoundError(ERS_HERE, get_client_id()));
598 : }
599 : }
600 0 : return m_my_conn;
601 : }
602 : } // namespace dunedaq::snbmodules
|