DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::snbmodules::TransferInterfaceRClone Class Reference

#include <transfer_interface_RClone.hpp>

Inheritance diagram for dunedaq::snbmodules::TransferInterfaceRClone:
[legend]
Collaboration diagram for dunedaq::snbmodules::TransferInterfaceRClone:
[legend]

Classes

struct  parameters
 

Public Member Functions

 TransferInterfaceRClone (GroupMetadata &config, const std::filesystem::path &work_dir)
 
virtual ~TransferInterfaceRClone ()
 
bool upload_file (TransferMetadata &f_meta) override
 
bool download_file (TransferMetadata &f_meta, std::filesystem::path dest) override
 
bool pause_file (TransferMetadata &f_meta) override
 
bool resume_file (TransferMetadata &f_meta) override
 
bool hash_file (TransferMetadata &f_meta) override
 
bool cancel_file (TransferMetadata &f_meta) override
 
- Public Member Functions inherited from dunedaq::snbmodules::TransferInterfaceAbstract
 TransferInterfaceAbstract (GroupMetadata &config)
 
virtual ~TransferInterfaceAbstract ()=default
 
GroupMetadataget_transfer_options ()
 

Private Member Functions

std::optional< nlohmann::json > requestRPC (const std::string &method, const std::string &input)
 
void do_work (std::atomic< bool > &running)
 

Private Attributes

struct dunedaq::snbmodules::TransferInterfaceRClone::parameters m_params
 
std::map< TransferMetadata *, int > m_jobs_id
 
std::filesystem::path m_work_dir
 
dunedaq::utilities::WorkerThread m_thread
 

Additional Inherited Members

- Protected Attributes inherited from dunedaq::snbmodules::TransferInterfaceAbstract
GroupMetadatam_config
 MetadataAbstract of the transfer, contain settings and status of the transfer.
 

Detailed Description

Definition at line 33 of file transfer_interface_RClone.hpp.

Constructor & Destructor Documentation

◆ TransferInterfaceRClone()

dunedaq::snbmodules::TransferInterfaceRClone::TransferInterfaceRClone ( GroupMetadata & config,
const std::filesystem::path & work_dir )
inline

Definition at line 37 of file transfer_interface_RClone.hpp.

39 , m_work_dir(work_dir)
40 , m_thread([&](std::atomic<bool>& running) { this->do_work(running); })
41 {
42 RcloneInitialize();
44
45 // protocol parameters
46
47 if (config.get_protocol_options().contains("protocol")) {
48 m_params.protocol = config.get_protocol_options()["protocol"].get<std::string>();
49 }
50
51 if (m_params.protocol == "sftp") {
52 if (config.get_protocol_options().contains("user")) {
53 m_params.user = config.get_protocol_options()["user"].get<std::string>();
54 }
55 }
56
57 if (config.get_protocol_options().contains("rate_limit")) {
58 m_params.bwlimit = config.get_protocol_options()["rate_limit"].get<std::string>();
59 }
60 if (config.get_protocol_options().contains("refresh_rate")) {
61 m_params.refresh_rate = config.get_protocol_options()["refresh_rate"].get<int>();
62 }
63 if (config.get_protocol_options().contains("port")) {
64 m_params.port = config.get_protocol_options()["port"].get<int>();
65 }
66
67 // config parameters
68 if (config.get_protocol_options().contains("simult_transfers")) {
69 m_params.simult_transfers = config.get_protocol_options()["simult_transfers"].get<int>();
70 }
71 if (config.get_protocol_options().contains("transfer_threads")) {
72 m_params.transfer_threads = config.get_protocol_options()["transfer_threads"].get<int>();
73 }
74 if (config.get_protocol_options().contains("checkers_threads")) {
75 m_params.checkers_threads = config.get_protocol_options()["checkers_threads"].get<int>();
76 }
77
78 if (config.get_protocol_options().contains("chunk_size")) {
79 m_params.chunk_size = config.get_protocol_options()["chunk_size"].get<std::string>();
80 }
81 if (config.get_protocol_options().contains("buffer_size")) {
82 m_params.buffer_size = config.get_protocol_options()["buffer_size"].get<std::string>();
83 }
84 if (config.get_protocol_options().contains("use_mmap")) {
85 m_params.use_mmap = config.get_protocol_options()["use_mmap"].get<bool>();
86 }
87 if (config.get_protocol_options().contains("checksum")) {
88 m_params.checksum = config.get_protocol_options()["checksum"].get<bool>();
89 }
90 if (config.get_protocol_options().contains("root_folder")) {
91 m_params.root_folder = std::filesystem::absolute(config.get_protocol_options()["root_folder"].get<std::string>());
92 }
93
94 char* input_request = new char[100];
95 sprintf(input_request,
96 "{"
97 "\"rate\": \"%s\""
98 "}",
99 m_params.bwlimit.c_str());
100
101 requestRPC("core/bwlimit", input_request);
102 delete[] input_request;
103
104 requestRPC("options/set",
105 "{"
106 "\"vfs\": "
107 "{"
108 // "\"ChunkSize\": 8388608,"
109 "\"CacheMaxSize\": 0"
110 "}"
111 "}");
112
113 // all global options
114 requestRPC("options/get", "{}");
115 }
struct dunedaq::snbmodules::TransferInterfaceRClone::parameters m_params
std::optional< nlohmann::json > requestRPC(const std::string &method, const std::string &input)
void start_working_thread(const std::string &name="noname")
Start the working thread (which executes the do_work() function)

◆ ~TransferInterfaceRClone()

virtual dunedaq::snbmodules::TransferInterfaceRClone::~TransferInterfaceRClone ( )
inlinevirtual

Definition at line 117 of file transfer_interface_RClone.hpp.

118 {
120
121 RcloneFinalize();
122 }
void stop_working_thread()
Stop the working thread.

Member Function Documentation

◆ cancel_file()

bool dunedaq::snbmodules::TransferInterfaceRClone::cancel_file ( TransferMetadata & f_meta)
inlineoverridevirtual

Implements dunedaq::snbmodules::TransferInterfaceAbstract.

Definition at line 394 of file transfer_interface_RClone.hpp.

395 {
396 TLOG() << "debug : RClone : Cancelling file " << f_meta.get_file_name();
397
398 // find job id
399 int job_id = 0;
400 if (m_jobs_id.find(&f_meta) != m_jobs_id.end()) {
401 job_id = m_jobs_id[&f_meta];
402 } else {
403 TLOG() << "debug : RClone : Job id not found";
404 return false;
405 }
406
407 char* input_request = new char[100];
408 sprintf(input_request,
409 "{"
410 "\"jobid\": %d"
411 "}",
412 job_id);
413
414 requestRPC("job/stop", input_request);
415 delete[] input_request;
416
417 return true;
418 }
#define TLOG(...)
Definition macro.hpp:22

◆ do_work()

void dunedaq::snbmodules::TransferInterfaceRClone::do_work ( std::atomic< bool > & running)
inlineprivate

Definition at line 465 of file transfer_interface_RClone.hpp.

466 {
467 TLOG() << "debug : running thread ";
468
469 while (running.load()) {
470 // requestRPC("cache/stats", "{}");
471 // requestRPC("core/memstats", "{}");
472 auto stats = requestRPC("core/stats", "{}");
473
474 if (stats.has_value()) {
475 if (stats.value()["transferring"] != nullptr) {
476 auto current_transfers = stats.value()["transferring"].get<std::vector<nlohmann::json>>();
477
478 for (const auto& t : current_transfers) {
479 auto grp = t["group"].get<std::string>();
480 int job_id = std::stoi(grp.substr(grp.find("/") + 1));
481
482 for (auto& [meta, id] : m_jobs_id) {
483 if (id == job_id) {
484 meta->set_progress(t["percentage"].get<int>());
485 meta->set_transmission_speed(t["speedAvg"].get<int32_t>());
486 break;
487 }
488 }
489 }
490 }
491 }
492
493 // Update information about ongoing transfers
494 for (auto& [meta, id] : m_jobs_id) {
495 // get refreshed infos
496 char* input_request = new char[100];
497 sprintf(input_request,
498 "{"
499 "\"jobid\": %d"
500 "}",
501 id);
502
503 auto res = requestRPC("job/status", input_request);
504 delete[] input_request;
505
506 if (res.has_value()) {
507 if (res.value()["finished"] != nullptr && res.value()["finished"].get<bool>()) {
508 if (res.value()["success"].get<bool>()) {
509 meta->set_status(status_type::e_status::FINISHED);
510 meta->set_progress(100);
511 } else {
512 meta->set_status(status_type::e_status::ERROR);
513 meta->set_error_code(res.value()["error"].get<std::string>());
514 }
515 meta->set_transmission_speed(0);
516 }
517 }
518 }
519
520 // wait
521 std::this_thread::sleep_for(std::chrono::seconds(m_params.refresh_rate));
522 }
523
524 // TODO Leo joly 11/09/2023 : segmentation fault
525 // for (auto &meta : get_transfer_options().get_transfers_meta())
526 // {
527 // if (meta->get_status() == status_type::e_status::UPLOADING)
528 // {
529 // meta->set_status(status_type::e_status::FINISHED);
530 // }
531 // if (meta->get_status() == status_type::e_status::DOWNLOADING)
532 // {
533 // meta->set_status(status_type::e_status::ERROR);
534 // meta->set_error_code("Transfer interrupted");
535 // }
536 // }
537 }

◆ download_file()

bool dunedaq::snbmodules::TransferInterfaceRClone::download_file ( TransferMetadata & f_meta,
std::filesystem::path dest )
inlineoverridevirtual

Implements dunedaq::snbmodules::TransferInterfaceAbstract.

Definition at line 150 of file transfer_interface_RClone.hpp.

151 {
152 TLOG() << "debug : RClone : Downloading file " << f_meta.get_file_name();
153
154 char* input_request = new char[1000];
155
156 if (m_params.protocol == "http") {
157 std::string file_relative_path =
158 std::filesystem::relative(f_meta.get_file_path(), m_params.root_folder).generic_string();
159
160 // check if file path is relative to root folder
161 if (file_relative_path.find("..") != std::string::npos) {
162 TLOG() << "debug : RClone : File path is not relative to root folder";
163 f_meta.set_status(status_type::e_status::ERROR);
164 f_meta.set_error_code("File path is not relative to root folder !");
165 return false;
166 }
167
168 sprintf(input_request,
169 "{"
170 "\"srcFs\": "
171 "{"
172 "\"type\": \"http\"," // HTTP/WebDAV/FTP/SFTP/DLNA/DOCKER
173 "\"url\": \"http://%s:%d\""
174 "},"
175 "\"srcRemote\": \"%s\","
176
177 "\"dstFs\": "
178 "\"/\","
179 "\"dstRemote\": \"%s\","
180
181 "\"_config\": {"
182
183 "\"BindAddr\": \"\","
184
185 "\"MultiThreadSet\": true,"
186 "\"Transfers\": %d,"
187 "\"Checkers\": %d,"
188 "\"MultiThreadStreams\": %d,"
189 "\"MultiThreadCutoff\": \"%s\","
190 "\"StreamingUploadCutoff\": \"%s\","
191
192 "\"UseMmap\": %s,"
193 "\"CheckSum\": %s,"
194 "\"BufferSize\": \"%s\","
195 "\"ErrorOnNoTransfer\": true"
196 "},"
197
198 "\"_async\": true"
199 "}",
200
201 // source
202 f_meta.get_src().get_ip().c_str(),
203 m_params.port,
204 file_relative_path.c_str(),
205
206 // destination
207 dest.append(f_meta.get_file_name()).string().c_str(),
208
209 // config
210 m_params.simult_transfers,
211 m_params.transfer_threads,
212 m_params.checkers_threads,
213 m_params.chunk_size.c_str(),
214 m_params.chunk_size.c_str(),
215 m_params.use_mmap ? "true" : "false",
216 m_params.checksum ? "true" : "false",
217 m_params.buffer_size.c_str());
218 } else if (m_params.protocol == "sftp") {
219 sprintf(input_request,
220 "{"
221 "\"srcFs\": "
222 // "\":%s,host=\'%s\',user=\'%s\',key_file=\'/home/ljoly/.ssh/id_rsa\':/\","
223
224 // "{"
225 // "\"type\": \"http\"," // HTTP/WebDAV/FTP/SFTP/DLNA/DOCKER
226 // "\"url\": \"http://127.0.0.1:8000\""
227 // // "\"no-head\": \"true\","
228 // "},"
229
230 "{"
231 "\"type\": \"sftp\"," // HTTP/WebDAV/FTP/SFTP/DLNA/DOCKER
232 "\"host\": \"%s\","
233 "\"user\": \"%s\","
234 "\"port\": \"%d\","
235 "\"key_file\": \"/home/ljoly/.ssh/id_rsa\","
236 "\"disable_concurrent_writes\": \"false\","
237 "\"concurrency\": \"%d\""
238 // "\"pubkey_file\": \"/home/ljoly/.ssh/id_rsa.pub\","
239 // "\"known_hosts_file\": \"/home/ljoly/.ssh/known_hosts\","
240 // "\"_path\": \"/home/ljoly/N23-04-17\""
241 // "\"_name\": \"distant\""
242 "},"
243 "\"srcRemote\": \"../..%s\","
244
245 // "{" // not supported enought
246 // "\"type\": \"ftp\"," // HTTP/WebDAV/FTP/SFTP/DLNA/DOCKER
247 // "\"host\": \"%s\","
248 // "\"user\": \"anonymous\","
249 // "\"port\": \"2121\","
250 // "\"pass\": \"LXOcWWr5yMievDLHrJ2IapoJZB_AQhinb9V0\""
251 // "},"
252
253 // "{"
254 // "\"type\": \"chunker\","
255 // "\"remote\": \":%s,host=\'%s\',user=\'%s\',key_file=\'/home/ljoly/.ssh/id_rsa\':%s\","
256 // "\"chunk_size\": \"1Gi\","
257 // "\"hash_type\": \"md5\""
258 // "},"
259
260 "\"dstFs\": "
261 "\"/\","
262 "\"dstRemote\": \"%s\","
263
264 "\"_config\": {"
265
266 "\"BindAddr\": \"\","
267
268 "\"MultiThreadSet\": true,"
269 "\"Transfers\": %d,"
270 "\"Checkers\": %d,"
271 "\"MultiThreadStreams\": %d,"
272 "\"MultiThreadCutoff\": \"%s\","
273
274 "\"UseMmap\": %s,"
275 "\"CheckSum\": %s,"
276 "\"BufferSize\": \"%s\","
277 "\"ErrorOnNoTransfer\": true"
278 "},"
279
280 "\"_async\": true"
281 "}",
282
283 // source
284 // m_params.type.c_str(),
285 f_meta.get_src().get_ip().c_str(),
286 m_params.user.c_str(),
287 m_params.port,
288 m_params.simult_transfers,
289
290 // f_meta.get_file_path().remove_filename().string().c_str(),
291 f_meta.get_file_path().string().c_str(),
292 // destination
293 // dest.string().c_str(),
294 dest.append(f_meta.get_file_name()).string().c_str(),
295
296 // config
297 m_params.simult_transfers,
298 m_params.transfer_threads,
299 m_params.checkers_threads,
300 m_params.chunk_size.c_str(),
301 m_params.use_mmap ? "true" : "false",
302 m_params.checksum ? "true" : "false",
303 m_params.buffer_size.c_str());
304 }
305
306 auto res = requestRPC("operations/copyfile", input_request);
307 TLOG() << "Requested copyfile operation with parameters : " << input_request;
308
309 // sprintf(input_request, "{"
310 // "\"url\": \"http://%s:8080%s\","
311
312 // "\"fs\": \"/\","
313 // "\"remote\": \"%s\","
314
315 // "\"autoFilename\": true,"
316
317 // // "\"_config\": {"
318 // // // for this file only
319 // // // "\"Progress\": true,"
320 // // // general config to move
321 // // "\"CheckSum\": true,"
322 // // "\"BindAddr\": \"\","
323 // // "\"BufferSize\": 67108864,"
324 // // "\"CheckSum\": true,"
325 // // "\"ErrorOnNoTransfer\": true,"
326 // // "\"MultiThreadSet\": true,"
327 // // "\"MultiThreadStreams\": 4,"
328 // // "\"Transfers\": 200,"
329 // // "\"UseMmap\": true"
330 // // "},"
331
332 // "\"_async\": true"
333 // "}",
334
335 // // source
336 // f_meta.get_src().get_ip().c_str(),
337 // f_meta.get_file_path().string().c_str(),
338
339 // // destination
340 // dest.string().c_str());
341
342 // auto res = requestRPC("operations/copyurl", input_request);
343
344 TLOG() << input_request;
345
346 delete[] input_request;
347 if (res.has_value()) {
348 m_jobs_id[&f_meta] = res.value()["jobid"];
349 } else {
350 return false;
351 }
352
353 // print local options of the transfer
354 requestRPC("options/local", "");
355
356 return true;
357 }

◆ hash_file()

bool dunedaq::snbmodules::TransferInterfaceRClone::hash_file ( TransferMetadata & f_meta)
inlineoverridevirtual

Implements dunedaq::snbmodules::TransferInterfaceAbstract.

Definition at line 387 of file transfer_interface_RClone.hpp.

388 {
389 TLOG() << "debug : RClone : Hashing file " << f_meta.get_file_name();
390
391 return true;
392 }

◆ pause_file()

bool dunedaq::snbmodules::TransferInterfaceRClone::pause_file ( TransferMetadata & f_meta)
inlineoverridevirtual

Implements dunedaq::snbmodules::TransferInterfaceAbstract.

Definition at line 359 of file transfer_interface_RClone.hpp.

360 {
361 TLOG() << "debug : RClone : Pausing file " << f_meta.get_file_name();
362 ers::warning(RCloneNotSupportError(ERS_HERE, "pausing a single file. Pausing everything."));
363
364 requestRPC("core/bwlimit", "{\"bytesPerSecond\": 0}");
365
366 return true;
367 }
#define ERS_HERE
void warning(const Issue &issue)
Definition ers.hpp:115

◆ requestRPC()

std::optional< nlohmann::json > dunedaq::snbmodules::TransferInterfaceRClone::requestRPC ( const std::string & method,
const std::string & input )
inlineprivate

