DUNE-DAQ
DUNE Trigger and Data Acquisition software
dunedaq::snbmodules::TransferClient Class Reference

TransferClient class: represents a client that can create sessions and communicate with Bookkeepers. It extends NotificationInterface because the client exchanges notifications and needs to initialize the connection.

#include <transfer_client.hpp>


Public Member Functions

 TransferClient (const IPFormat &listening_ip, const std::string &client_id, const std::filesystem::path &listening_dir, const std::string &connection_prefix="snbmodules", int timeout_send=10, int timeout_receive=100)
 TransferClient constructor.
 
 ~TransferClient ()
 
bool start (int timeout)
 Start the client in the current thread.
 
bool do_work (std::atomic< bool > &running_flag)
 Start function to use in a thread.
 
void create_new_transfer (const std::string &transfer_id, const std::string &protocol, const std::set< std::string > &dest_clients, const std::set< std::filesystem::path > &files, const nlohmann::json &protocol_options=nlohmann::json())
 Create a new transfer.
 
void start_transfer (const std::string &transfer_id)
 Start, pause, resume or cancel a transfer.
 
void pause_transfer (const std::string &transfer_id)
 
void resume_transfer (const std::string &transfer_id)
 
void cancel_transfer (const std::string &transfer_id)
 
TransferSession & create_session (GroupMetadata transfer_options, e_session_type type, std::string id, const std::filesystem::path &work_dir, IPFormat ip=IPFormat(), const std::set< std::string > &dest_clients=std::set< std::string >())
 Create a new session. An IP address can be given for the session to force a specific network interface and a port different from the client's.
 
void scan_available_files (std::set< std::filesystem::path > &previous_scan, bool nested=false, std::filesystem::path folder=std::filesystem::path())
 Scan available files in the listening directory.
 
IPFormat get_ip () const
 
std::string get_client_id () const
 
std::filesystem::path get_listening_dir () const
 
TransferSession * get_session (std::string transfer_id)
 
std::vector< TransferSession > & get_sessions ()
 
const std::vector< TransferSession > & get_sessions () const
 
std::string get_my_conn ()
 
void set_ip (const std::string &ip)
 
void set_port (int port)
 
void set_client_id (std::string client_id)
 
void set_listening_dir (const std::filesystem::path &listening_dir)
 
- Public Member Functions inherited from dunedaq::snbmodules::NotificationInterface
 NotificationInterface (std::string connection_prefix="snbmodules", int timeout_send=10, int timeout_receive=100)
 
 NotificationInterface (std::vector< std::string > bk_conn, std::set< std::string > client_conn, const std::string &connection_prefix="snbmodules", int timeout_send=10, int timeout_receive=100)
 
virtual ~NotificationInterface ()=default
 
std::optional< NotificationData > listen_for_notification (const std::string &id, const std::string &expected_from="", int timeout=-1, int tries=-1)
 Listen for a notification.
 
bool send_notification (const notification_type::e_notification_type &notif, const std::string &src, const std::string &dst, const std::string &id_conn, const std::string &data="", int tries=-1)
 Send a notification during m_timeout_send ms.
 
void lookups_connections ()
 Get the list of all connections. Each connection name must begin with the prefix and follow the format prefix.*bookkeeper.* or prefix.*client.*.
 
const std::vector< std::string > & get_bookkeepers_conn () const
 Init the connection interface, Only used for standalone application.
 
const std::set< std::string > & get_clients_conn () const
 

Private Member Functions

std::string generate_session_id (const std::string &transferid, const std::string &dest="")
 Function to generate session ID.
 
void remove_session (const std::string &session_id)
 Remove a session from the client.
 
bool action_on_receive_notification (NotificationData notif) override
 Action to perform when the client receives a notification.
 
void share_available_files (const std::set< std::filesystem::path > &to_share, const std::string &dest)
 Share available files (in m_listening_dir)
 
std::shared_ptr< TransferMetadata > create_metadata_from_file (const std::filesystem::path &src)
 Create a metadata from a file.
 

Private Attributes

IPFormat m_listening_ip
 IP address of the client.
 
std::string m_client_id
 TransferClient ID, unique identifier of the client.
 
std::filesystem::path m_listening_dir
 Listening directory, directory where the client will listen for incoming files and files to share.
 
std::vector< TransferSession > m_sessions
 Active sessions, stored in a vector and looked up by session ID.
 
std::map< std::string, std::shared_ptr< TransferMetadata > > m_available_files
 Map of available files (key = file path, value = file metadata)
 
std::string m_my_conn = ""
 Connection uuid of the client, retrieved using the notification interface and calling get_my_conn()
 

Detailed Description

TransferClient class: represents a client that can create sessions and communicate with Bookkeepers. It extends NotificationInterface because the client exchanges notifications and needs to initialize the connection.

Definition at line 33 of file transfer_client.hpp.

Constructor & Destructor Documentation

◆ TransferClient()

dunedaq::snbmodules::TransferClient::TransferClient ( const IPFormat & listening_ip,
const std::string & client_id,
const std::filesystem::path & listening_dir,
const std::string & connection_prefix = "snbmodules",
int timeout_send = 10,
int timeout_receive = 100 )

TransferClient constructor.

Parameters
listening_ip: Listening IP address
port: Listening port
client_id: Client ID
listening_dir: Listening directory (where the client will save incoming files and files to share with bookkeepers)

Definition at line 18 of file transfer_client.cpp.

24 : NotificationInterface(connection_prefix, timeout_send, timeout_receive)
25 , m_listening_ip(listening_ip)
26 , m_client_id(client_id)
27{
28 TLOG() << "DAB " << __LINE__ << " " << listening_dir;
29 // remove all occurrences of ./ in the file path
30 std::string file_path_str = listening_dir.string();
31 std::string x = "./";
32
33 size_t pos = 0;
34 while (true) {
35 TLOG() << "DAB " << __LINE__ << " " << file_path_str;
36 pos = file_path_str.find(x, pos);
37 if (pos == std::string::npos) {
38 break;
39 }
40
41 file_path_str.replace(pos, x.length(), "");
42 }
43 TLOG() << "DAB " << __LINE__ << " " << file_path_str;
44 if (file_path_str.length() == 0) {
45 file_path_str = "blah";
46 }
47 TLOG() << "DAB " << __LINE__ << " " << file_path_str;
48 m_listening_dir = std::filesystem::absolute(file_path_str);
49 TLOG() << "DAB " << __LINE__ << " " << file_path_str;
50 std::filesystem::create_directories(m_listening_dir);
51}
#define TLOG(...)
Definition macro.hpp:22
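
The path clean-up performed by the constructor can be reproduced in isolation. The sketch below is a minimal stand-alone version of that loop; `strip_dot_slash` and `demo_strip` are hypothetical names used for illustration and are not part of snbmodules. It also shows a side effect worth knowing: because every `./` substring is removed, a leading `../` does not survive.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical helper: removes every "./" substring, as the constructor loop does.
std::string strip_dot_slash(std::string s)
{
  const std::string needle = "./";
  std::size_t pos = 0;
  while ((pos = s.find(needle, pos)) != std::string::npos) {
    s.replace(pos, needle.length(), ""); // erase the match, then keep searching from pos
  }
  return s;
}

// Small self-check of the behaviour described above.
bool demo_strip()
{
  assert(strip_dot_slash("./data/./in") == "data/in");
  assert(strip_dot_slash("./").empty()); // the constructor substitutes a placeholder name in this case
  return strip_dot_slash("../data") == ".data"; // "../" is not preserved
}
```

If parent-directory components must be kept, `std::filesystem::path::lexically_normal()` is a standard alternative that resolves `.` and `..` components without string replacement.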

◆ ~TransferClient()

dunedaq::snbmodules::TransferClient::~TransferClient ( )

Definition at line 53 of file transfer_client.cpp.

53{}

Member Function Documentation

◆ action_on_receive_notification()

bool dunedaq::snbmodules::TransferClient::action_on_receive_notification ( NotificationData notif)
override private virtual

Action to perform when the client receives a notification.

Parameters
notif: Notification received
Returns
True if the notification was handled

Implements dunedaq::snbmodules::NotificationInterface.

Definition at line 291 of file transfer_client.cpp.

292{
293 TLOG() << "debug : Action on request " << notif.m_notification << " for " << notif.m_target_id;
294
295 if (notif.m_target_id.find(get_client_id()) == std::string::npos && notif.m_target_id != "all") {
296 ers::warning(NotificationWrongDestinationError(ERS_HERE, get_client_id(), notif.m_source_id, notif.m_target_id));
297 return false;
298 }
299
300 // Use enum cast for converting string to enum, still working with older clients and user readable
301 auto action = notification_type::string_to_notification(notif.m_notification);
302
303 if (action.has_value() == false) {
304 ers::warning(InvalidNotificationReceivedError(ERS_HERE, get_client_id(), notif.m_source_id, notif.m_notification));
305 }
306
307 switch (action.value()) {
308
310 TLOG() << "debug : receive connection request, sending available files";
311 std::set<std::filesystem::path> to_share;
312 scan_available_files(to_share, true);
313 share_available_files(to_share, notif.m_source_id);
316 notif.m_source_id,
317 notif.m_source_id,
318 "end");
319 break;
320 }
321
323 GroupMetadata metadata(notif.m_data, false);
325
326 // If file is available, create a downloader session
327 for (const auto& f : metadata.get_expected_files()) {
328 if (m_available_files.find(f) != m_available_files.end()) {
329 type = Uploader;
330 break;
331 }
332 }
333
334 TLOG() << "debug : creating session " << notif.m_target_id << " type "
336 std::string group_id_tmp = metadata.get_group_id();
337 create_session(metadata, type, notif.m_target_id, get_listening_dir().append(group_id_tmp));
338 break;
339 }
340
342 auto fmeta = std::make_shared<TransferMetadata>(notif.m_data, false);
343 auto& sessions_ref = get_sessions();
344
345 // print sessions names
346 for (auto& s : sessions_ref) {
347 TLOG() << "debug : session " << s.get_session_id();
348 if (s.get_session_id().find(notif.m_target_id) != std::string::npos) {
349 TLOG() << "debug : session found";
350 fmeta->set_dest(s.get_ip());
351 s.add_file(fmeta);
352 return true;
353 }
354 }
355
356 ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), notif.m_target_id));
357
358 break;
359 }
360
362 TLOG() << "debug : starting transfer " << notif.m_target_id;
363 TransferSession* ses = get_session(notif.m_target_id);
364 if (ses != nullptr) {
365
366 if (notif.m_data == "") {
367 ses->start_all();
368 } else {
369 ses->start_file(ses->get_transfer_options().get_transfer_meta_from_file_path(notif.m_data));
370 }
371 } else {
372 ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), notif.m_target_id));
373 }
374
375 break;
376 }
377
379 TLOG() << "debug : pausing transfer " << notif.m_target_id;
380 TransferSession* ses = get_session(notif.m_target_id);
381 if (ses != nullptr) {
382
383 if (notif.m_data == "") {
384 ses->pause_all();
385 } else {
386 ses->pause_file(ses->get_transfer_options().get_transfer_meta_from_file_path(notif.m_data));
387 }
388 } else {
389 ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), notif.m_target_id));
390 }
391
392 break;
393 }
394
396 TLOG() << "debug : resuming transfer " << notif.m_target_id;
397 TransferSession* ses = get_session(notif.m_target_id);
398 if (ses != nullptr) {
399
400 if (notif.m_data == "") {
401 ses->resume_all();
402 } else {
403 ses->resume_file(ses->get_transfer_options().get_transfer_meta_from_file_path(notif.m_data));
404 }
405 } else {
406 ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), notif.m_target_id));
407 }
408
409 break;
410 }
411
413 TLOG() << "debug : cancelling transfer " << notif.m_target_id;
414 TransferSession* ses = get_session(notif.m_target_id);
415 if (ses != nullptr) {
416
417 if (notif.m_data == "") {
418 ses->cancel_all();
419 } else {
420 ses->cancel_file(ses->get_transfer_options().get_transfer_meta_from_file_path(notif.m_data));
421 }
422 } else {
423 ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), notif.m_target_id));
424 }
425
426 break;
427 }
428
430 TLOG() << "debug : updating grp transfer for " << notif.m_target_id;
431 TransferSession* ses = get_session(notif.m_target_id);
432 if (ses != nullptr) {
433
434 if (notif.m_data == "") {
435 ses->update_metadatas_to_bookkeeper();
436 } else {
437 ses->update_metadata_to_bookkeeper(
438 ses->get_transfer_options().get_transfer_meta_from_file_path(notif.m_data));
439 }
440 } else {
441 ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), notif.m_target_id));
442 }
443
444 break;
445 }
446
447 default:
448 ers::warning(NotHandledNotificationError(ERS_HERE, get_client_id(), notif.m_source_id, notif.m_notification));
449 }
450 return true;
451}
#define ERS_HERE
static std::string session_type_to_string(e_session_type e)
e_session_type
Different type of session.
@ Downloader
TransferSession used to download files from uploaders client.
@ Uploader
TransferSession used to upload files to downloaders client.
void warning(const Issue &issue)
Definition ers.hpp:115
static std::optional< e_notification_type > string_to_notification(std::string s)
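
As the listing reads, an unrecognised notification string triggers a warning, but control then continues to `action.value()`; calling `value()` on an empty `std::optional` throws `std::bad_optional_access`. A minimal sketch of the usual guard follows. The `Action` enum, `parse_action` and `handle` are hypothetical stand-ins, not the snbmodules types.

```cpp
#include <cassert>
#include <optional>
#include <string>

enum class Action { Start, Pause }; // stand-in for the notification enum

// Stand-in for a string-to-enum conversion that can fail.
std::optional<Action> parse_action(const std::string& s)
{
  if (s == "start") return Action::Start;
  if (s == "pause") return Action::Pause;
  return std::nullopt;
}

// Returns false for unknown strings instead of reading an empty optional.
bool handle(const std::string& name)
{
  const std::optional<Action> action = parse_action(name);
  if (!action.has_value()) {
    return false; // report the problem and stop before touching the value
  }
  switch (*action) {
    case Action::Start:
    case Action::Pause:
      return true;
  }
  return false;
}
```

With this shape, `handle("bogus")` returns false rather than throwing.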

◆ cancel_transfer()

void dunedaq::snbmodules::TransferClient::cancel_transfer ( const std::string & transfer_id)

Definition at line 223 of file transfer_client.cpp.

224{
225
226 TransferSession* session = get_session(transfer_id);
227
228 if (session == nullptr) {
229 ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), transfer_id));
230 return;
231 }
232
233 session->cancel_all();
234}

◆ create_metadata_from_file()

std::shared_ptr< TransferMetadata > dunedaq::snbmodules::TransferClient::create_metadata_from_file ( const std::filesystem::path & src)
private

Create a metadata from a file.

Parameters
src: Path of the file
Returns
Metadata of the file

Definition at line 560 of file transfer_client.cpp.

561{
562 return std::make_shared<TransferMetadata>(src, std::filesystem::file_size(src), get_ip());
563}

◆ create_new_transfer()

void dunedaq::snbmodules::TransferClient::create_new_transfer ( const std::string & transfer_id,
const std::string & protocol,
const std::set< std::string > & dest_clients,
const std::set< std::filesystem::path > & files,
const nlohmann::json & protocol_options = nlohmann::json() )

Create a new transfer.

Parameters
transfer_id: ID of the transfer
protocol: Protocol to use
dest_clients: Set of destination clients
files: Set of files to transfer
protocol_options: Protocol options

Definition at line 103 of file transfer_client.cpp.

108{
109
110 std::string session_name = generate_session_id(transfer_id);
111
112 // Checking if transfer already exists
113 auto ses = get_session(session_name);
114 if (ses != nullptr) {
115 TLOG() << "debug : transfer " << transfer_id << " already exists !";
116 return;
117 }
118
119 TLOG() << "debug : creating new transfer with protocol " << protocol;
120
121 std::optional<protocol_type::e_protocol_type> _protocol = protocol_type::string_to_protocols(protocol);
122 if (!_protocol.has_value()) {
123 ers::error(InvalidProtocolError(ERS_HERE, get_client_id(), protocol));
124 return;
125 }
126 // Initialize transfer
127
128 GroupMetadata group_transfer(transfer_id, session_name, m_listening_ip, _protocol.value(), protocol_options);
129
130 for (const auto& file : files) {
131 // Check if file exists
132 if (!std::filesystem::exists(file)) {
133 ers::warning(FileForTransferNotExistError(ERS_HERE, get_client_id(), file));
134 continue;
135 } else {
136 group_transfer.add_expected_file(file);
137 group_transfer.add_file(std::move(create_metadata_from_file(file)));
138 }
139 }
140
141 if (group_transfer.get_transfers_meta().empty()) {
142 ers::error(FileForTransferNotExistError(ERS_HERE, get_client_id(), "All files"));
143 return;
144 }
145
146 // Sending to bookkeepers to update preparing state
147 for (auto& bk : get_bookkeepers_conn()) {
149 notification_type::e_notification_type::GROUP_METADATA, session_name, bk, bk, group_transfer.export_to_string());
150 }
151
152 // Create local session, can take time depending on protocol
153 auto& s = create_session(std::move(group_transfer),
155 session_name,
156 get_listening_dir().append(transfer_id),
158 dest_clients);
159
160 // Notify clients and sending group metadata
161 for (const auto& client : dest_clients) {
162 std::string new_session_name = generate_session_id(transfer_id, client);
163 TLOG() << "debug : notifying client " << client;
166 new_session_name,
167 client,
168 s.get_transfer_options().export_to_string());
169
170 // Send transfer metadata
171 for (auto file : s.get_transfer_options().get_transfers_meta()) {
174 new_session_name,
175 client,
176 file->export_to_string());
177 }
178 }
179}
void error(const Issue &issue)
Definition ers.hpp:81
static std::optional< e_protocol_type > string_to_protocols(std::string s)

◆ create_session()

TransferSession & dunedaq::snbmodules::TransferClient::create_session ( GroupMetadata transfer_options,
e_session_type type,
std::string id,
const std::filesystem::path & work_dir,
IPFormat ip = IPFormat(),
const std::set< std::string > & dest_clients = std::set<std::string>() )

Create a new session. An IP address can be given for the session to force a specific network interface and a port different from the client's.

Parameters
transfer_options: Transfer options for the protocol
type: Type of the session (upload or download)
id: ID of the session
ip: Listening IP:PORT address of the session
Returns
Reference to the new session

Definition at line 253 of file transfer_client.cpp.

259{
260 if (ip.is_default()) {
261 ip = get_ip();
262 }
263
264 TransferSession new_session(
265 std::move(transfer_options), type, id, ip, work_dir, get_bookkeepers_conn(), get_clients_conn());
266
267 m_sessions.emplace_back(std::move(new_session));
268 TLOG() << "debug : session created " << TransferSession::session_type_to_string(type);
269 m_sessions.back().set_target_clients(dest_clients);
270
271 return m_sessions.back();
272}
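
Because sessions are stored in a `std::vector`, the reference returned here (and the pointer returned by get_session()) can be invalidated when a later `emplace_back` reallocates the storage. The sketch below shows one caller-side pattern that avoids holding such a reference across insertions: keep the ID and look the session up again. `Session`, `create`, `find` and `demo_relookup` are hypothetical stand-ins for the real types and calls.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

struct Session { std::string id; }; // stand-in for TransferSession

// Mirrors the "append, then return back()" shape of create_session().
Session& create(std::vector<Session>& sessions, std::string id)
{
  sessions.push_back(Session{ std::move(id) });
  return sessions.back();
}

// Linear lookup by ID, returning nullptr when nothing matches.
Session* find(std::vector<Session>& sessions, const std::string& id)
{
  for (auto& s : sessions) {
    if (s.id == id) return &s;
  }
  return nullptr;
}

bool demo_relookup()
{
  std::vector<Session> sessions;
  const std::string first_id = create(sessions, "a").id; // copy the ID, do not keep the reference
  for (int i = 0; i < 100; ++i) {
    create(sessions, "s" + std::to_string(i)); // may reallocate the vector
  }
  Session* again = find(sessions, first_id); // fresh pointer after the insertions
  return again != nullptr && again->id == "a";
}
```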

◆ do_work()

bool dunedaq::snbmodules::TransferClient::do_work ( std::atomic< bool > & running_flag)

Start function to use in a thread.

Parameters
running_flag: Flag to stop the client
Returns
False if the client did not stop correctly

Definition at line 79 of file transfer_client.cpp.

80{
81
82 while (running_flag.load()) {
83 if (get_bookkeepers_conn().size() == 0) {
84 TLOG() << "debug : no bookkeeper connected, looking for connections";
86 }
87 // When starting, client wait for notification from bookkeeper.
88 std::optional<NotificationData> msg = listen_for_notification(get_my_conn());
89 if (msg.has_value()) {
91
92 // print status of sessions
93 for (const auto& session : m_sessions) {
94 TLOG() << session.to_string();
95 }
96 }
97 }
98
99 return true;
100}

◆ generate_session_id()

std::string dunedaq::snbmodules::TransferClient::generate_session_id ( const std::string & transferid,
const std::string & dest = "" )
private

Function to generate session ID.

Definition at line 477 of file transfer_client.cpp.

478{
479 std::string id = "";
480
481 id += dest_id == "" ? get_client_id() : dest_id;
482 id += "_ses";
483 id += transferid;
484
485 return id;
486}

◆ get_client_id()

std::string dunedaq::snbmodules::TransferClient::get_client_id ( ) const
inline

Definition at line 105 of file transfer_client.hpp.

105{ return m_client_id; }

◆ get_ip()

IPFormat dunedaq::snbmodules::TransferClient::get_ip ( ) const
inline

Definition at line 104 of file transfer_client.hpp.

104{ return m_listening_ip; }

◆ get_listening_dir()

std::filesystem::path dunedaq::snbmodules::TransferClient::get_listening_dir ( ) const
inline

Definition at line 106 of file transfer_client.hpp.

106{ return m_listening_dir; }

◆ get_my_conn()

std::string dunedaq::snbmodules::TransferClient::get_my_conn ( )

Definition at line 566 of file transfer_client.cpp.

567{
568 TLOG() << "BBB " << __LINE__ << " my_conn=\"" << m_my_conn << "\"";
569 if (m_my_conn.empty()) {
570 for (const std::string& c : get_clients_conn()) {
571 std::string str1 = c;
572 std::string str2 = get_client_id();
573
574 if (str1.starts_with("snbmodules_")) {
575 str1 = str1.substr(11);
576 }
577 if (str2.starts_with("snbmodules_")) {
578 str2 = str2.substr(11);
579 }
580
581 if (str1.starts_with("snb-sample-config-")) {
582 str1 = str1.substr(18);
583 }
584 if (str2.starts_with("snb-sample-config-")) {
585 str2 = str2.substr(18);
586 }
587
588 TLOG() << "BBB " << __LINE__ << " c=\"" << c << "\" client_id=" << get_client_id() << " " << str1 << " " << str2;
589 // if (c.find(get_client_id()) != std::string::npos)
590 if (str1 == str2) {
591 m_my_conn = c;
592 break;
593 }
594 }
595
596 if (m_my_conn.empty()) {
597 ers::warning(ConnectionNotFoundError(ERS_HERE, get_client_id()));
598 }
599 }
600 return m_my_conn;
601}
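
The matching above strips two known prefixes from both the connection name and the client ID before comparing them; the literals 11 and 18 are the lengths of `snbmodules_` and `snb-sample-config-`. A minimal sketch of the same comparison without the hard-coded lengths follows. The helper names `strip_prefix`, `normalise` and `same_client` are hypothetical.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Removes `prefix` from the front of `s` when present.
std::string strip_prefix(std::string s, const std::string& prefix)
{
  if (s.compare(0, prefix.size(), prefix) == 0) {
    s.erase(0, prefix.size());
  }
  return s;
}

// Applies the two prefixes in the same order as the listing.
std::string normalise(std::string s)
{
  s = strip_prefix(std::move(s), "snbmodules_");
  s = strip_prefix(std::move(s), "snb-sample-config-");
  return s;
}

// True when both names refer to the same client after prefix removal.
bool same_client(const std::string& connection, const std::string& client_id)
{
  return normalise(connection) == normalise(client_id);
}
```

Unlike the commented-out substring test in the listing, an exact comparison does not match `client1` against `client10`.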

◆ get_session()

TransferSession * dunedaq::snbmodules::TransferClient::get_session ( std::string transfer_id)

Definition at line 237 of file transfer_client.cpp.

238{
239 if (transfer_id.find("ses") == std::string::npos) {
240 transfer_id = generate_session_id(transfer_id);
241 }
242
243 for (auto& s : m_sessions) {
244 if (s.get_session_id() == transfer_id) {
245 return &s;
246 }
247 }
248 return nullptr;
249}
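
The lookup treats any argument containing the substring `ses` as a full session ID and otherwise expands it with generate_session_id(), which yields `<client or destination ID>_ses<transfer ID>`. One consequence, sketched below, is that a transfer ID which itself contains `ses` is not expanded. The free function `resolve_session_id` is a hypothetical stand-in that takes the client ID explicitly.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for the ID resolution done at the top of get_session().
std::string resolve_session_id(const std::string& client_id, const std::string& transfer_id)
{
  if (transfer_id.find("ses") == std::string::npos) {
    return client_id + "_ses" + transfer_id; // same concatenation as generate_session_id()
  }
  return transfer_id; // assumed to be a full session ID already
}
```

For example, `resolve_session_id("client0", "run7")` gives `client0_sesrun7`, while `resolve_session_id("client0", "session7")` is returned unchanged.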

◆ get_sessions() [1/2]

std::vector< TransferSession > & dunedaq::snbmodules::TransferClient::get_sessions ( )
inline

Definition at line 108 of file transfer_client.hpp.

108{ return m_sessions; }

◆ get_sessions() [2/2]

const std::vector< TransferSession > & dunedaq::snbmodules::TransferClient::get_sessions ( ) const
inline

Definition at line 109 of file transfer_client.hpp.

109{ return m_sessions; }

◆ pause_transfer()

void dunedaq::snbmodules::TransferClient::pause_transfer ( const std::string & transfer_id)

Definition at line 196 of file transfer_client.cpp.

197{
198
199 TransferSession* session = get_session(transfer_id);
200
201 if (session == nullptr) {
202 ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), transfer_id));
203 return;
204 }
205
206 session->pause_all();
207}

◆ remove_session()

void dunedaq::snbmodules::TransferClient::remove_session ( const std::string & session_id)
private

Remove a session from the client.

Parameters
session_id: ID of the session to remove

Definition at line 461 of file transfer_client.cpp.

462{
463 auto s = get_session(session_id);
464 if (s == nullptr) {
465 ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), session_id));
466 return;
467 }
468 for (size_t i = 0; i < m_sessions.size(); i++) {
469 if (s->get_session_id() == session_id) {
470 m_sessions.erase(m_sessions.begin() + i);
471 return;
472 }
473 }
474}
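
As the listing reads, the loop condition compares the session found by get_session() with `session_id` and does not depend on the loop index `i`, so when the condition holds the element at index 0 is erased, whichever session matched. A minimal sketch of erasing the matching element by ID follows; `Session` and `remove_by_id` are hypothetical stand-ins.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

struct Session { std::string id; }; // stand-in for TransferSession

// Erases the first session whose ID equals session_id; returns false if none matches.
bool remove_by_id(std::vector<Session>& sessions, const std::string& session_id)
{
  auto it = std::find_if(sessions.begin(), sessions.end(),
                         [&](const Session& s) { return s.id == session_id; });
  if (it == sessions.end()) {
    return false;
  }
  sessions.erase(it);
  return true;
}
```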

◆ resume_transfer()

void dunedaq::snbmodules::TransferClient::resume_transfer ( const std::string & transfer_id)

Definition at line 209 of file transfer_client.cpp.

210{
211
212 TransferSession* session = get_session(transfer_id);
213
214 if (session == nullptr) {
215 ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), transfer_id));
216 return;
217 }
218
219 session->resume_all();
220}

◆ scan_available_files()

void dunedaq::snbmodules::TransferClient::scan_available_files ( std::set< std::filesystem::path > & previous_scan,
bool nested = false,
std::filesystem::path folder = std::filesystem::path() )

Scan available files in the listening directory.

Parameters
previous_scan: Set of files already scanned
folder: Folder to scan
nested: True if the function is called recursively

Definition at line 489 of file transfer_client.cpp.

492{
493 if (folder.empty()) {
494 folder = get_listening_dir();
495 }
496
497 TLOG() << "debug : scanning files in " << folder;
498
499 // First scan group metadata files
500 for (const auto& entry : std::filesystem::directory_iterator(folder)) {
501 if (entry.is_regular_file() && entry.path().extension() == GroupMetadata::m_file_extension) {
502 if (previous_scan.insert(entry.path()).second) {
503 TLOG() << "debug : found new file " << entry.path();
504
505 GroupMetadata metadata = GroupMetadata(entry.path());
506 bool already_active = false;
507
508 for (const auto& s : get_sessions()) {
509 if (s.get_transfer_options() == metadata) {
510 already_active = true;
511 break;
512 }
513 }
514
515 if (!already_active) {
516 std::string group_id_tmp = metadata.get_group_id();
517 create_session(metadata,
518 Uploader,
519 generate_session_id(group_id_tmp),
520 get_listening_dir().append("ses" + std::to_string(m_sessions.size())));
521 }
522 }
523 }
524 }
525
526 for (const auto& entry : std::filesystem::directory_iterator(folder)) {
527 if (entry.is_regular_file() && entry.path().extension() != GroupMetadata::m_file_extension) {
528 if (previous_scan.insert(entry.path()).second) {
529 TLOG() << "debug : found new file " << entry.path();
530 if (entry.path().extension() == TransferMetadata::m_file_extension) {
531 std::shared_ptr<TransferMetadata> metadata = std::make_shared<TransferMetadata>(entry.path());
532
533 bool wanted = false;
534 for (auto& s : get_sessions()) {
535 auto expect_ref = s.get_transfer_options().get_expected_files();
536 if (expect_ref.find(metadata->get_file_name()) != expect_ref.end()) {
537 s.add_file(metadata);
538 wanted = true;
539 break;
540 }
541 }
542 if (!wanted) {
543 // If not wanted yet (or never), delete the file from the scan list to find it again later
544 TLOG() << "debug : " << entry.path() << " not wanted yet, deleting from scan list";
545 previous_scan.erase(entry.path());
546 metadata.reset();
547 }
548 }
549 }
550 } else if (entry.is_directory()) {
551 TLOG() << "debug : found directory " << entry.path();
552 if (nested) {
553 scan_available_files(previous_scan, nested, entry.path()); // NOLINT
554 }
555 }
556 }
557}
static const std::string m_file_extension
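
The core of the scan is an incremental directory walk: a file counts as new only if inserting its path into `previous_scan` succeeds, and subdirectories are visited only when `nested` is true. The sketch below isolates that mechanism with `std::filesystem`. It is a simplification under stated assumptions: it ignores the metadata file extensions and session handling of the real function, and `scan_new_files` and `demo_scan` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <filesystem>
#include <fstream>
#include <set>

namespace fs = std::filesystem;

// Adds unseen regular files to `seen` and returns how many were new.
std::size_t scan_new_files(const fs::path& folder, std::set<fs::path>& seen, bool nested)
{
  std::size_t added = 0;
  for (const auto& entry : fs::directory_iterator(folder)) {
    if (entry.is_regular_file()) {
      if (seen.insert(entry.path()).second) { // insert() reports whether the path was new
        ++added;
      }
    } else if (entry.is_directory() && nested) {
      added += scan_new_files(entry.path(), seen, nested);
    }
  }
  return added;
}

// Builds a small temporary tree and scans it twice.
bool demo_scan()
{
  const fs::path root = fs::temp_directory_path() / "snb_scan_sketch";
  fs::remove_all(root);
  fs::create_directories(root / "sub");
  {
    std::ofstream a(root / "a.dat");
    a << "x";
    std::ofstream b(root / "sub" / "b.dat");
    b << "y";
  } // streams are closed here

  std::set<fs::path> seen;
  const std::size_t first = scan_new_files(root, seen, true);  // finds both files
  const std::size_t second = scan_new_files(root, seen, true); // nothing new
  std::set<fs::path> flat;
  const std::size_t top_only = scan_new_files(root, flat, false); // skips "sub"
  fs::remove_all(root);
  return first == 2 && second == 0 && top_only == 1;
}
```

Passing the same set to successive calls is what makes repeated scans cheap: only paths not yet in the set are reported.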

◆ set_client_id()

void dunedaq::snbmodules::TransferClient::set_client_id ( std::string client_id)
inline

Definition at line 115 of file transfer_client.hpp.

115{ m_client_id = std::move(client_id); }

◆ set_ip()

void dunedaq::snbmodules::TransferClient::set_ip ( const std::string & ip)
inline

Definition at line 113 of file transfer_client.hpp.

113{ m_listening_ip.set_ip(ip); }
void set_ip(const std::string &ip)
Set the IP address, must be called after set_port() if the port is specified in the IP address ex 0....
Definition ip_format.hpp:80

◆ set_listening_dir()

void dunedaq::snbmodules::TransferClient::set_listening_dir ( const std::filesystem::path & listening_dir)
inline

Definition at line 116 of file transfer_client.hpp.

117 {
118 // remove all occurrences of ./ in the file path
119 std::string file_path_str = listening_dir.string();
120 std::string x = "./";
121
122 size_t pos = 0;
123 while (true) {
124 pos = file_path_str.find(x, pos);
125 if (pos == std::string::npos) {
126 break;
127 }
128
129 file_path_str.replace(pos, x.length(), "");
130 }
131
132 m_listening_dir = std::filesystem::absolute(file_path_str);
133 }

◆ set_port()

void dunedaq::snbmodules::TransferClient::set_port ( int port)
inline

Definition at line 114 of file transfer_client.hpp.

114{ m_listening_ip.set_port(port); }

◆ share_available_files()

void dunedaq::snbmodules::TransferClient::share_available_files ( const std::set< std::filesystem::path > & to_share,
const std::string & dest )
private

Share available files (in m_listening_dir)

Parameters
to_share: Set of files to share
dest: Destination of the files

Definition at line 275 of file transfer_client.cpp.

276{
277
278 for (const std::filesystem::path& f : to_share) {
279 TLOG() << "debug : Sharing " << f.filename();
280 std::shared_ptr<TransferMetadata> fmeta = create_metadata_from_file(f);
281 m_available_files.insert({ f.string(), fmeta });
284 dest,
285 dest,
286 fmeta->export_to_string());
287 }
288}

◆ start()

bool dunedaq::snbmodules::TransferClient::start ( int timeout)

Start the client in the current thread.

Parameters
timeout: Timeout in seconds, after this time the client will stop
Returns
False if the client did not stop correctly

Definition at line 56 of file transfer_client.cpp.

57{
58
59 while (true) {
60 // When starting, client wait for notification from bookkeeper.
61 std::optional<NotificationData> msg = listen_for_notification(get_my_conn(), "", timeout);
62 if (msg.has_value()) {
64 } else {
65 TLOG() << "debug : no notification received, timeout";
66 return false;
67 }
68
69 // print status of sessions
70 for (const auto& session : m_sessions) {
71 TLOG() << session.to_string();
72 }
73 }
74
75 return true;
76}

◆ start_transfer()

void dunedaq::snbmodules::TransferClient::start_transfer ( const std::string & transfer_id)

Start, pause, resume or cancel a transfer.

Parameters
transfer_id: ID of the transfer to start, pause, resume or cancel

Definition at line 182 of file transfer_client.cpp.

183{
184
185 TransferSession* session = get_session(transfer_id);
186
187 if (session == nullptr) {
188 ers::warning(SessionIDNotFoundInClientError(ERS_HERE, get_client_id(), transfer_id));
189 return;
190 }
191
192 session->start_all();
193}

Member Data Documentation

◆ m_available_files

std::map<std::string, std::shared_ptr<TransferMetadata> > dunedaq::snbmodules::TransferClient::m_available_files
private

Map of available files (key = file path, value = file metadata)

Definition at line 149 of file transfer_client.hpp.

◆ m_client_id

std::string dunedaq::snbmodules::TransferClient::m_client_id
private

TransferClient ID, unique identifier of the client.

Definition at line 140 of file transfer_client.hpp.

◆ m_listening_dir

std::filesystem::path dunedaq::snbmodules::TransferClient::m_listening_dir
private

Listening directory, directory where the client will listen for incoming files and files to share.

Definition at line 143 of file transfer_client.hpp.

◆ m_listening_ip

IPFormat dunedaq::snbmodules::TransferClient::m_listening_ip
private

IP address of the client.

Definition at line 137 of file transfer_client.hpp.

◆ m_my_conn

std::string dunedaq::snbmodules::TransferClient::m_my_conn = ""
private

Connection uuid of the client, retrieved using the notification interface and calling get_my_conn()

Definition at line 152 of file transfer_client.hpp.

◆ m_sessions

std::vector<TransferSession> dunedaq::snbmodules::TransferClient::m_sessions
private

Active sessions, stored in a vector and looked up by session ID.

Definition at line 146 of file transfer_client.hpp.


The documentation for this class was generated from the following files: