LCOV - code coverage report
Current view: top level - snbmodules/src/client - transfer_session.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 241 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 25 0

            Line data    Source code
       1              : 
       2              : /**
       3              :  * @file transfer_session.cpp TransferSession class, wrapper to get access to the transfer interface and control states
       4              :  * of transfers
       5              :  *
       6              :  * This is part of the DUNE DAQ , copyright 2020.
       7              :  * Licensing/copyright details are in the COPYING file that you should have
       8              :  * received with this code.
       9              :  */
      10              : 
      11              : #include "snbmodules/transfer_session.hpp"
      12              : 
      13              : #include <memory>
      14              : #include <set>
      15              : #include <string>
      16              : #include <utility>
      17              : #include <vector>
      18              : 
      19              : namespace dunedaq::snbmodules {
      20              : 
      21            0 : TransferSession::TransferSession(GroupMetadata transfer_options,
      22              :                                  e_session_type type,
      23              :                                  std::string id,
      24              :                                  const IPFormat& ip,
      25              :                                  std::filesystem::path work_dir,
      26              :                                  std::vector<std::string> bk_conn /*= std::vector<std::string>()*/,
      27            0 :                                  std::set<std::string> client_conn /*= std::set<std::string>()*/)
      28            0 :   : NotificationInterface(std::move(bk_conn), std::move(client_conn))
      29            0 :   , m_type(type)
      30            0 :   , m_session_id(std::move(id))
      31            0 :   , m_ip(ip)
      32            0 :   , m_transfer_options(std::move(transfer_options))
      33              :   ,
      34              :   //   m_threads(std::vector<pid_t>()),
      35            0 :   m_work_dir(std::move(work_dir))
      36              : {
      37            0 :   std::filesystem::create_directories(m_work_dir);
      38              : 
      39              :   // Init transfer interface with the right protocol
      40            0 :   switch (m_transfer_options.get_protocol()) {
      41            0 :     case protocol_type::BITTORRENT:
      42              : 
      43              :       // check if port is set
      44            0 :       if (!m_transfer_options.get_protocol_options().contains("port")) {
      45            0 :         ers::fatal(ConfigError(ERS_HERE, "port is mandatory in Bittorrent protocol options"));
      46            0 :         return;
      47              :       }
      48              : 
      49            0 :       m_transfer_interface = std::make_unique<TransferInterfaceBittorrent>(
      50            0 :         m_transfer_options, type == e_session_type::Downloader, get_work_dir(), get_ip());
      51              : 
      52              :       // Generate torrent files and magnet links
      53            0 :       if (type == e_session_type::Uploader) {
      54            0 :         TLOG() << "Generating torrent files...";
      55            0 :         dynamic_cast<TransferInterfaceBittorrent&>(*m_transfer_interface).generate_torrents_files(m_work_dir, "");
      56              : 
      57            0 :         for (auto f_meta : m_transfer_options.get_transfers_meta()) {
      58            0 :           TLOG() << "Writing magnet link data into transfer Metadata "
      59            0 :                  << get_work_dir().append(f_meta->get_file_name() + ".torrent");
      60            0 :           lt::error_code ec;
      61            0 :           lt::torrent_info t(get_work_dir().append(f_meta->get_file_name() + ".torrent").string(), ec);
      62              : 
      63            0 :           if (ec) {
      64            0 :             ers::error(BittorrentInvalidTorrentFileError(ERS_HERE, ec.message()));
      65              :           }
      66              : 
      67            0 :           TLOG() << "Magnet link: " << lt::make_magnet_uri(t);
      68            0 :           f_meta->set_magnet_link(lt::make_magnet_uri(t) + "&x.pe=" + get_ip().get_ip() + ":" +
      69            0 :                                   m_transfer_options.get_protocol_options()["port"].get<std::string>());
      70            0 :         }
      71              :       }
      72              :       break;
      73              : 
      74            0 :     case protocol_type::SCP:
      75            0 :       m_transfer_interface =
      76            0 :         std::make_unique<TransferInterfaceSCP>(m_transfer_options, type == e_session_type::Uploader);
      77            0 :       break;
      78              : 
      79            0 :     case protocol_type::RCLONE: {
      80            0 :       m_transfer_interface = std::make_unique<TransferInterfaceRClone>(m_transfer_options, get_work_dir());
      81            0 :       break;
      82              :     }
      83              : 
      84            0 :     case protocol_type::dummy:
      85            0 :       m_transfer_interface = std::make_unique<TransferInterfaceDummy>(m_transfer_options);
      86            0 :       break;
      87              : 
      88            0 :     default:
      89            0 :       ers::error(InvalidProtocolError(
      90            0 :         ERS_HERE, get_session_id(), protocol_type::protocols_to_string(m_transfer_options.get_protocol())));
      91            0 :       break;
      92              :   }
      93              : 
      94            0 :   TLOG() << "debug : Transfer session " << get_session_id() << " created";
      95            0 :   update_metadatas_to_bookkeeper();
      96            0 : }
      97              : 
      98            0 : TransferSession::~TransferSession()
      99              : {
     100              :   // TLOG() << "Reaping children";
     101              :   // for (pid_t pid : m_threads)
     102              :   // {
     103              :   //     TLOG() << "Killing pid " << pid;
     104              :   //     kill(pid, SIGINT);
     105              :   //     kill(pid, SIGKILL);
     106              :   // }
     107              :   // for (pid_t pid : m_threads)
     108              :   // {
     109              :   //     TLOG() << "Waiting for pid " << pid;
     110              :   //     siginfo_t status;
     111              :   //     auto sts = waitid(P_PID, pid, &status, WEXITED);
     112              : 
     113              :   //     TLOG() << "Forked process " << pid << " exited with status " << status.si_status << " (wait status " << sts
     114              :   //            << ")";
     115              :   // }
     116              : 
     117            0 :   TLOG() << "DONE CLOSING SESSION " << get_session_id();
     118            0 : }
     119              : 
     120              : bool
     121            0 : TransferSession::action_on_receive_notification(NotificationData notif)
     122              : {
     123            0 :   (void)notif;
     124              :   // TODO : now in client
     125            0 :   return true;
     126              : }
     127              : 
     128              : std::string
     129            0 : TransferSession::to_string() const
     130              : {
     131            0 :   std::string str;
     132            0 :   str += "Session " + get_session_id() + " ";
     133            0 :   str += "type " + TransferSession::session_type_to_string(m_type) + " ";
     134            0 :   str += "listening on " + m_ip.get_ip_port() + "\n";
     135              : 
     136            0 :   str += m_transfer_options.to_string();
     137              : 
     138            0 :   return str;
     139            0 : }
     140              : 
     141              : bool
     142            0 : TransferSession::update_metadatas_to_bookkeeper()
     143              : {
     144            0 :   bool result = true;
     145            0 :   for (const std::string& bk : get_bookkeepers_conn()) {
     146            0 :     result = result && send_notification(notification_type::e_notification_type::GROUP_METADATA,
     147            0 :                                          get_session_id(),
     148              :                                          bk,
     149              :                                          bk,
     150            0 :                                          get_transfer_options().export_to_string());
     151              :   }
     152              : 
     153            0 :   for (std::shared_ptr<TransferMetadata> f_meta : m_transfer_options.get_transfers_meta()) {
     154            0 :     result = result && update_metadata_to_bookkeeper(*f_meta);
     155            0 :   }
     156              : 
     157            0 :   return result;
     158              : }
     159              : 
     160              : bool
     161            0 : TransferSession::update_metadata_to_bookkeeper(TransferMetadata& f_meta)
     162              : {
     163            0 :   bool result = true;
     164            0 :   for (const std::string& bk : get_bookkeepers_conn()) {
     165            0 :     result = result && send_notification(notification_type::e_notification_type::TRANSFER_METADATA,
     166            0 :                                          get_session_id(),
     167              :                                          bk,
     168              :                                          bk,
     169            0 :                                          f_meta.export_to_string_partial(false));
     170              :   }
     171            0 :   return result;
     172              : }
     173              : 
     174              : bool
     175            0 : TransferSession::send_notification_to_targets(notification_type::e_notification_type type,
     176              :                                               const std::string& data /*= ""*/)
     177              : {
     178            0 :   bool result = true;
     179            0 :   for (const std::string& client : get_target_clients()) {
     180            0 :     std::string session_name = client + "_ses" + m_transfer_options.get_group_id();
     181            0 :     result &= send_notification(type, get_session_id(), session_name, client, data);
     182            0 :   }
     183            0 :   return result;
     184              : }
     185              : 
     186              : bool
     187            0 : TransferSession::start_file(TransferMetadata& f_meta)
     188              : {
     189            0 :   bool res = false;
     190            0 :   if (is_downloader()) {
     191            0 :     res = download_file(f_meta, m_work_dir);
     192            0 :   } else if (is_uploader()) {
     193            0 :     res = upload_file(f_meta);
     194              :   } else {
     195            0 :     ers::error(SessionTypeNotSupportedError(ERS_HERE, get_session_id()));
     196              :   }
     197              : 
     198            0 :   return res;
     199              : }
     200              : 
     201              : bool
     202            0 : TransferSession::pause_file(TransferMetadata& f_meta, bool is_multiple)
     203              : {
     204            0 :   if (f_meta.get_status() != status_type::e_status::DOWNLOADING &&
     205              :       f_meta.get_status() != status_type::e_status::UPLOADING) {
     206            0 :     ers::warning(SessionWrongStateTransitionError(ERS_HERE,
     207            0 :                                                   get_session_id(),
     208            0 :                                                   f_meta.get_file_name(),
     209            0 :                                                   status_type::status_to_string(f_meta.get_status()),
     210            0 :                                                   status_type::status_to_string(status_type::e_status::PAUSED)));
     211            0 :     return false;
     212              :   }
     213              : 
     214            0 :   f_meta.set_status(status_type::e_status::PAUSED);
     215              : 
     216            0 :   bool res = m_transfer_interface->pause_file(f_meta);
     217            0 :   if (!res) {
     218            0 :     f_meta.set_status(status_type::e_status::ERROR);
     219              :   }
     220              : 
     221            0 :   if (!is_multiple) {
     222            0 :     send_notification_to_targets(notification_type::e_notification_type::PAUSE_TRANSFER, f_meta.get_file_path());
     223            0 :     update_metadata_to_bookkeeper(f_meta);
     224              :   }
     225              : 
     226              :   return res;
     227              : }
     228              : 
     229              : bool
     230            0 : TransferSession::resume_file(TransferMetadata& f_meta, bool is_multiple)
     231              : {
     232            0 :   if (f_meta.get_status() != status_type::e_status::PAUSED) {
     233            0 :     ers::warning(SessionWrongStateTransitionError(ERS_HERE,
     234            0 :                                                   get_session_id(),
     235            0 :                                                   f_meta.get_file_name(),
     236            0 :                                                   status_type::status_to_string(f_meta.get_status()),
     237            0 :                                                   "RESUMING"));
     238            0 :     return false;
     239              :   }
     240              : 
     241            0 :   if (is_downloader()) {
     242            0 :     f_meta.set_status(status_type::e_status::DOWNLOADING);
     243            0 :   } else if (is_uploader()) {
     244            0 :     f_meta.set_status(status_type::e_status::UPLOADING);
     245              :   }
     246              : 
     247            0 :   bool res = m_transfer_interface->resume_file(f_meta);
     248            0 :   if (!res) {
     249            0 :     f_meta.set_status(status_type::e_status::ERROR);
     250              :   }
     251              : 
     252            0 :   if (!is_multiple) {
     253            0 :     send_notification_to_targets(notification_type::e_notification_type::RESUME_TRANSFER, f_meta.get_file_path());
     254            0 :     update_metadata_to_bookkeeper(f_meta);
     255              :   }
     256              : 
     257              :   return res;
     258              : }
     259              : 
     260              : bool
     261            0 : TransferSession::hash_file(TransferMetadata& f_meta, bool is_multiple)
     262              : {
     263            0 :   if (f_meta.get_status() != status_type::e_status::FINISHED) {
     264            0 :     ers::warning(SessionWrongStateTransitionError(ERS_HERE,
     265            0 :                                                   get_session_id(),
     266            0 :                                                   f_meta.get_file_name(),
     267            0 :                                                   status_type::status_to_string(f_meta.get_status()),
     268            0 :                                                   status_type::status_to_string(status_type::e_status::HASHING)));
     269            0 :     return false;
     270              :   }
     271              : 
     272            0 :   f_meta.set_status(status_type::e_status::HASHING);
     273              : 
     274            0 :   bool res = m_transfer_interface->hash_file(f_meta);
     275            0 :   if (!res) {
     276            0 :     f_meta.set_status(status_type::e_status::ERROR);
     277              :   }
     278              : 
     279            0 :   if (!is_multiple) {
     280              :     // send_notification_to_targets(notification_type::e_notification_type::HASH_TRANSFER, f_meta.get_file_path());
     281            0 :     update_metadata_to_bookkeeper(f_meta);
     282              :   }
     283              :   return res;
     284              : }
     285              : 
     286              : bool
     287            0 : TransferSession::cancel_file(TransferMetadata& f_meta, bool is_multiple)
     288              : {
     289            0 :   if (f_meta.get_status() == status_type::e_status::FINISHED ||
     290              :       f_meta.get_status() == status_type::e_status::CANCELLED) {
     291            0 :     ers::warning(SessionWrongStateTransitionError(ERS_HERE,
     292            0 :                                                   get_session_id(),
     293            0 :                                                   f_meta.get_file_name(),
     294            0 :                                                   status_type::status_to_string(f_meta.get_status()),
     295            0 :                                                   status_type::status_to_string(status_type::e_status::CANCELLED)));
     296            0 :     return false;
     297              :   }
     298              : 
     299            0 :   f_meta.set_status(status_type::e_status::CANCELLED);
     300              : 
     301            0 :   bool res = m_transfer_interface->cancel_file(f_meta);
     302            0 :   if (!res) {
     303            0 :     f_meta.set_status(status_type::e_status::ERROR);
     304              :   }
     305              : 
     306            0 :   if (!is_multiple) {
     307            0 :     send_notification_to_targets(notification_type::e_notification_type::CANCEL_TRANSFER, f_meta.get_file_path());
     308            0 :     update_metadata_to_bookkeeper(f_meta);
     309              :   }
     310              :   return res;
     311              : }
     312              : 
     313              : bool
     314            0 : TransferSession::upload_file(TransferMetadata& f_meta, bool is_multiple)
     315              : {
     316            0 :   if (m_type != e_session_type::Uploader) {
     317            0 :     ers::warning(SessionAccessToIncorrectActionError(ERS_HERE, get_session_id(), "upload_file"));
     318              :   }
     319              : 
     320            0 :   if (f_meta.get_status() != status_type::e_status::WAITING) {
     321            0 :     ers::warning(SessionWrongStateTransitionError(ERS_HERE,
     322            0 :                                                   get_session_id(),
     323            0 :                                                   f_meta.get_file_name(),
     324            0 :                                                   status_type::status_to_string(f_meta.get_status()),
     325            0 :                                                   status_type::status_to_string(status_type::e_status::UPLOADING)));
     326            0 :     return false;
     327              :   }
     328              : 
     329            0 :   f_meta.set_status(status_type::e_status::UPLOADING);
     330              : 
     331            0 :   bool res = m_transfer_interface->upload_file(f_meta);
     332            0 :   if (!res) {
     333            0 :     f_meta.set_status(status_type::e_status::ERROR);
     334              :   }
     335              : 
     336            0 :   if (!is_multiple) {
     337            0 :     send_notification_to_targets(notification_type::e_notification_type::START_TRANSFER, f_meta.get_file_path());
     338            0 :     update_metadata_to_bookkeeper(f_meta);
     339              :   }
     340              :   return res;
     341              : }
     342              : 
     343              : bool
     344            0 : TransferSession::download_file(TransferMetadata& f_meta, std::filesystem::path dest, bool is_multiple)
     345              : {
     346            0 :   if (m_type != e_session_type::Downloader) {
     347            0 :     ers::warning(SessionAccessToIncorrectActionError(ERS_HERE, get_session_id(), "download_file"));
     348              :   }
     349              : 
     350            0 :   if (f_meta.get_status() != status_type::e_status::WAITING) {
     351            0 :     ers::warning(SessionWrongStateTransitionError(ERS_HERE,
     352            0 :                                                   get_session_id(),
     353            0 :                                                   f_meta.get_file_name(),
     354            0 :                                                   status_type::status_to_string(f_meta.get_status()),
     355            0 :                                                   status_type::status_to_string(status_type::e_status::DOWNLOADING)));
     356            0 :     return false;
     357              :   }
     358              : 
     359              :   // wait for the uploader to be ready
     360            0 :   std::this_thread::sleep_for(std::chrono::seconds(1));
     361              : 
     362            0 :   f_meta.set_status(status_type::e_status::DOWNLOADING);
     363              : 
     364            0 :   bool res = m_transfer_interface->download_file(f_meta, std::move(dest));
     365            0 :   if (!res) {
     366            0 :     f_meta.set_status(status_type::e_status::ERROR);
     367              :   }
     368            0 :   if (!is_multiple) {
     369            0 :     update_metadata_to_bookkeeper(f_meta);
     370              :   }
     371              :   return res;
     372              : }
     373              : 
     374              : bool
     375            0 : TransferSession::start_all()
     376              : {
     377            0 :   if (is_downloader()) {
     378            0 :     return download_all(m_work_dir);
     379            0 :   } else if (is_uploader()) {
     380            0 :     return upload_all();
     381              :   } else {
     382            0 :     ers::error(SessionTypeNotSupportedError(ERS_HERE, get_session_id()));
     383            0 :     return false;
     384              :   }
     385              : }
     386              : 
     387              : bool
     388            0 : TransferSession::pause_all()
     389              : {
     390              : 
     391            0 :   send_notification_to_targets(notification_type::e_notification_type::PAUSE_TRANSFER);
     392              : 
     393              :   // wait 1 second
     394            0 :   std::this_thread::sleep_for(std::chrono::seconds(1));
     395              : 
     396            0 :   bool result = true;
     397            0 :   for (auto file : m_transfer_options.get_transfers_meta()) {
     398            0 :     result = result && pause_file(*file, true);
     399            0 :   }
     400              : 
     401            0 :   update_metadatas_to_bookkeeper();
     402            0 :   return result;
     403              : }
     404              : 
     405              : bool
     406            0 : TransferSession::resume_all()
     407              : {
     408            0 :   bool result = true;
     409            0 :   for (auto file : m_transfer_options.get_transfers_meta()) {
     410            0 :     result = result && resume_file(*file, true);
     411            0 :   }
     412              : 
     413              :   // wait 1 second
     414            0 :   std::this_thread::sleep_for(std::chrono::seconds(1));
     415              : 
     416            0 :   send_notification_to_targets(notification_type::e_notification_type::RESUME_TRANSFER);
     417            0 :   update_metadatas_to_bookkeeper();
     418            0 :   return result;
     419              : }
     420              : 
     421              : bool
     422            0 : TransferSession::cancel_all()
     423              : {
     424            0 :   bool result = true;
     425            0 :   for (auto file : m_transfer_options.get_transfers_meta()) {
     426            0 :     result = result && cancel_file(*file, true);
     427            0 :   }
     428              : 
     429            0 :   send_notification_to_targets(notification_type::e_notification_type::CANCEL_TRANSFER);
     430            0 :   update_metadatas_to_bookkeeper();
     431            0 :   return result;
     432              : }
     433              : 
     434              : // Downloaders only
     435              : bool
     436            0 : TransferSession::download_all(const std::filesystem::path& dest)
     437              : {
     438            0 :   if (m_type != e_session_type::Downloader) {
     439            0 :     ers::warning(SessionAccessToIncorrectActionError(ERS_HERE, get_session_id(), "download_all"));
     440              :   }
     441              : 
     442            0 :   bool result = true;
     443            0 :   for (auto file : m_transfer_options.get_transfers_meta()) {
     444            0 :     result = result && download_file(*file, dest, true);
     445            0 :   }
     446            0 :   update_metadatas_to_bookkeeper();
     447            0 :   return result;
     448              : }
     449              : 
     450              : // Uploaders only
     451              : bool
     452            0 : TransferSession::upload_all()
     453              : {
     454            0 :   if (m_type != e_session_type::Uploader) {
     455            0 :     ers::warning(SessionAccessToIncorrectActionError(ERS_HERE, get_session_id(), "upload_all"));
     456              :   }
     457              : 
     458            0 :   bool result = true;
     459            0 :   for (auto file : m_transfer_options.get_transfers_meta()) {
     460            0 :     result = result && upload_file(*file, true);
     461            0 :   }
     462            0 :   send_notification_to_targets(notification_type::e_notification_type::START_TRANSFER);
     463            0 :   update_metadatas_to_bookkeeper();
     464            0 :   return result;
     465              : }
     466              : 
     467              : } // namespace dunedaq::snbmodules
        

Generated by: LCOV version 2.0-1