Definition at line 446 of file transfer_interface_RClone.hpp.

447 {
448 char* m = strdup(method.c_str());
449 char* in = strdup(input.c_str());
450
451 struct RcloneRPCResult out = RcloneRPC(m, in);
452 TLOG_DEBUG(2) << "debug : RClone : result status: " << out.Status;
453 TLOG_DEBUG(2) << "debug : RClone : result output: " << out.Output;
454 nlohmann::json j = nlohmann::json::parse(out.Output);
455 free(out.Output); // NOLINT
456
457 if (out.Status != 200) {
458 return std::nullopt;
459 }
460 return j;
461 }
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
FELIX Initialization std::string initerror FELIX queue timed out

◆ resume_file()

bool dunedaq::snbmodules::TransferInterfaceRClone::resume_file ( TransferMetadata & f_meta)
inlineoverridevirtual

Implements dunedaq::snbmodules::TransferInterfaceAbstract.

Definition at line 369 of file transfer_interface_RClone.hpp.

370 {
371 TLOG() << "debug : RClone : Resuming file " << f_meta.get_file_name();
372 ers::warning(RCloneNotSupportError(ERS_HERE, "resuming a single file. Resuming everything."));
373
374 char* input_request = new char[100];
375 sprintf(input_request,
376 "{"
377 "\"bytesPerSecond\": %s"
378 "}",
379 m_params.bwlimit.c_str());
380
381 requestRPC("core/bwlimit", input_request);
382 delete[] input_request;
383
384 return true;
385 }

◆ upload_file()

bool dunedaq::snbmodules::TransferInterfaceRClone::upload_file ( TransferMetadata & f_meta)
inlineoverridevirtual

Implements dunedaq::snbmodules::TransferInterfaceAbstract.

Definition at line 124 of file transfer_interface_RClone.hpp.

125 {
126 TLOG() << "debug : RClone : Uploading file " << f_meta.get_file_name();
127
128 // char exec[300];
129 // sprintf(exec,
130 // "rclone serve http / --addr %s:%d --buffer-size '%s' --no-modtime --transfers %d -vv --vfs-cache-mode
131 // 'off' --vfs-cache-max-size 'off'", f_meta.get_src().get_ip().c_str(), m_params.port,
132 // m_params.buffer_size,
133 // m_params.simult_transfers);
134
135 // TLOG() << "debug : executing " << exec;
136 // if (system(exec) == 0)
137 // {
138 // TLOG() << "debug : RClone : Sucess Starting server";
139 // }
140 // else
141 // {
142 // // TODO: error
143 // return false;
144 // }
145
146 f_meta.set_status(status_type::e_status::FINISHED);
147
148 return true;
149 }

Member Data Documentation

◆ m_jobs_id

std::map<TransferMetadata*, int> dunedaq::snbmodules::TransferInterfaceRClone::m_jobs_id
private

Definition at line 443 of file transfer_interface_RClone.hpp.

◆ m_params

struct dunedaq::snbmodules::TransferInterfaceRClone::parameters dunedaq::snbmodules::TransferInterfaceRClone::m_params
private

◆ m_thread

dunedaq::utilities::WorkerThread dunedaq::snbmodules::TransferInterfaceRClone::m_thread
private

Definition at line 464 of file transfer_interface_RClone.hpp.

◆ m_work_dir

std::filesystem::path dunedaq::snbmodules::TransferInterfaceRClone::m_work_dir
private

Definition at line 444 of file transfer_interface_RClone.hpp.


The documentation for this class was generated from the following file: