LCOV - code coverage report
Current view: top level - snbmodules/src/bookkeeper - bookkeeper.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 279 0
Test Date: 2026-02-16 10:18:04 Functions: 0.0 % 38 0

            Line data    Source code
       1              : /**
       2              :  * @file bookkeeper.cpp Bookkeeper class retriving informations from clients
       3              :  *
       4              :  * This is part of the DUNE DAQ , copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
       9              : #include "snbmodules/bookkeeper.hpp"
      10              : 
      11              : #include <set>
      12              : #include <string>
      13              : #include <utility>
      14              : #include <vector>
      15              : 
      16              : namespace dunedaq::snbmodules {
      17              : // TODO Aug-14-2022 Leo Joly leo.vincent.andre.joly@cern.ch : Obsolete, is this needed anymore ?
      18              : void
      19            0 : Bookkeeper::create_new_transfer(const std::string& protocol,
      20              :                                 const std::string& src,
      21              :                                 const std::set<std::string>& dests,
      22              :                                 const std::set<std::filesystem::path>& files,
      23              :                                 const nlohmann::json& protocol_options)
      24              : {
      25              :   // suppress warnings
      26            0 :   (void)protocol;
      27            0 :   (void)src;
      28            0 :   (void)dests;
      29            0 :   (void)files;
      30            0 :   (void)protocol_options;
      31              : 
      32              :   // TLOG() << "debug : creating new transfer with protocol " << protocol;
      33              : 
      34              :   // // Check protocol
      35              :   // std::optional<protocol_type::e_protocol_type> _protocol = protocol_type::string_to_protocols(protocol);
      36              :   // if (!_protocol.has_value())
      37              :   // {
      38              :   //     TLOG() << "debug : invalid protocol !";
      39              :   //     return;
      40              :   // }
      41              : 
      42              :   // // Check files and found metadata
      43              :   // std::set<TransferMetadata *> files_meta;
      44              :   // auto list = get_m_transfers().at(src);
      45              :   // int nb_files = 0;
      46              : 
      47              :   // for (std::filesystem::path file : files)
      48              :   // {
      49              :   //     bool found = false;
      50              :   //     for (auto filemeta : list)
      51              :   //     {
      52              :   //         if (filemeta.get_file_name() == file)
      53              :   //         {
      54              :   //             files_meta.emplace(&filemeta);
      55              :   //             found = true;
      56              :   //             nb_files++;
      57              :   //             break;
      58              :   //         }
      59              :   //     }
      60              :   //     if (!found)
      61              :   //     {
      62              :   //         TLOG() << "debug : file " << file << " not found ! (ignoring)";
      63              :   //     }
      64              :   // }
      65              : 
      66              :   // if (nb_files == 0)
      67              :   // {
      68              :   //     TLOG() << "debug : no file found ! exiting transfer creation.";
      69              :   //     return;
      70              :   // }
      71              : 
      72              :   // // Create transfer
      73              :   // GroupMetadata transfer("transfer" + std::to_string(transfers.size()), _protocol.value(), files_meta);
      74              :   // transfers_link[transfer.get_group_id()] = src;
      75              :   // transfers.emplace(transfer);
      76              : 
      77              :   // // Notify clients
      78              :   // for (auto client : src)
      79              :   // {
      80              :   //     std::string session_name = client + "_ses" + transfer.get_group_id();
      81              : 
      82              :   //     TLOG() << "debug : notifying src client " << client;
      83              :   //     send_notification(notification_type::e_notification_type::NEW_TRANSFER, get_bookkeeper_id(), session_name,
      84              :   //     client, 1000, transfer.export_to_string());
      85              :   // }
      86            0 : }
      87              : 
      88              : // Only for stand alone application
      89              : void
      90            0 : Bookkeeper::input_action(char input)
      91              : {
      92            0 :   switch (input) {
      93            0 :     case 'q': {
      94            0 :       TLOG() << "Exiting...";
      95            0 :       exit(0);
      96            0 :       break;
      97              :     }
      98            0 :     case 'd': {
      99            0 :       display_information();
     100            0 :       break;
     101              :     }
     102            0 :     case 'n': {
     103            0 :       TLOG() << "Creating new transfer ...";
     104            0 :       TLOG() << "Choose protocol in the list";
     105            0 :       for (int enum_i = protocol_type::e_protocol_type::BITTORRENT; enum_i != protocol_type::e_protocol_type::dummy;
     106              :            enum_i++) {
     107            0 :         TLOG() << enum_i << " - "
     108            0 :                << protocol_type::protocols_to_string(static_cast<protocol_type::e_protocol_type>(enum_i));
     109              :       }
     110            0 :       int protocol = -1;
     111            0 :       std::cin >> protocol;
     112              :       // Check input
     113            0 :       if (protocol < 0 || protocol > protocol_type::e_protocol_type::dummy) {
     114            0 :         TLOG() << "Invalid protocol";
     115            0 :         break;
     116              :       }
     117              : 
     118            0 :       TLOG() << "Choose clients (q when finished)";
     119            0 :       std::set<std::string> choosen_clients;
     120            0 :       while (true) {
     121            0 :         std::string client;
     122            0 :         std::cin >> client;
     123            0 :         if (client == "q") {
     124              :           break;
     125              :         }
     126              :         // Check input
     127            0 :         if (std::find(get_clients_conn().begin(), get_clients_conn().end(), client) == get_clients_conn().end()) {
     128            0 :           TLOG() << "Invalid client";
     129            0 :           break;
     130              :         }
     131              : 
     132            0 :         choosen_clients.emplace(client);
     133            0 :       }
     134            0 :       if (choosen_clients.empty()) {
     135            0 :         TLOG() << "No client selected";
     136            0 :         break;
     137              :       }
     138            0 :       if (choosen_clients.size() < 2) {
     139            0 :         TLOG() << "At least 2 clients are required";
     140            0 :         break;
     141              :       }
     142              : 
     143            0 :       TLOG() << "Choose file to transmit (q when finished)";
     144            0 :       std::set<std::shared_ptr<TransferMetadata>> choosen_files;
     145            0 :       while (true) {
     146            0 :         uint64_t initial_size = choosen_files.size();
     147            0 :         std::string file;
     148            0 :         std::cin >> file;
     149            0 :         if (file == "q") {
     150              :           break;
     151              :         }
     152              : 
     153            0 :         for (const std::string& client : choosen_clients) {
     154              :           // Check input
     155            0 :           auto& list = get_transfers().at(client);
     156            0 :           bool found = false;
     157            0 :           for (std::shared_ptr<TransferMetadata> filemeta : list) {
     158            0 :             if (filemeta->get_file_name() == file) {
     159            0 :               choosen_files.emplace(filemeta);
     160            0 :               found = true;
     161            0 :               break;
     162              :             }
     163            0 :           }
     164            0 :           if (found == true) {
     165              :             break;
     166              :           }
     167              :         }
     168            0 :         if (initial_size == choosen_files.size()) {
     169            0 :           TLOG() << "Invalid file";
     170            0 :           break;
     171              :         }
     172            0 :       }
     173            0 :       if (choosen_files.empty()) {
     174            0 :         TLOG() << "No file selected";
     175            0 :         break;
     176              :       }
     177              : 
     178              :       // create_new_transfer(protocol_type::protocols_to_string(protocol_type::string_to_protocols(protocol).value()),
     179              :       // choosen_clients, choosen_files);
     180              :       break;
     181            0 :     }
     182              : 
     183            0 :     case 's': {
     184            0 :       TLOG() << "Choose transfers to start ... ";
     185            0 :       std::set<std::string> choosen_transfers;
     186            0 :       while (true) {
     187            0 :         std::string input;
     188            0 :         std::cin >> input;
     189            0 :         if (input == "q") {
     190              :           break;
     191              :         }
     192              :         // Check input
     193            0 :         if (m_clients_per_grp_transfer.find(input) == m_clients_per_grp_transfer.end()) {
     194            0 :           TLOG() << "Invalid transfer";
     195            0 :           break;
     196              :         }
     197            0 :         choosen_transfers.emplace(input);
     198            0 :       }
     199            0 :       if (choosen_transfers.size() == 0) {
     200            0 :         TLOG() << "No transfer selected";
     201            0 :         break;
     202              :       }
     203              : 
     204            0 :       for (const auto& transfer : choosen_transfers) {
     205            0 : #pragma GCC diagnostic push
     206            0 : #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
     207            0 :         start_transfers(transfer);
     208            0 : #pragma GCC diagnostic pop
     209              :       }
     210            0 :       break;
     211            0 :     }
     212              : 
     213            0 :     default:
     214            0 :       TLOG() << "Unknown command";
     215            0 :       break;
     216              :   }
     217            0 : }
     218              : 
     219              : void
     220            0 : Bookkeeper::start_transfers(const std::string& transfer_id)
     221              : {
     222            0 :   TLOG() << "Starting transfer " << transfer_id;
     223              : 
     224            0 :   if (m_clients_per_grp_transfer.find(transfer_id) != m_clients_per_grp_transfer.end()) {
     225            0 :     for (const std::string& client : m_clients_per_grp_transfer[transfer_id]) {
     226            0 :       std::string session_name = client;
     227            0 :       session_name += "_ses";
     228            0 :       session_name += transfer_id;
     229            0 :       send_notification(
     230            0 :         notification_type::e_notification_type::START_TRANSFER, get_bookkeeper_id(), session_name, client);
     231            0 :     }
     232              :   } else {
     233            0 :     ers::warning(InvalidGroupTransferIDError(ERS_HERE, transfer_id, get_bookkeeper_id()));
     234              :   }
     235            0 : }
     236              : 
     237              : void
     238            0 : Bookkeeper::do_work(std::atomic<bool>& running_flag)
     239              : {
     240            0 :   TLOG() << "JAB " << __LINE__;
     241              :   // Just one request on startup, after that the clients will have to send by themself
     242            0 :   for (const std::string& client : get_clients_conn()) {
     243            0 :     TLOG() << "JAB " << __LINE__ << " " << client;
     244            0 :     request_connection_and_available_files(client);
     245              :   }
     246            0 :   TLOG() << "JAB " << __LINE__;
     247              : 
     248            0 :   auto time_point = std::chrono::high_resolution_clock::now();
     249              : 
     250            0 :   while (running_flag.load()) {
     251            0 :     TLOG() << "JAB " << __LINE__;
     252            0 :     lookups_connections();
     253            0 :     TLOG() << "JAB " << __LINE__ << " " << get_bookkeepers_conn().size();
     254            0 :     std::optional<NotificationData> msg = listen_for_notification(get_bookkeepers_conn().front());
     255            0 :     TLOG() << "JAB " << __LINE__;
     256            0 :     if (msg.has_value()) {
     257            0 :       TLOG() << "JAB " << __LINE__;
     258            0 :       action_on_receive_notification(msg.value());
     259              :     }
     260            0 :     TLOG() << "JAB " << __LINE__;
     261              : 
     262              :     // check alives clients and available files
     263            0 :     std::this_thread::sleep_for(std::chrono::milliseconds(100));
     264              : 
     265              :     // Auto update metadata every 2 seconds
     266            0 :     if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now() - time_point)
     267            0 :           .count() >= m_refresh_rate) {
     268            0 :       time_point = std::chrono::high_resolution_clock::now();
     269            0 :       request_update_metadata();
     270            0 :       display_information();
     271              :     }
     272            0 :   }
     273            0 : }
     274              : 
     275              : bool
     276            0 : Bookkeeper::start()
     277              : {
     278            0 :   auto time_point = std::chrono::high_resolution_clock::now();
     279              : 
     280              :   // Just one request on startup, after that the clients will have to send by themself
     281            0 :   for (const std::string& client : get_clients_conn()) {
     282            0 :     request_connection_and_available_files(client);
     283              :   }
     284              : 
     285            0 :   while (true) {
     286            0 :     std::optional<NotificationData> msg = listen_for_notification(get_bookkeepers_conn().front());
     287            0 :     if (msg.has_value()) {
     288            0 :       action_on_receive_notification(msg.value());
     289              :     }
     290              : 
     291            0 :     std::string input;
     292            0 :     getline(std::cin, input);
     293            0 :     if (input.empty() == false) {
     294            0 :       input_action(input[0]);
     295              :     }
     296              : 
     297              :     // check alives clients and available files
     298            0 :     for (const std::string& client : get_clients_conn()) {
     299            0 :       if (m_transfers.find(client) != m_transfers.end()) {
     300              :         // already known client
     301            0 :         continue;
     302              :       }
     303              : 
     304            0 :       request_connection_and_available_files(client);
     305              :     }
     306              : 
     307            0 :     std::this_thread::sleep_for(std::chrono::milliseconds(100));
     308              : 
     309              :     // Auto update metadata every 2 seconds
     310            0 :     if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now() - time_point)
     311            0 :           .count() >= 5) {
     312            0 :       time_point = std::chrono::high_resolution_clock::now();
     313            0 :       request_update_metadata();
     314            0 :       display_information();
     315              :     }
     316            0 :   }
     317              : }
     318              : 
     319              : void
     320            0 : Bookkeeper::request_connection_and_available_files(const std::string& client)
     321              : {
     322              :   // send connection request to client
     323            0 :   send_notification(notification_type::e_notification_type::CONNECTION_REQUEST,
     324            0 :                     get_bookkeeper_id(),
     325              :                     client,
     326              :                     client,
     327            0 :                     get_bookkeeper_id(),
     328              :                     1);
     329              : 
     330              :   // Listen to receive connection response and available files
     331              :   // auto msg = listen_for_notification(get_bookkeepers_conn().front(), client);
     332              : 
     333              :   // while (msg.has_value() && msg.value().m_notification !=
     334              :   // notification_type::notification_to_string(notification_type::e_notification_type::CONNECTION_REQUEST))
     335              :   // {
     336              :   //     action_on_receive_notification(msg.value());
     337              :   //     msg = listen_for_notification(get_bookkeepers_conn().front(), client);
     338              :   // }
     339            0 : }
     340              : 
     341              : void
     342            0 : Bookkeeper::request_update_metadata(bool force)
     343              : {
     344            0 :   for (const auto& [id, g] : get_grp_transfers()) {
     345              :     // Only request for dynamic status
     346            0 :     if (g.get_group_status() == status_type::e_status::DOWNLOADING ||
     347            0 :         g.get_group_status() == status_type::e_status::CHECKING ||
     348            0 :         g.get_group_status() == status_type::e_status::UPLOADING ||
     349            0 :         g.get_group_status() == status_type::e_status::HASHING || force) {
     350              : 
     351            0 :       for (const std::string& session : m_clients_per_grp_transfer[g.get_group_id()]) {
     352            0 :         send_notification(notification_type::e_notification_type::UPDATE_REQUEST,
     353            0 :                           get_bookkeeper_id(),
     354              :                           session,
     355            0 :                           get_client_name_from_session_name(session));
     356              :       }
     357              :     }
     358              :   }
     359            0 : }
     360              : 
     361              : bool
     362            0 : Bookkeeper::action_on_receive_notification(NotificationData notif)
     363              : {
     364            0 :   TLOG() << "debug : Action on request " << notif.m_notification << " for " << notif.m_target_id;
     365              : 
     366            0 :   if (notif.m_target_id.find(get_bookkeeper_id()) == std::string::npos && notif.m_target_id != "all") {
     367            0 :     ers::warning(
     368            0 :       NotificationWrongDestinationError(ERS_HERE, get_bookkeeper_id(), notif.m_source_id, notif.m_target_id));
     369            0 :     return false;
     370              :   }
     371              : 
     372              :   // Use enum cast for converting string to enum, still working with older clients and user readable
     373            0 :   auto action = notification_type::string_to_notification(notif.m_notification);
     374              : 
     375            0 :   if (action.has_value() == false) {
     376            0 :     ers::warning(
     377            0 :       InvalidNotificationReceivedError(ERS_HERE, get_bookkeeper_id(), notif.m_source_id, notif.m_notification));
     378              :   }
     379              : 
     380            0 :   switch (action.value()) {
     381            0 :     case notification_type::e_notification_type::TRANSFER_METADATA: {
     382            0 :       if (notif.m_data == "end") {
     383              :         // Create entry in the map in case no files
     384            0 :         m_transfers[notif.m_source_id];
     385              :         break;
     386              :       }
     387              : 
     388              :       // Store it
     389            0 :       add_update_transfer(notif.m_source_id, notif.m_data);
     390              :       break;
     391              :     }
     392              : 
     393            0 :     case notification_type::e_notification_type::TRANSFER_ERROR:
     394            0 :     case notification_type::e_notification_type::GROUP_METADATA: {
     395              :       // Loading the data and cnovert to a proper transfer metadata object
     396            0 :       GroupMetadata group_meta(notif.m_data, false);
     397              : 
     398              :       // Store it
     399            0 :       m_clients_per_grp_transfer[group_meta.get_group_id()].insert(notif.m_source_id);
     400            0 :       add_update_grp_transfer(std::move(group_meta));
     401            0 :       break;
     402            0 :     }
     403              : 
     404            0 :     default:
     405            0 :       ers::warning(NotHandledNotificationError(ERS_HERE, get_bookkeeper_id(), notif.m_source_id, notif.m_notification));
     406              :   }
     407              :   return true;
     408              : }
     409              : 
     410              : void
     411            0 : Bookkeeper::display_information()
     412              : {
     413            0 :   std::ostream* output = nullptr;
     414            0 :   std::ostream* output_line_log = nullptr;
     415            0 :   std::string sep = ";";
     416              : 
     417            0 :   if (m_file_log_path != "") {
     418              :     // open file
     419            0 :     output = new std::ofstream();
     420            0 :     output_line_log = new std::ofstream();
     421            0 :     dynamic_cast<std::ofstream*>(output)->open(m_file_log_path + get_bookkeeper_id() + ".log", std::fstream::out);
     422            0 :     dynamic_cast<std::ofstream*>(output_line_log)
     423            0 :       ->open(m_file_log_path + get_bookkeeper_id() + "_line.csv", std::fstream::app | std::fstream::out);
     424              :     // clear file
     425            0 :     dynamic_cast<std::ofstream*>(output)->clear();
     426            0 :     TLOG() << "debug : output log wroten " << m_file_log_path << get_bookkeeper_id() << ".log\t" << m_file_log_path
     427            0 :            << get_bookkeeper_id() << "_line.csv";
     428              : 
     429              :     // if csv file empty, write header
     430            0 :     if (dynamic_cast<std::ofstream*>(output_line_log)->tellp() == 0) {
     431            0 :       *output_line_log << "time" << sep << "file_name" << sep << "file_full_path" << sep << "group_id" << sep
     432              :                        << "src_ip" << sep << "size" << sep
     433              : 
     434              :                        << "dest_ip" << sep << "start_time" << sep << "duration" << sep << "progress" << sep << "speed"
     435              :                        << sep << "state" << sep
     436              : 
     437            0 :                        << "end_time" << sep << "error" << sep << std::endl;
     438              :     }
     439              :   } else {
     440              :     output = &std::cout;
     441              :   }
     442              : 
     443            0 :   *output << "***** Bookkeeper " << get_bookkeeper_id() << " " + get_ip().get_ip_port() << " informations display *****"
     444            0 :           << std::endl;
     445              :   // *output << "q: quit, d : display info, n : new transfer, s : start transfer" << std::endl;
     446            0 :   *output << "Connected clients :" << std::endl;
     447              : 
     448            0 :   for (const auto& client : get_transfers()) {
     449            0 :     bool is_session = false;
     450              :     // If it's a session
     451            0 :     if (client.first.find("ses") != std::string::npos) {
     452            0 :       *output << "\t* Session " << client.first << " is active" << std::endl;
     453              :       is_session = true;
     454              :     } else {
     455            0 :       *output << "> Client " << client.first << " is connected" << std::endl;
     456              :     }
     457              : 
     458              :     // print for each file the status
     459            0 :     for (const auto& file : client.second) {
     460            0 :       if (m_file_log_path != "") {
     461            0 :         *output_line_log << std::chrono::duration_cast<std::chrono::seconds>(
     462            0 :                               std::chrono::system_clock::now().time_since_epoch())
     463            0 :                               .count()
     464            0 :                          << sep << file->get_file_name() << sep << file->get_file_path() << sep << file->get_group_id()
     465            0 :                          << sep << file->get_src().get_ip_port() << sep << file->get_size() << sep
     466              : 
     467            0 :                          << file->get_dest().get_ip_port() << sep << file->get_start_time_str() << sep
     468            0 :                          << file->get_total_duration_ms() << sep << file->get_progress() << sep
     469            0 :                          << file->get_transmission_speed() << sep << status_type::status_to_string(file->get_status())
     470              :                          << sep
     471              : 
     472            0 :                          << file->get_end_time_str() << sep << file->get_error_code() << sep
     473              : 
     474            0 :                          << std::endl;
     475              :       }
     476              : 
     477            0 :       if (is_session) {
     478            0 :         *output << "\t\t - ";
     479              :       } else {
     480            0 :         *output << "\t - ";
     481              :       }
     482              : 
     483            0 :       if (is_session)
     484            0 :         *output << file->get_file_name() << "\t" << file->get_size() << " bytes\tfrom " << file->get_src().get_ip_port()
     485              :                 << "\t"
     486              :                 // << file->get_dest().get_ip_port() << "\t"
     487            0 :                 << status_type::status_to_string(file->get_status()) << "\t" << file->get_progress() << "%\t"
     488            0 :                 << (file->get_transmission_speed() == 0 ? "-" : std::to_string(file->get_transmission_speed()))
     489            0 :                 << "Bi/s\t" << file->get_start_time_str() << "\t" << file->get_total_duration_ms() << "ms\t"
     490            0 :                 << file->get_end_time_str() << "\t" << file->get_error_code() << "\t" << std::endl;
     491              :       else
     492            0 :         *output << "Available file " << file->get_file_path() << "\t" << file->get_size() << " bytes\tfrom "
     493            0 :                 << file->get_src().get_ip_port() << "\t" << std::endl;
     494              :     }
     495              :   }
     496              : 
     497            0 :   *output << std::endl << "Active Transfers :" << std::endl;
     498              : 
     499            0 :   *output << "Group ID\t" << "Protocol\t" << "Src\t" << "IP\t" << "Status\t" << std::endl;
     500              : 
     501            0 :   for (const auto& [id, g] : get_grp_transfers()) {
     502            0 :     *output << g.get_group_id() << "\t" << protocol_type::protocols_to_string(g.get_protocol()) << "\t"
     503            0 :             << g.get_source_id() << "\t" << g.get_source_ip().get_ip_port() << "\t"
     504            0 :             << status_type::status_to_string(g.get_group_status()) << "\t" << std::endl;
     505              : 
     506            0 :     for (const std::shared_ptr<TransferMetadata>& fmeta : g.get_transfers_meta()) {
     507            0 :       *output << "\t- " << fmeta->get_file_name() << "\t" << fmeta->get_src().get_ip_port() << " to "
     508            0 :               << fmeta->get_dest().get_ip_port() << "\t" << status_type::status_to_string(fmeta->get_status()) << "\t"
     509            0 :               << std::endl;
     510              :     }
     511              : 
     512            0 :     for (const std::string& f : g.get_expected_files()) {
     513            0 :       *output << "\t- " << f << "\t" << "Expected" << std::endl;
     514              :     }
     515              :   }
     516              : 
     517            0 :   if (m_file_log_path != "") {
     518            0 :     dynamic_cast<std::ofstream*>(output)->close();
     519            0 :     dynamic_cast<std::ofstream*>(output_line_log)->close();
     520              :   } else {
     521            0 :     output->flush();
     522              :   }
     523            0 : }
     524              : 
     525              : void
     526            0 : Bookkeeper::add_update_transfer(const std::string& client_id, const std::string& data)
     527              : {
     528              :   // Loading the data and convert to a proper transfer metadata object
     529            0 :   std::shared_ptr<TransferMetadata> file = std::make_shared<TransferMetadata>(data, false);
     530            0 :   std::string group_id_tmp = file->get_group_id();
     531              : 
     532            0 :   std::vector<std::shared_ptr<TransferMetadata>>& tr_vector = get_transfers()[client_id];
     533            0 :   for (std::shared_ptr<TransferMetadata>& tr : tr_vector) {
     534            0 :     if (*tr == *file) {
     535              :       // Already inserted, simply update the one already present
     536            0 :       tr->from_string(data);
     537              : 
     538              :       // Add available file information
     539              :       // file->set_group_id("");
     540              :       // m_transfers[client_id].push_back(file);
     541              : 
     542            0 :       return;
     543              :     }
     544              :   }
     545              : 
     546              :   // Check if transfer already exist in a group transfer
     547            0 :   if (group_id_tmp != "" && m_grp_transfers.find(group_id_tmp) != m_grp_transfers.end()) {
     548            0 :     m_grp_transfers.at(group_id_tmp).add_file(file);
     549              :   }
     550            0 :   m_transfers[client_id].push_back(file);
     551            0 : }
     552              : 
     553              : void
     554            0 : Bookkeeper::add_update_grp_transfer(GroupMetadata grp_transfers)
     555              : {
     556            0 :   std::string group_id_tmp = grp_transfers.get_group_id();
     557            0 :   if (m_grp_transfers.find(group_id_tmp) != m_grp_transfers.end()) {
     558              :     // Already inserted, copy old values
     559            0 :     grp_transfers.set_transfers_meta(m_grp_transfers.at(grp_transfers.get_group_id()).get_transfers_meta());
     560            0 :     grp_transfers.set_expected_files(m_grp_transfers.at(grp_transfers.get_group_id()).get_expected_files());
     561            0 :     m_grp_transfers.erase(group_id_tmp);
     562              :   }
     563            0 :   m_grp_transfers.insert({ group_id_tmp, std::move(grp_transfers) });
     564            0 : }
     565              : } // namespace dunedaq::snbmodules
        

Generated by: LCOV version 2.0-1