DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
transfer_session.cpp
Go to the documentation of this file.
1
12
13#include <memory>
14#include <set>
15#include <string>
16#include <utility>
17#include <vector>
18
19namespace dunedaq::snbmodules {
20
22 e_session_type type,
23 std::string id,
24 const IPFormat& ip,
25 std::filesystem::path work_dir,
26 std::vector<std::string> bk_conn /*= std::vector<std::string>()*/,
27 std::set<std::string> client_conn /*= std::set<std::string>()*/)
28 : NotificationInterface(std::move(bk_conn), std::move(client_conn))
29 , m_type(type)
30 , m_session_id(std::move(id))
31 , m_ip(ip)
32 , m_transfer_options(std::move(transfer_options))
33 ,
34 // m_threads(std::vector<pid_t>()),
35 m_work_dir(std::move(work_dir))
36{
37 std::filesystem::create_directories(m_work_dir);
38
39 // Init transfer interface with the right protocol
42
43 // check if port is set
44 if (!m_transfer_options.get_protocol_options().contains("port")) {
45 ers::fatal(ConfigError(ERS_HERE, "port is mandatory in Bittorrent protocol options"));
46 return;
47 }
48
49 m_transfer_interface = std::make_unique<TransferInterfaceBittorrent>(
51
52 // Generate torrent files and magnet links
53 if (type == e_session_type::Uploader) {
54 TLOG() << "Generating torrent files...";
55 dynamic_cast<TransferInterfaceBittorrent&>(*m_transfer_interface).generate_torrents_files(m_work_dir, "");
56
57 for (auto f_meta : m_transfer_options.get_transfers_meta()) {
58 TLOG() << "Writing magnet link data into transfer Metadata "
59 << get_work_dir().append(f_meta->get_file_name() + ".torrent");
60 lt::error_code ec;
61 lt::torrent_info t(get_work_dir().append(f_meta->get_file_name() + ".torrent").string(), ec);
62
63 if (ec) {
64 ers::error(BittorrentInvalidTorrentFileError(ERS_HERE, ec.message()));
65 }
66
67 TLOG() << "Magnet link: " << lt::make_magnet_uri(t);
68 f_meta->set_magnet_link(lt::make_magnet_uri(t) + "&x.pe=" + get_ip().get_ip() + ":" +
69 m_transfer_options.get_protocol_options()["port"].get<std::string>());
70 }
71 }
72 break;
73
76 std::make_unique<TransferInterfaceSCP>(m_transfer_options, type == e_session_type::Uploader);
77 break;
78
80 m_transfer_interface = std::make_unique<TransferInterfaceRClone>(m_transfer_options, get_work_dir());
81 break;
82 }
83
85 m_transfer_interface = std::make_unique<TransferInterfaceDummy>(m_transfer_options);
86 break;
87
88 default:
89 ers::error(InvalidProtocolError(
91 break;
92 }
93
94 TLOG() << "debug : Transfer session " << get_session_id() << " created";
96}
97
99{
100 // TLOG() << "Reaping children";
101 // for (pid_t pid : m_threads)
102 // {
103 // TLOG() << "Killing pid " << pid;
104 // kill(pid, SIGINT);
105 // kill(pid, SIGKILL);
106 // }
107 // for (pid_t pid : m_threads)
108 // {
109 // TLOG() << "Waiting for pid " << pid;
110 // siginfo_t status;
111 // auto sts = waitid(P_PID, pid, &status, WEXITED);
112
113 // TLOG() << "Forked process " << pid << " exited with status " << status.si_status << " (wait status " << sts
114 // << ")";
115 // }
116
117 TLOG() << "DONE CLOSING SESSION " << get_session_id();
118}
119
120bool
122{
123 (void)notif;
124 // TODO : now in client
125 return true;
126}
127
128std::string
130{
131 std::string str;
132 str += "Session " + get_session_id() + " ";
133 str += "type " + TransferSession::session_type_to_string(m_type) + " ";
134 str += "listening on " + m_ip.get_ip_port() + "\n";
135
137
138 return str;
139}
140
141bool
143{
144 bool result = true;
145 for (const std::string& bk : get_bookkeepers_conn()) {
148 bk,
149 bk,
150 get_transfer_options().export_to_string());
151 }
152
153 for (std::shared_ptr<TransferMetadata> f_meta : m_transfer_options.get_transfers_meta()) {
154 result = result && update_metadata_to_bookkeeper(*f_meta);
155 }
156
157 return result;
158}
159
160bool
162{
163 bool result = true;
164 for (const std::string& bk : get_bookkeepers_conn()) {
167 bk,
168 bk,
169 f_meta.export_to_string_partial(false));
170 }
171 return result;
172}
173
174bool
176 const std::string& data /*= ""*/)
177{
178 bool result = true;
179 for (const std::string& client : get_target_clients()) {
180 std::string session_name = client + "_ses" + m_transfer_options.get_group_id();
181 result &= send_notification(type, get_session_id(), session_name, client, data);
182 }
183 return result;
184}
185
186bool
188{
189 bool res = false;
190 if (is_downloader()) {
191 res = download_file(f_meta, m_work_dir);
192 } else if (is_uploader()) {
193 res = upload_file(f_meta);
194 } else {
195 ers::error(SessionTypeNotSupportedError(ERS_HERE, get_session_id()));
196 }
197
198 return res;
199}
200
201bool
203{
206 ers::warning(SessionWrongStateTransitionError(ERS_HERE,
208 f_meta.get_file_name(),
211 return false;
212 }
213
215
216 bool res = m_transfer_interface->pause_file(f_meta);
217 if (!res) {
219 }
220
221 if (!is_multiple) {
224 }
225
226 return res;
227}
228
229bool
231{
233 ers::warning(SessionWrongStateTransitionError(ERS_HERE,
235 f_meta.get_file_name(),
237 "RESUMING"));
238 return false;
239 }
240
241 if (is_downloader()) {
243 } else if (is_uploader()) {
245 }
246
247 bool res = m_transfer_interface->resume_file(f_meta);
248 if (!res) {
250 }
251
252 if (!is_multiple) {
255 }
256
257 return res;
258}
259
260bool
262{
264 ers::warning(SessionWrongStateTransitionError(ERS_HERE,
266 f_meta.get_file_name(),
269 return false;
270 }
271
273
274 bool res = m_transfer_interface->hash_file(f_meta);
275 if (!res) {
277 }
278
279 if (!is_multiple) {
280 // send_notification_to_targets(notification_type::e_notification_type::HASH_TRANSFER, f_meta.get_file_path());
282 }
283 return res;
284}
285
286bool
288{
291 ers::warning(SessionWrongStateTransitionError(ERS_HERE,
293 f_meta.get_file_name(),
296 return false;
297 }
298
300
301 bool res = m_transfer_interface->cancel_file(f_meta);
302 if (!res) {
304 }
305
306 if (!is_multiple) {
309 }
310 return res;
311}
312
313bool
315{
317 ers::warning(SessionAccessToIncorrectActionError(ERS_HERE, get_session_id(), "upload_file"));
318 }
319
321 ers::warning(SessionWrongStateTransitionError(ERS_HERE,
323 f_meta.get_file_name(),
326 return false;
327 }
328
330
331 bool res = m_transfer_interface->upload_file(f_meta);
332 if (!res) {
334 }
335
336 if (!is_multiple) {
339 }
340 return res;
341}
342
343bool
344TransferSession::download_file(TransferMetadata& f_meta, std::filesystem::path dest, bool is_multiple)
345{
347 ers::warning(SessionAccessToIncorrectActionError(ERS_HERE, get_session_id(), "download_file"));
348 }
349
351 ers::warning(SessionWrongStateTransitionError(ERS_HERE,
353 f_meta.get_file_name(),
356 return false;
357 }
358
359 // wait for the uploader to be ready
360 std::this_thread::sleep_for(std::chrono::seconds(1));
361
363
364 bool res = m_transfer_interface->download_file(f_meta, std::move(dest));
365 if (!res) {
367 }
368 if (!is_multiple) {
370 }
371 return res;
372}
373
374bool
376{
377 if (is_downloader()) {
378 return download_all(m_work_dir);
379 } else if (is_uploader()) {
380 return upload_all();
381 } else {
382 ers::error(SessionTypeNotSupportedError(ERS_HERE, get_session_id()));
383 return false;
384 }
385}
386
387bool
389{
390
392
393 // wait 1 second
394 std::this_thread::sleep_for(std::chrono::seconds(1));
395
396 bool result = true;
397 for (auto file : m_transfer_options.get_transfers_meta()) {
398 result = result && pause_file(*file, true);
399 }
400
402 return result;
403}
404
405bool
407{
408 bool result = true;
409 for (auto file : m_transfer_options.get_transfers_meta()) {
410 result = result && resume_file(*file, true);
411 }
412
413 // wait 1 second
414 std::this_thread::sleep_for(std::chrono::seconds(1));
415
418 return result;
419}
420
421bool
423{
424 bool result = true;
425 for (auto file : m_transfer_options.get_transfers_meta()) {
426 result = result && cancel_file(*file, true);
427 }
428
431 return result;
432}
433
434// Downloaders only
435bool
436TransferSession::download_all(const std::filesystem::path& dest)
437{
439 ers::warning(SessionAccessToIncorrectActionError(ERS_HERE, get_session_id(), "download_all"));
440 }
441
442 bool result = true;
443 for (auto file : m_transfer_options.get_transfers_meta()) {
444 result = result && download_file(*file, dest, true);
445 }
447 return result;
448}
449
450// Uploaders only
451bool
453{
455 ers::warning(SessionAccessToIncorrectActionError(ERS_HERE, get_session_id(), "upload_all"));
456 }
457
458 bool result = true;
459 for (auto file : m_transfer_options.get_transfers_meta()) {
460 result = result && upload_file(*file, true);
461 }
464 return result;
465}
466
467} // namespace dunedaq::snbmodules
#define ERS_HERE
protocol_type::e_protocol_type get_protocol() const
std::vector< std::shared_ptr< TransferMetadata > > & get_transfers_meta()
nlohmann::json get_protocol_options() const
Class that represents an IP address and a port. TODO: should be replaced by something better?
Definition ip_format.hpp:26
std::string get_ip_port() const
Get the IP address and the port in the format "ip:port".
Definition ip_format.hpp:57
bool send_notification(const notification_type::e_notification_type &notif, const std::string &src, const std::string &dst, const std::string &id_conn, const std::string &data="", int tries=-1)
Send a notification during m_timeout_send ms.
const std::vector< std::string > & get_bookkeepers_conn() const
Initializes the connection interface; only used for standalone applications.
void generate_torrents_files(const std::filesystem::path &dest, const std::string &tracker)
status_type::e_status get_status() const
virtual std::string export_to_string_partial(bool force_all)
Export only the modified fields to a string.
std::filesystem::path get_file_path() const
void set_status(status_type::e_status status)
bool resume_file(TransferMetadata &f_meta, bool is_multiple=false)
bool is_downloader()
Useful to check if the session is a downloader.
virtual ~TransferSession()
Destructor. Kills all threads created by the session (TODO: useless?)
bool hash_file(TransferMetadata &f_meta, bool is_multiple=false)
bool cancel_file(TransferMetadata &f_meta, bool is_multiple=false)
IPFormat m_ip
IP of the client. TODO: useless? The session cannot have a unique connection.
bool action_on_receive_notification(NotificationData notif) override
handle actions to be taken when a notification is received. The notification is passed as a parameter...
TransferSession(TransferSession &&)=default
GroupMetadata m_transfer_options
A session must contain transfer metadata. The session is basically a transfer wrapper.
bool upload_file(TransferMetadata &f_meta, bool is_multiple=false)
bool send_notification_to_targets(notification_type::e_notification_type type, const std::string &data="")
Send a notification to every targets of the session, generally for the uploader.
bool download_file(TransferMetadata &f_meta, std::filesystem::path dest, bool is_multiple=false)
std::filesystem::path get_work_dir() const
e_session_type m_type
Type of session, uploader or downloader. Used to block access to some functions.
bool update_metadata_to_bookkeeper(TransferMetadata &f_meta)
std::unique_ptr< TransferInterfaceAbstract > m_transfer_interface
Abstract interface can reference to any protocol used to transfer data.
bool is_uploader()
Useful to check if the session is an uploader.
const GroupMetadata & get_transfer_options() const
const std::set< std::string > & get_target_clients() const
bool start_file(TransferMetadata &f_meta)
Start the session by downloading or uploading files depending on the type of session TODO : separate ...
static std::string session_type_to_string(e_session_type e)
bool pause_file(TransferMetadata &f_meta, bool is_multiple=false)
std::filesystem::path m_work_dir
List of threads created by the session. TODO: useless? Should be handled in the client.
bool download_all(const std::filesystem::path &dest)
#define TLOG(...)
Definition macro.hpp:22
e_session_type
Different types of session.
@ Downloader
TransferSession used to download files from uploader clients.
@ Uploader
TransferSession used to upload files to downloader clients.
void warning(const Issue &issue)
Definition ers.hpp:115
void fatal(const Issue &issue)
Definition ers.hpp:88
void error(const Issue &issue)
Definition ers.hpp:81
NotificationData class, represent a notification.
e_notification_type
Different types of notifications that can be sent.
static std::string protocols_to_string(e_protocol_type e)
static std::string status_to_string(e_status e)