LCOV - code coverage report
Current view: top level - snbmodules/src/bookkeeper - bookkeeper.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 276 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 38 0

            Line data    Source code
       1              : /**
       2              :  * @file bookkeeper.cpp Bookkeeper class retrieving information from clients
       3              :  *
       4              :  * This is part of the DUNE DAQ , copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
       9              : #include "snbmodules/bookkeeper.hpp"
      10              : 
      11              : #include <set>
      12              : #include <string>
      13              : #include <utility>
      14              : #include <vector>
      15              : 
      16              : namespace dunedaq::snbmodules {
      17              : // TODO Aug-14-2022 Leo Joly leo.vincent.andre.joly@cern.ch : Obsolete, is this needed anymore ?
      18              : void
      19            0 : Bookkeeper::create_new_transfer(const std::string& protocol,
      20              :                                 const std::string& src,
      21              :                                 const std::set<std::string>& dests,
      22              :                                 const std::set<std::filesystem::path>& files,
      23              :                                 const nlohmann::json& protocol_options)
      24              : {
      25              :   // suppress warnings
      26            0 :   (void)protocol;
      27            0 :   (void)src;
      28            0 :   (void)dests;
      29            0 :   (void)files;
      30            0 :   (void)protocol_options;
      31              : 
      32              :   // TLOG() << "debug : creating new transfer with protocol " << protocol;
      33              : 
      34              :   // // Check protocol
      35              :   // std::optional<protocol_type::e_protocol_type> _protocol = protocol_type::string_to_protocols(protocol);
      36              :   // if (!_protocol.has_value())
      37              :   // {
      38              :   //     TLOG() << "debug : invalid protocol !";
      39              :   //     return;
      40              :   // }
      41              : 
      42              :   // // Check files and found metadata
      43              :   // std::set<TransferMetadata *> files_meta;
      44              :   // auto list = get_m_transfers().at(src);
      45              :   // int nb_files = 0;
      46              : 
      47              :   // for (std::filesystem::path file : files)
      48              :   // {
      49              :   //     bool found = false;
      50              :   //     for (auto filemeta : list)
      51              :   //     {
      52              :   //         if (filemeta.get_file_name() == file)
      53              :   //         {
      54              :   //             files_meta.emplace(&filemeta);
      55              :   //             found = true;
      56              :   //             nb_files++;
      57              :   //             break;
      58              :   //         }
      59              :   //     }
      60              :   //     if (!found)
      61              :   //     {
      62              :   //         TLOG() << "debug : file " << file << " not found ! (ignoring)";
      63              :   //     }
      64              :   // }
      65              : 
      66              :   // if (nb_files == 0)
      67              :   // {
      68              :   //     TLOG() << "debug : no file found ! exiting transfer creation.";
      69              :   //     return;
      70              :   // }
      71              : 
      72              :   // // Create transfer
      73              :   // GroupMetadata transfer("transfer" + std::to_string(transfers.size()), _protocol.value(), files_meta);
      74              :   // transfers_link[transfer.get_group_id()] = src;
      75              :   // transfers.emplace(transfer);
      76              : 
      77              :   // // Notify clients
      78              :   // for (auto client : src)
      79              :   // {
      80              :   //     std::string session_name = client + "_ses" + transfer.get_group_id();
      81              : 
      82              :   //     TLOG() << "debug : notifying src client " << client;
      83              :   //     send_notification(notification_type::e_notification_type::NEW_TRANSFER, get_bookkeeper_id(), session_name,
      84              :   //     client, 1000, transfer.export_to_string());
      85              :   // }
      86            0 : }
      87              : 
      88              : // Only for stand alone application
      89              : void
      90            0 : Bookkeeper::input_action(char input)
      91              : {
      92            0 :   switch (input) {
      93            0 :     case 'q': {
      94            0 :       TLOG() << "Exiting...";
      95            0 :       exit(0);
      96            0 :       break;
      97              :     }
      98            0 :     case 'd': {
      99            0 :       display_information();
     100            0 :       break;
     101              :     }
     102            0 :     case 'n': {
     103            0 :       TLOG() << "Creating new transfer ...";
     104            0 :       TLOG() << "Choose protocol in the list";
     105            0 :       for (int enum_i = protocol_type::e_protocol_type::BITTORRENT; enum_i != protocol_type::e_protocol_type::dummy;
     106              :            enum_i++) {
     107            0 :         TLOG() << enum_i << " - "
     108            0 :                << protocol_type::protocols_to_string(static_cast<protocol_type::e_protocol_type>(enum_i));
     109              :       }
     110            0 :       int protocol = -1;
     111            0 :       std::cin >> protocol;
     112              :       // Check input
     113            0 :       if (protocol < 0 || protocol > protocol_type::e_protocol_type::dummy) {
     114            0 :         TLOG() << "Invalid protocol";
     115            0 :         break;
     116              :       }
     117              : 
     118            0 :       TLOG() << "Choose clients (q when finished)";
     119            0 :       std::set<std::string> choosen_clients;
     120            0 :       while (true) {
     121            0 :         std::string client;
     122            0 :         std::cin >> client;
     123            0 :         if (client == "q") {
     124              :           break;
     125              :         }
     126              :         // Check input
     127            0 :         if (std::find(get_clients_conn().begin(), get_clients_conn().end(), client) == get_clients_conn().end()) {
     128            0 :           TLOG() << "Invalid client";
     129            0 :           break;
     130              :         }
     131              : 
     132            0 :         choosen_clients.emplace(client);
     133            0 :       }
     134            0 :       if (choosen_clients.empty()) {
     135            0 :         TLOG() << "No client selected";
     136            0 :         break;
     137              :       }
     138            0 :       if (choosen_clients.size() < 2) {
     139            0 :         TLOG() << "At least 2 clients are required";
     140            0 :         break;
     141              :       }
     142              : 
     143            0 :       TLOG() << "Choose file to transmit (q when finished)";
     144            0 :       std::set<std::shared_ptr<TransferMetadata>> choosen_files;
     145            0 :       while (true) {
     146            0 :         uint64_t initial_size = choosen_files.size();
     147            0 :         std::string file;
     148            0 :         std::cin >> file;
     149            0 :         if (file == "q") {
     150              :           break;
     151              :         }
     152              : 
     153            0 :         for (const std::string& client : choosen_clients) {
     154              :           // Check input
     155            0 :           auto& list = get_transfers().at(client);
     156            0 :           bool found = false;
     157            0 :           for (std::shared_ptr<TransferMetadata> filemeta : list) {
     158            0 :             if (filemeta->get_file_name() == file) {
     159            0 :               choosen_files.emplace(filemeta);
     160            0 :               found = true;
     161            0 :               break;
     162              :             }
     163            0 :           }
     164            0 :           if (found == true) {
     165              :             break;
     166              :           }
     167              :         }
     168            0 :         if (initial_size == choosen_files.size()) {
     169            0 :           TLOG() << "Invalid file";
     170            0 :           break;
     171              :         }
     172            0 :       }
     173            0 :       if (choosen_files.empty()) {
     174            0 :         TLOG() << "No file selected";
     175            0 :         break;
     176              :       }
     177              : 
     178              :       // create_new_transfer(protocol_type::protocols_to_string(protocol_type::string_to_protocols(protocol).value()),
     179              :       // choosen_clients, choosen_files);
     180              :       break;
     181            0 :     }
     182              : 
     183            0 :     case 's': {
     184            0 :       TLOG() << "Choose transfers to start ... ";
     185            0 :       std::set<std::string> choosen_transfers;
     186            0 :       while (true) {
     187            0 :         std::string input;
     188            0 :         std::cin >> input;
     189            0 :         if (input == "q") {
     190              :           break;
     191              :         }
     192              :         // Check input
     193            0 :         if (m_clients_per_grp_transfer.find(input) == m_clients_per_grp_transfer.end()) {
     194            0 :           TLOG() << "Invalid transfer";
     195            0 :           break;
     196              :         }
     197            0 :         choosen_transfers.emplace(input);
     198            0 :       }
     199            0 :       if (choosen_transfers.size() == 0) {
     200            0 :         TLOG() << "No transfer selected";
     201            0 :         break;
     202              :       }
     203              : 
     204            0 :       for (const auto& transfer : choosen_transfers) {
     205            0 :         start_transfers(transfer);
     206              :       }
     207            0 :       break;
     208            0 :     }
     209              : 
     210            0 :     default:
     211            0 :       TLOG() << "Unknown command";
     212            0 :       break;
     213              :   }
     214            0 : }
     215              : 
     216              : void
     217            0 : Bookkeeper::start_transfers(const std::string& transfer_id)
     218              : {
     219            0 :   TLOG() << "Starting transfer " << transfer_id;
     220              : 
     221            0 :   if (m_clients_per_grp_transfer.find(transfer_id) != m_clients_per_grp_transfer.end()) {
     222            0 :     for (const std::string& client : m_clients_per_grp_transfer[transfer_id]) {
     223            0 :       std::string session_name = client;
     224            0 :       session_name += "_ses";
     225            0 :       session_name += transfer_id;
     226            0 :       send_notification(
     227            0 :         notification_type::e_notification_type::START_TRANSFER, get_bookkeeper_id(), session_name, client);
     228            0 :     }
     229              :   } else {
     230            0 :     ers::warning(InvalidGroupTransferIDError(ERS_HERE, transfer_id, get_bookkeeper_id()));
     231              :   }
     232            0 : }
     233              : 
     234              : void
     235            0 : Bookkeeper::do_work(std::atomic<bool>& running_flag)
     236              : {
     237            0 :   TLOG() << "JAB " << __LINE__;
     238              :   // Just one request on startup, after that the clients will have to send by themself
     239            0 :   for (const std::string& client : get_clients_conn()) {
     240            0 :     TLOG() << "JAB " << __LINE__ << " " << client;
     241            0 :     request_connection_and_available_files(client);
     242              :   }
     243            0 :   TLOG() << "JAB " << __LINE__;
     244              : 
     245            0 :   auto time_point = std::chrono::high_resolution_clock::now();
     246              : 
     247            0 :   while (running_flag.load()) {
     248            0 :     TLOG() << "JAB " << __LINE__;
     249            0 :     lookups_connections();
     250            0 :     TLOG() << "JAB " << __LINE__ << " " << get_bookkeepers_conn().size();
     251            0 :     std::optional<NotificationData> msg = listen_for_notification(get_bookkeepers_conn().front());
     252            0 :     TLOG() << "JAB " << __LINE__;
     253            0 :     if (msg.has_value()) {
     254            0 :       TLOG() << "JAB " << __LINE__;
     255            0 :       action_on_receive_notification(msg.value());
     256              :     }
     257            0 :     TLOG() << "JAB " << __LINE__;
     258              : 
     259              :     // check alives clients and available files
     260            0 :     std::this_thread::sleep_for(std::chrono::milliseconds(100));
     261              : 
     262              :     // Auto update metadata every 2 seconds
     263            0 :     if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now() - time_point)
     264            0 :           .count() >= m_refresh_rate) {
     265            0 :       time_point = std::chrono::high_resolution_clock::now();
     266            0 :       request_update_metadata();
     267            0 :       display_information();
     268              :     }
     269            0 :   }
     270            0 : }
     271              : 
     272              : bool
     273            0 : Bookkeeper::start()
     274              : {
     275            0 :   auto time_point = std::chrono::high_resolution_clock::now();
     276              : 
     277              :   // Just one request on startup, after that the clients will have to send by themself
     278            0 :   for (const std::string& client : get_clients_conn()) {
     279            0 :     request_connection_and_available_files(client);
     280              :   }
     281              : 
     282            0 :   while (true) {
     283            0 :     std::optional<NotificationData> msg = listen_for_notification(get_bookkeepers_conn().front());
     284            0 :     if (msg.has_value()) {
     285            0 :       action_on_receive_notification(msg.value());
     286              :     }
     287              : 
     288            0 :     std::string input;
     289            0 :     getline(std::cin, input);
     290            0 :     if (input.empty() == false) {
     291            0 :       input_action(input[0]);
     292              :     }
     293              : 
     294              :     // check alives clients and available files
     295            0 :     for (const std::string& client : get_clients_conn()) {
     296            0 :       if (m_transfers.find(client) != m_transfers.end()) {
     297              :         // already known client
     298            0 :         continue;
     299              :       }
     300              : 
     301            0 :       request_connection_and_available_files(client);
     302              :     }
     303              : 
     304            0 :     std::this_thread::sleep_for(std::chrono::milliseconds(100));
     305              : 
     306              :     // Auto update metadata every 2 seconds
     307            0 :     if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now() - time_point)
     308            0 :           .count() >= 5) {
     309            0 :       time_point = std::chrono::high_resolution_clock::now();
     310            0 :       request_update_metadata();
     311            0 :       display_information();
     312              :     }
     313            0 :   }
     314              : }
     315              : 
     316              : void
     317            0 : Bookkeeper::request_connection_and_available_files(const std::string& client)
     318              : {
     319              :   // send connection request to client
     320            0 :   send_notification(notification_type::e_notification_type::CONNECTION_REQUEST,
     321            0 :                     get_bookkeeper_id(),
     322              :                     client,
     323              :                     client,
     324            0 :                     get_bookkeeper_id(),
     325              :                     1);
     326              : 
     327              :   // Listen to receive connection response and available files
     328              :   // auto msg = listen_for_notification(get_bookkeepers_conn().front(), client);
     329              : 
     330              :   // while (msg.has_value() && msg.value().m_notification !=
     331              :   // notification_type::notification_to_string(notification_type::e_notification_type::CONNECTION_REQUEST))
     332              :   // {
     333              :   //     action_on_receive_notification(msg.value());
     334              :   //     msg = listen_for_notification(get_bookkeepers_conn().front(), client);
     335              :   // }
     336            0 : }
     337              : 
     338              : void
     339            0 : Bookkeeper::request_update_metadata(bool force)
     340              : {
     341            0 :   for (const auto& [id, g] : get_grp_transfers()) {
     342              :     // Only request for dynamic status
     343            0 :     if (g.get_group_status() == status_type::e_status::DOWNLOADING ||
     344            0 :         g.get_group_status() == status_type::e_status::CHECKING ||
     345            0 :         g.get_group_status() == status_type::e_status::UPLOADING ||
     346            0 :         g.get_group_status() == status_type::e_status::HASHING || force) {
     347              : 
     348            0 :       for (const std::string& session : m_clients_per_grp_transfer[g.get_group_id()]) {
     349            0 :         send_notification(notification_type::e_notification_type::UPDATE_REQUEST,
     350            0 :                           get_bookkeeper_id(),
     351              :                           session,
     352            0 :                           get_client_name_from_session_name(session));
     353              :       }
     354              :     }
     355              :   }
     356            0 : }
     357              : 
     358              : bool
     359            0 : Bookkeeper::action_on_receive_notification(NotificationData notif)
     360              : {
     361            0 :   TLOG() << "debug : Action on request " << notif.m_notification << " for " << notif.m_target_id;
     362              : 
     363            0 :   if (notif.m_target_id.find(get_bookkeeper_id()) == std::string::npos && notif.m_target_id != "all") {
     364            0 :     ers::warning(
     365            0 :       NotificationWrongDestinationError(ERS_HERE, get_bookkeeper_id(), notif.m_source_id, notif.m_target_id));
     366            0 :     return false;
     367              :   }
     368              : 
     369              :   // Use enum cast for converting string to enum, still working with older clients and user readable
     370            0 :   auto action = notification_type::string_to_notification(notif.m_notification);
     371              : 
     372            0 :   if (action.has_value() == false) {
     373            0 :     ers::warning(
     374            0 :       InvalidNotificationReceivedError(ERS_HERE, get_bookkeeper_id(), notif.m_source_id, notif.m_notification));
     375              :   }
     376              : 
     377            0 :   switch (action.value()) {
     378            0 :     case notification_type::e_notification_type::TRANSFER_METADATA: {
     379            0 :       if (notif.m_data == "end") {
     380              :         // Create entry in the map in case no files
     381            0 :         m_transfers[notif.m_source_id];
     382              :         break;
     383              :       }
     384              : 
     385              :       // Store it
     386            0 :       add_update_transfer(notif.m_source_id, notif.m_data);
     387              :       break;
     388              :     }
     389              : 
     390            0 :     case notification_type::e_notification_type::TRANSFER_ERROR:
     391            0 :     case notification_type::e_notification_type::GROUP_METADATA: {
     392              :       // Loading the data and cnovert to a proper transfer metadata object
     393            0 :       GroupMetadata group_meta(notif.m_data, false);
     394              : 
     395              :       // Store it
     396            0 :       m_clients_per_grp_transfer[group_meta.get_group_id()].insert(notif.m_source_id);
     397            0 :       add_update_grp_transfer(std::move(group_meta));
     398            0 :       break;
     399            0 :     }
     400              : 
     401            0 :     default:
     402            0 :       ers::warning(NotHandledNotificationError(ERS_HERE, get_bookkeeper_id(), notif.m_source_id, notif.m_notification));
     403              :   }
     404              :   return true;
     405              : }
     406              : 
     407              : void
     408            0 : Bookkeeper::display_information()
     409              : {
     410            0 :   std::ostream* output = nullptr;
     411            0 :   std::ostream* output_line_log = nullptr;
     412            0 :   std::string sep = ";";
     413              : 
     414            0 :   if (m_file_log_path != "") {
     415              :     // open file
     416            0 :     output = new std::ofstream();
     417            0 :     output_line_log = new std::ofstream();
     418            0 :     dynamic_cast<std::ofstream*>(output)->open(m_file_log_path + get_bookkeeper_id() + ".log", std::fstream::out);
     419            0 :     dynamic_cast<std::ofstream*>(output_line_log)
     420            0 :       ->open(m_file_log_path + get_bookkeeper_id() + "_line.csv", std::fstream::app | std::fstream::out);
     421              :     // clear file
     422            0 :     dynamic_cast<std::ofstream*>(output)->clear();
     423            0 :     TLOG() << "debug : output log wroten " << m_file_log_path << get_bookkeeper_id() << ".log\t" << m_file_log_path
     424            0 :            << get_bookkeeper_id() << "_line.csv";
     425              : 
     426              :     // if csv file empty, write header
     427            0 :     if (dynamic_cast<std::ofstream*>(output_line_log)->tellp() == 0) {
     428            0 :       *output_line_log << "time" << sep << "file_name" << sep << "file_full_path" << sep << "group_id" << sep
     429              :                        << "src_ip" << sep << "size" << sep
     430              : 
     431              :                        << "dest_ip" << sep << "start_time" << sep << "duration" << sep << "progress" << sep << "speed"
     432              :                        << sep << "state" << sep
     433              : 
     434            0 :                        << "end_time" << sep << "error" << sep << std::endl;
     435              :     }
     436              :   } else {
     437              :     output = &std::cout;
     438              :   }
     439              : 
     440            0 :   *output << "***** Bookkeeper " << get_bookkeeper_id() << " " + get_ip().get_ip_port() << " informations display *****"
     441            0 :           << std::endl;
     442              :   // *output << "q: quit, d : display info, n : new transfer, s : start transfer" << std::endl;
     443            0 :   *output << "Connected clients :" << std::endl;
     444              : 
     445            0 :   for (const auto& client : get_transfers()) {
     446            0 :     bool is_session = false;
     447              :     // If it's a session
     448            0 :     if (client.first.find("ses") != std::string::npos) {
     449            0 :       *output << "\t* Session " << client.first << " is active" << std::endl;
     450              :       is_session = true;
     451              :     } else {
     452            0 :       *output << "> Client " << client.first << " is connected" << std::endl;
     453              :     }
     454              : 
     455              :     // print for each file the status
     456            0 :     for (const auto& file : client.second) {
     457            0 :       if (m_file_log_path != "") {
     458            0 :         *output_line_log << std::chrono::duration_cast<std::chrono::seconds>(
     459            0 :                               std::chrono::system_clock::now().time_since_epoch())
     460            0 :                               .count()
     461            0 :                          << sep << file->get_file_name() << sep << file->get_file_path() << sep << file->get_group_id()
     462            0 :                          << sep << file->get_src().get_ip_port() << sep << file->get_size() << sep
     463              : 
     464            0 :                          << file->get_dest().get_ip_port() << sep << file->get_start_time_str() << sep
     465            0 :                          << file->get_total_duration_ms() << sep << file->get_progress() << sep
     466            0 :                          << file->get_transmission_speed() << sep << status_type::status_to_string(file->get_status())
     467              :                          << sep
     468              : 
     469            0 :                          << file->get_end_time_str() << sep << file->get_error_code() << sep
     470              : 
     471            0 :                          << std::endl;
     472              :       }
     473              : 
     474            0 :       if (is_session) {
     475            0 :         *output << "\t\t - ";
     476              :       } else {
     477            0 :         *output << "\t - ";
     478              :       }
     479              : 
     480            0 :       if (is_session)
     481            0 :         *output << file->get_file_name() << "\t" << file->get_size() << " bytes\tfrom " << file->get_src().get_ip_port()
     482              :                 << "\t"
     483              :                 // << file->get_dest().get_ip_port() << "\t"
     484            0 :                 << status_type::status_to_string(file->get_status()) << "\t" << file->get_progress() << "%\t"
     485            0 :                 << (file->get_transmission_speed() == 0 ? "-" : std::to_string(file->get_transmission_speed()))
     486            0 :                 << "Bi/s\t" << file->get_start_time_str() << "\t" << file->get_total_duration_ms() << "ms\t"
     487            0 :                 << file->get_end_time_str() << "\t" << file->get_error_code() << "\t" << std::endl;
     488              :       else
     489            0 :         *output << "Available file " << file->get_file_path() << "\t" << file->get_size() << " bytes\tfrom "
     490            0 :                 << file->get_src().get_ip_port() << "\t" << std::endl;
     491              :     }
     492              :   }
     493              : 
     494            0 :   *output << std::endl << "Active Transfers :" << std::endl;
     495              : 
     496            0 :   *output << "Group ID\t" << "Protocol\t" << "Src\t" << "IP\t" << "Status\t" << std::endl;
     497              : 
     498            0 :   for (const auto& [id, g] : get_grp_transfers()) {
     499            0 :     *output << g.get_group_id() << "\t" << protocol_type::protocols_to_string(g.get_protocol()) << "\t"
     500            0 :             << g.get_source_id() << "\t" << g.get_source_ip().get_ip_port() << "\t"
     501            0 :             << status_type::status_to_string(g.get_group_status()) << "\t" << std::endl;
     502              : 
     503            0 :     for (const std::shared_ptr<TransferMetadata>& fmeta : g.get_transfers_meta()) {
     504            0 :       *output << "\t- " << fmeta->get_file_name() << "\t" << fmeta->get_src().get_ip_port() << " to "
     505            0 :               << fmeta->get_dest().get_ip_port() << "\t" << status_type::status_to_string(fmeta->get_status()) << "\t"
     506            0 :               << std::endl;
     507              :     }
     508              : 
     509            0 :     for (const std::string& f : g.get_expected_files()) {
     510            0 :       *output << "\t- " << f << "\t" << "Expected" << std::endl;
     511              :     }
     512              :   }
     513              : 
     514            0 :   if (m_file_log_path != "") {
     515            0 :     dynamic_cast<std::ofstream*>(output)->close();
     516            0 :     dynamic_cast<std::ofstream*>(output_line_log)->close();
     517              :   } else {
     518            0 :     output->flush();
     519              :   }
     520            0 : }
     521              : 
     522              : void
     523            0 : Bookkeeper::add_update_transfer(const std::string& client_id, const std::string& data)
     524              : {
     525              :   // Loading the data and convert to a proper transfer metadata object
     526            0 :   std::shared_ptr<TransferMetadata> file = std::make_shared<TransferMetadata>(data, false);
     527            0 :   std::string group_id_tmp = file->get_group_id();
     528              : 
     529            0 :   std::vector<std::shared_ptr<TransferMetadata>>& tr_vector = get_transfers()[client_id];
     530            0 :   for (std::shared_ptr<TransferMetadata>& tr : tr_vector) {
     531            0 :     if (*tr == *file) {
     532              :       // Already inserted, simply update the one already present
     533            0 :       tr->from_string(data);
     534              : 
     535              :       // Add available file information
     536              :       // file->set_group_id("");
     537              :       // m_transfers[client_id].push_back(file);
     538              : 
     539            0 :       return;
     540              :     }
     541              :   }
     542              : 
     543              :   // Check if transfer already exist in a group transfer
     544            0 :   if (group_id_tmp != "" && m_grp_transfers.find(group_id_tmp) != m_grp_transfers.end()) {
     545            0 :     m_grp_transfers.at(group_id_tmp).add_file(file);
     546              :   }
     547            0 :   m_transfers[client_id].push_back(file);
     548            0 : }
     549              : 
     550              : void
     551            0 : Bookkeeper::add_update_grp_transfer(GroupMetadata grp_transfers)
     552              : {
     553            0 :   std::string group_id_tmp = grp_transfers.get_group_id();
     554            0 :   if (m_grp_transfers.find(group_id_tmp) != m_grp_transfers.end()) {
     555              :     // Already inserted, copy old values
     556            0 :     grp_transfers.set_transfers_meta(std::move(m_grp_transfers.at(grp_transfers.get_group_id()).get_transfers_meta()));
     557            0 :     grp_transfers.set_expected_files(std::move(m_grp_transfers.at(grp_transfers.get_group_id()).get_expected_files()));
     558            0 :     m_grp_transfers.erase(group_id_tmp);
     559              :   }
     560            0 :   m_grp_transfers.insert({ group_id_tmp, std::move(grp_transfers) });
     561            0 : }
     562              : } // namespace dunedaq::snbmodules
        

Generated by: LCOV version 2.0-1