Line data Source code
1 :
2 : /**
3 : * @file transfer_session.cpp TransferSession class, wrapper to get access to the transfer interface and control states
4 : * of transfers
5 : *
6 : * This is part of the DUNE DAQ , copyright 2020.
7 : * Licensing/copyright details are in the COPYING file that you should have
8 : * received with this code.
9 : */
10 :
11 : #include "snbmodules/transfer_session.hpp"
12 :
13 : #include <memory>
14 : #include <set>
15 : #include <string>
16 : #include <utility>
17 : #include <vector>
18 :
19 : namespace dunedaq::snbmodules {
20 :
21 0 : TransferSession::TransferSession(GroupMetadata transfer_options,
22 : e_session_type type,
23 : std::string id,
24 : const IPFormat& ip,
25 : std::filesystem::path work_dir,
26 : std::vector<std::string> bk_conn /*= std::vector<std::string>()*/,
27 0 : std::set<std::string> client_conn /*= std::set<std::string>()*/)
28 0 : : NotificationInterface(std::move(bk_conn), std::move(client_conn))
29 0 : , m_type(type)
30 0 : , m_session_id(std::move(id))
31 0 : , m_ip(ip)
32 0 : , m_transfer_options(std::move(transfer_options))
33 : ,
34 : // m_threads(std::vector<pid_t>()),
35 0 : m_work_dir(std::move(work_dir))
36 : {
37 0 : std::filesystem::create_directories(m_work_dir);
38 :
39 : // Init transfer interface with the right protocol
40 0 : switch (m_transfer_options.get_protocol()) {
41 0 : case protocol_type::BITTORRENT:
42 :
43 : // check if port is set
44 0 : if (!m_transfer_options.get_protocol_options().contains("port")) {
45 0 : ers::fatal(ConfigError(ERS_HERE, "port is mandatory in Bittorrent protocol options"));
46 0 : return;
47 : }
48 :
49 0 : m_transfer_interface = std::make_unique<TransferInterfaceBittorrent>(
50 0 : m_transfer_options, type == e_session_type::Downloader, get_work_dir(), get_ip());
51 :
52 : // Generate torrent files and magnet links
53 0 : if (type == e_session_type::Uploader) {
54 0 : TLOG() << "Generating torrent files...";
55 0 : dynamic_cast<TransferInterfaceBittorrent&>(*m_transfer_interface).generate_torrents_files(m_work_dir, "");
56 :
57 0 : for (auto f_meta : m_transfer_options.get_transfers_meta()) {
58 0 : TLOG() << "Writing magnet link data into transfer Metadata "
59 0 : << get_work_dir().append(f_meta->get_file_name() + ".torrent");
60 0 : lt::error_code ec;
61 0 : lt::torrent_info t(get_work_dir().append(f_meta->get_file_name() + ".torrent").string(), ec);
62 :
63 0 : if (ec) {
64 0 : ers::error(BittorrentInvalidTorrentFileError(ERS_HERE, ec.message()));
65 : }
66 :
67 0 : TLOG() << "Magnet link: " << lt::make_magnet_uri(t);
68 0 : f_meta->set_magnet_link(lt::make_magnet_uri(t) + "&x.pe=" + get_ip().get_ip() + ":" +
69 0 : m_transfer_options.get_protocol_options()["port"].get<std::string>());
70 0 : }
71 : }
72 : break;
73 :
74 0 : case protocol_type::SCP:
75 0 : m_transfer_interface =
76 0 : std::make_unique<TransferInterfaceSCP>(m_transfer_options, type == e_session_type::Uploader);
77 0 : break;
78 :
79 0 : case protocol_type::RCLONE: {
80 0 : m_transfer_interface = std::make_unique<TransferInterfaceRClone>(m_transfer_options, get_work_dir());
81 0 : break;
82 : }
83 :
84 0 : case protocol_type::dummy:
85 0 : m_transfer_interface = std::make_unique<TransferInterfaceDummy>(m_transfer_options);
86 0 : break;
87 :
88 0 : default:
89 0 : ers::error(InvalidProtocolError(
90 0 : ERS_HERE, get_session_id(), protocol_type::protocols_to_string(m_transfer_options.get_protocol())));
91 0 : break;
92 : }
93 :
94 0 : TLOG() << "debug : Transfer session " << get_session_id() << " created";
95 0 : update_metadatas_to_bookkeeper();
96 0 : }
97 :
98 0 : TransferSession::~TransferSession()
99 : {
100 : // TLOG() << "Reaping children";
101 : // for (pid_t pid : m_threads)
102 : // {
103 : // TLOG() << "Killing pid " << pid;
104 : // kill(pid, SIGINT);
105 : // kill(pid, SIGKILL);
106 : // }
107 : // for (pid_t pid : m_threads)
108 : // {
109 : // TLOG() << "Waiting for pid " << pid;
110 : // siginfo_t status;
111 : // auto sts = waitid(P_PID, pid, &status, WEXITED);
112 :
113 : // TLOG() << "Forked process " << pid << " exited with status " << status.si_status << " (wait status " << sts
114 : // << ")";
115 : // }
116 :
117 0 : TLOG() << "DONE CLOSING SESSION " << get_session_id();
118 0 : }
119 :
120 : bool
121 0 : TransferSession::action_on_receive_notification(NotificationData notif)
122 : {
123 0 : (void)notif;
124 : // TODO : now in client
125 0 : return true;
126 : }
127 :
128 : std::string
129 0 : TransferSession::to_string() const
130 : {
131 0 : std::string str;
132 0 : str += "Session " + get_session_id() + " ";
133 0 : str += "type " + TransferSession::session_type_to_string(m_type) + " ";
134 0 : str += "listening on " + m_ip.get_ip_port() + "\n";
135 :
136 0 : str += m_transfer_options.to_string();
137 :
138 0 : return str;
139 0 : }
140 :
141 : bool
142 0 : TransferSession::update_metadatas_to_bookkeeper()
143 : {
144 0 : bool result = true;
145 0 : for (const std::string& bk : get_bookkeepers_conn()) {
146 0 : result = result && send_notification(notification_type::e_notification_type::GROUP_METADATA,
147 0 : get_session_id(),
148 : bk,
149 : bk,
150 0 : get_transfer_options().export_to_string());
151 : }
152 :
153 0 : for (std::shared_ptr<TransferMetadata> f_meta : m_transfer_options.get_transfers_meta()) {
154 0 : result = result && update_metadata_to_bookkeeper(*f_meta);
155 0 : }
156 :
157 0 : return result;
158 : }
159 :
160 : bool
161 0 : TransferSession::update_metadata_to_bookkeeper(TransferMetadata& f_meta)
162 : {
163 0 : bool result = true;
164 0 : for (const std::string& bk : get_bookkeepers_conn()) {
165 0 : result = result && send_notification(notification_type::e_notification_type::TRANSFER_METADATA,
166 0 : get_session_id(),
167 : bk,
168 : bk,
169 0 : f_meta.export_to_string_partial(false));
170 : }
171 0 : return result;
172 : }
173 :
174 : bool
175 0 : TransferSession::send_notification_to_targets(notification_type::e_notification_type type,
176 : const std::string& data /*= ""*/)
177 : {
178 0 : bool result = true;
179 0 : for (const std::string& client : get_target_clients()) {
180 0 : std::string session_name = client + "_ses" + m_transfer_options.get_group_id();
181 0 : result &= send_notification(type, get_session_id(), session_name, client, data);
182 0 : }
183 0 : return result;
184 : }
185 :
186 : bool
187 0 : TransferSession::start_file(TransferMetadata& f_meta)
188 : {
189 0 : bool res = false;
190 0 : if (is_downloader()) {
191 0 : res = download_file(f_meta, m_work_dir);
192 0 : } else if (is_uploader()) {
193 0 : res = upload_file(f_meta);
194 : } else {
195 0 : ers::error(SessionTypeNotSupportedError(ERS_HERE, get_session_id()));
196 : }
197 :
198 0 : return res;
199 : }
200 :
201 : bool
202 0 : TransferSession::pause_file(TransferMetadata& f_meta, bool is_multiple)
203 : {
204 0 : if (f_meta.get_status() != status_type::e_status::DOWNLOADING &&
205 : f_meta.get_status() != status_type::e_status::UPLOADING) {
206 0 : ers::warning(SessionWrongStateTransitionError(ERS_HERE,
207 0 : get_session_id(),
208 0 : f_meta.get_file_name(),
209 0 : status_type::status_to_string(f_meta.get_status()),
210 0 : status_type::status_to_string(status_type::e_status::PAUSED)));
211 0 : return false;
212 : }
213 :
214 0 : f_meta.set_status(status_type::e_status::PAUSED);
215 :
216 0 : bool res = m_transfer_interface->pause_file(f_meta);
217 0 : if (!res) {
218 0 : f_meta.set_status(status_type::e_status::ERROR);
219 : }
220 :
221 0 : if (!is_multiple) {
222 0 : send_notification_to_targets(notification_type::e_notification_type::PAUSE_TRANSFER, f_meta.get_file_path());
223 0 : update_metadata_to_bookkeeper(f_meta);
224 : }
225 :
226 : return res;
227 : }
228 :
229 : bool
230 0 : TransferSession::resume_file(TransferMetadata& f_meta, bool is_multiple)
231 : {
232 0 : if (f_meta.get_status() != status_type::e_status::PAUSED) {
233 0 : ers::warning(SessionWrongStateTransitionError(ERS_HERE,
234 0 : get_session_id(),
235 0 : f_meta.get_file_name(),
236 0 : status_type::status_to_string(f_meta.get_status()),
237 0 : "RESUMING"));
238 0 : return false;
239 : }
240 :
241 0 : if (is_downloader()) {
242 0 : f_meta.set_status(status_type::e_status::DOWNLOADING);
243 0 : } else if (is_uploader()) {
244 0 : f_meta.set_status(status_type::e_status::UPLOADING);
245 : }
246 :
247 0 : bool res = m_transfer_interface->resume_file(f_meta);
248 0 : if (!res) {
249 0 : f_meta.set_status(status_type::e_status::ERROR);
250 : }
251 :
252 0 : if (!is_multiple) {
253 0 : send_notification_to_targets(notification_type::e_notification_type::RESUME_TRANSFER, f_meta.get_file_path());
254 0 : update_metadata_to_bookkeeper(f_meta);
255 : }
256 :
257 : return res;
258 : }
259 :
260 : bool
261 0 : TransferSession::hash_file(TransferMetadata& f_meta, bool is_multiple)
262 : {
263 0 : if (f_meta.get_status() != status_type::e_status::FINISHED) {
264 0 : ers::warning(SessionWrongStateTransitionError(ERS_HERE,
265 0 : get_session_id(),
266 0 : f_meta.get_file_name(),
267 0 : status_type::status_to_string(f_meta.get_status()),
268 0 : status_type::status_to_string(status_type::e_status::HASHING)));
269 0 : return false;
270 : }
271 :
272 0 : f_meta.set_status(status_type::e_status::HASHING);
273 :
274 0 : bool res = m_transfer_interface->hash_file(f_meta);
275 0 : if (!res) {
276 0 : f_meta.set_status(status_type::e_status::ERROR);
277 : }
278 :
279 0 : if (!is_multiple) {
280 : // send_notification_to_targets(notification_type::e_notification_type::HASH_TRANSFER, f_meta.get_file_path());
281 0 : update_metadata_to_bookkeeper(f_meta);
282 : }
283 : return res;
284 : }
285 :
286 : bool
287 0 : TransferSession::cancel_file(TransferMetadata& f_meta, bool is_multiple)
288 : {
289 0 : if (f_meta.get_status() == status_type::e_status::FINISHED ||
290 : f_meta.get_status() == status_type::e_status::CANCELLED) {
291 0 : ers::warning(SessionWrongStateTransitionError(ERS_HERE,
292 0 : get_session_id(),
293 0 : f_meta.get_file_name(),
294 0 : status_type::status_to_string(f_meta.get_status()),
295 0 : status_type::status_to_string(status_type::e_status::CANCELLED)));
296 0 : return false;
297 : }
298 :
299 0 : f_meta.set_status(status_type::e_status::CANCELLED);
300 :
301 0 : bool res = m_transfer_interface->cancel_file(f_meta);
302 0 : if (!res) {
303 0 : f_meta.set_status(status_type::e_status::ERROR);
304 : }
305 :
306 0 : if (!is_multiple) {
307 0 : send_notification_to_targets(notification_type::e_notification_type::CANCEL_TRANSFER, f_meta.get_file_path());
308 0 : update_metadata_to_bookkeeper(f_meta);
309 : }
310 : return res;
311 : }
312 :
313 : bool
314 0 : TransferSession::upload_file(TransferMetadata& f_meta, bool is_multiple)
315 : {
316 0 : if (m_type != e_session_type::Uploader) {
317 0 : ers::warning(SessionAccessToIncorrectActionError(ERS_HERE, get_session_id(), "upload_file"));
318 : }
319 :
320 0 : if (f_meta.get_status() != status_type::e_status::WAITING) {
321 0 : ers::warning(SessionWrongStateTransitionError(ERS_HERE,
322 0 : get_session_id(),
323 0 : f_meta.get_file_name(),
324 0 : status_type::status_to_string(f_meta.get_status()),
325 0 : status_type::status_to_string(status_type::e_status::UPLOADING)));
326 0 : return false;
327 : }
328 :
329 0 : f_meta.set_status(status_type::e_status::UPLOADING);
330 :
331 0 : bool res = m_transfer_interface->upload_file(f_meta);
332 0 : if (!res) {
333 0 : f_meta.set_status(status_type::e_status::ERROR);
334 : }
335 :
336 0 : if (!is_multiple) {
337 0 : send_notification_to_targets(notification_type::e_notification_type::START_TRANSFER, f_meta.get_file_path());
338 0 : update_metadata_to_bookkeeper(f_meta);
339 : }
340 : return res;
341 : }
342 :
343 : bool
344 0 : TransferSession::download_file(TransferMetadata& f_meta, std::filesystem::path dest, bool is_multiple)
345 : {
346 0 : if (m_type != e_session_type::Downloader) {
347 0 : ers::warning(SessionAccessToIncorrectActionError(ERS_HERE, get_session_id(), "download_file"));
348 : }
349 :
350 0 : if (f_meta.get_status() != status_type::e_status::WAITING) {
351 0 : ers::warning(SessionWrongStateTransitionError(ERS_HERE,
352 0 : get_session_id(),
353 0 : f_meta.get_file_name(),
354 0 : status_type::status_to_string(f_meta.get_status()),
355 0 : status_type::status_to_string(status_type::e_status::DOWNLOADING)));
356 0 : return false;
357 : }
358 :
359 : // wait for the uploader to be ready
360 0 : std::this_thread::sleep_for(std::chrono::seconds(1));
361 :
362 0 : f_meta.set_status(status_type::e_status::DOWNLOADING);
363 :
364 0 : bool res = m_transfer_interface->download_file(f_meta, std::move(dest));
365 0 : if (!res) {
366 0 : f_meta.set_status(status_type::e_status::ERROR);
367 : }
368 0 : if (!is_multiple) {
369 0 : update_metadata_to_bookkeeper(f_meta);
370 : }
371 : return res;
372 : }
373 :
374 : bool
375 0 : TransferSession::start_all()
376 : {
377 0 : if (is_downloader()) {
378 0 : return download_all(m_work_dir);
379 0 : } else if (is_uploader()) {
380 0 : return upload_all();
381 : } else {
382 0 : ers::error(SessionTypeNotSupportedError(ERS_HERE, get_session_id()));
383 0 : return false;
384 : }
385 : }
386 :
387 : bool
388 0 : TransferSession::pause_all()
389 : {
390 :
391 0 : send_notification_to_targets(notification_type::e_notification_type::PAUSE_TRANSFER);
392 :
393 : // wait 1 second
394 0 : std::this_thread::sleep_for(std::chrono::seconds(1));
395 :
396 0 : bool result = true;
397 0 : for (auto file : m_transfer_options.get_transfers_meta()) {
398 0 : result = result && pause_file(*file, true);
399 0 : }
400 :
401 0 : update_metadatas_to_bookkeeper();
402 0 : return result;
403 : }
404 :
405 : bool
406 0 : TransferSession::resume_all()
407 : {
408 0 : bool result = true;
409 0 : for (auto file : m_transfer_options.get_transfers_meta()) {
410 0 : result = result && resume_file(*file, true);
411 0 : }
412 :
413 : // wait 1 second
414 0 : std::this_thread::sleep_for(std::chrono::seconds(1));
415 :
416 0 : send_notification_to_targets(notification_type::e_notification_type::RESUME_TRANSFER);
417 0 : update_metadatas_to_bookkeeper();
418 0 : return result;
419 : }
420 :
421 : bool
422 0 : TransferSession::cancel_all()
423 : {
424 0 : bool result = true;
425 0 : for (auto file : m_transfer_options.get_transfers_meta()) {
426 0 : result = result && cancel_file(*file, true);
427 0 : }
428 :
429 0 : send_notification_to_targets(notification_type::e_notification_type::CANCEL_TRANSFER);
430 0 : update_metadatas_to_bookkeeper();
431 0 : return result;
432 : }
433 :
434 : // Downloaders only
435 : bool
436 0 : TransferSession::download_all(const std::filesystem::path& dest)
437 : {
438 0 : if (m_type != e_session_type::Downloader) {
439 0 : ers::warning(SessionAccessToIncorrectActionError(ERS_HERE, get_session_id(), "download_all"));
440 : }
441 :
442 0 : bool result = true;
443 0 : for (auto file : m_transfer_options.get_transfers_meta()) {
444 0 : result = result && download_file(*file, dest, true);
445 0 : }
446 0 : update_metadatas_to_bookkeeper();
447 0 : return result;
448 : }
449 :
450 : // Uploaders only
451 : bool
452 0 : TransferSession::upload_all()
453 : {
454 0 : if (m_type != e_session_type::Uploader) {
455 0 : ers::warning(SessionAccessToIncorrectActionError(ERS_HERE, get_session_id(), "upload_all"));
456 : }
457 :
458 0 : bool result = true;
459 0 : for (auto file : m_transfer_options.get_transfers_meta()) {
460 0 : result = result && upload_file(*file, true);
461 0 : }
462 0 : send_notification_to_targets(notification_type::e_notification_type::START_TRANSFER);
463 0 : update_metadatas_to_bookkeeper();
464 0 : return result;
465 : }
466 :
467 : } // namespace dunedaq::snbmodules
|