9#ifndef SNBMODULES_INCLUDE_SNBMODULES_INTERFACES_TRANSFER_INTERFACE_RCLONE_HPP_
10#define SNBMODULES_INCLUDE_SNBMODULES_INTERFACES_TRANSFER_INTERFACE_RCLONE_HPP_
47 if (
config.get_protocol_options().contains(
"protocol")) {
52 if (
config.get_protocol_options().contains(
"user")) {
57 if (config.get_protocol_options().contains(
"rate_limit")) {
58 m_params.bwlimit = config.get_protocol_options()[
"rate_limit"].get<std::string>();
60 if (config.get_protocol_options().contains(
"refresh_rate")) {
61 m_params.refresh_rate = config.get_protocol_options()[
"refresh_rate"].get<
int>();
63 if (config.get_protocol_options().contains(
"port")) {
64 m_params.port = config.get_protocol_options()[
"port"].get<
int>();
68 if (config.get_protocol_options().contains(
"simult_transfers")) {
69 m_params.simult_transfers = config.get_protocol_options()[
"simult_transfers"].get<
int>();
71 if (config.get_protocol_options().contains(
"transfer_threads")) {
72 m_params.transfer_threads = config.get_protocol_options()[
"transfer_threads"].get<
int>();
74 if (config.get_protocol_options().contains(
"checkers_threads")) {
75 m_params.checkers_threads = config.get_protocol_options()[
"checkers_threads"].get<
int>();
78 if (config.get_protocol_options().contains(
"chunk_size")) {
79 m_params.chunk_size = config.get_protocol_options()[
"chunk_size"].get<std::string>();
81 if (config.get_protocol_options().contains(
"buffer_size")) {
82 m_params.buffer_size = config.get_protocol_options()[
"buffer_size"].get<std::string>();
84 if (config.get_protocol_options().contains(
"use_mmap")) {
85 m_params.use_mmap = config.get_protocol_options()[
"use_mmap"].get<
bool>();
87 if (config.get_protocol_options().contains(
"checksum")) {
88 m_params.checksum = config.get_protocol_options()[
"checksum"].get<
bool>();
90 if (config.get_protocol_options().contains(
"root_folder")) {
91 m_params.root_folder = std::filesystem::absolute(config.get_protocol_options()[
"root_folder"].get<std::string>());
94 char* input_request =
new char[100];
95 sprintf(input_request,
102 delete[] input_request;
109 "\"CacheMaxSize\": 0"
154 char* input_request =
new char[1000];
157 std::string file_relative_path =
161 if (file_relative_path.find(
"..") != std::string::npos) {
162 TLOG() <<
"debug : RClone : File path is not relative to root folder";
164 f_meta.
set_error_code(
"File path is not relative to root folder !");
168 sprintf(input_request,
172 "\"type\": \"http\","
173 "\"url\": \"http://%s:%d\""
175 "\"srcRemote\": \"%s\","
179 "\"dstRemote\": \"%s\","
183 "\"BindAddr\": \"\","
185 "\"MultiThreadSet\": true,"
188 "\"MultiThreadStreams\": %d,"
189 "\"MultiThreadCutoff\": \"%s\","
190 "\"StreamingUploadCutoff\": \"%s\","
194 "\"BufferSize\": \"%s\","
195 "\"ErrorOnNoTransfer\": true"
204 file_relative_path.c_str(),
215 m_params.use_mmap ?
"true" :
"false",
216 m_params.checksum ?
"true" :
"false",
218 }
else if (
m_params.protocol ==
"sftp") {
219 sprintf(input_request,
231 "\"type\": \"sftp\","
235 "\"key_file\": \"/home/ljoly/.ssh/id_rsa\","
236 "\"disable_concurrent_writes\": \"false\","
237 "\"concurrency\": \"%d\""
243 "\"srcRemote\": \"../..%s\","
262 "\"dstRemote\": \"%s\","
266 "\"BindAddr\": \"\","
268 "\"MultiThreadSet\": true,"
271 "\"MultiThreadStreams\": %d,"
272 "\"MultiThreadCutoff\": \"%s\","
276 "\"BufferSize\": \"%s\","
277 "\"ErrorOnNoTransfer\": true"
301 m_params.use_mmap ?
"true" :
"false",
302 m_params.checksum ?
"true" :
"false",
306 auto res =
requestRPC(
"operations/copyfile", input_request);
307 TLOG() <<
"Requested copyfile operation with parameters : " << input_request;
344 TLOG() << input_request;
346 delete[] input_request;
347 if (res.has_value()) {
348 m_jobs_id[&f_meta] = res.value()[
"jobid"];
364 requestRPC(
"core/bwlimit",
"{\"bytesPerSecond\": 0}");
374 char* input_request =
new char[100];
375 sprintf(input_request,
377 "\"bytesPerSecond\": %s"
382 delete[] input_request;
403 TLOG() <<
"debug : RClone : Job id not found";
407 char* input_request =
new char[100];
408 sprintf(input_request,
415 delete[] input_request;
424 std::string
user =
"anonymous";
446 std::optional<nlohmann::json>
requestRPC(
const std::string& method,
const std::string& input)
448 char* m = strdup(method.c_str());
449 char* in = strdup(input.c_str());
451 struct RcloneRPCResult
out = RcloneRPC(m, in);
452 TLOG_DEBUG(2) <<
"debug : RClone : result status: " <<
out.Status;
453 TLOG_DEBUG(2) <<
"debug : RClone : result output: " <<
out.Output;
454 nlohmann::json j = nlohmann::json::parse(
out.Output);
457 if (
out.Status != 200) {
467 TLOG() <<
"debug : running thread ";
469 while (running.load()) {
474 if (stats.has_value()) {
475 if (stats.value()[
"transferring"] !=
nullptr) {
476 auto current_transfers = stats.value()[
"transferring"].get<std::vector<nlohmann::json>>();
478 for (
const auto& t : current_transfers) {
479 auto grp = t[
"group"].get<std::string>();
480 int job_id = std::stoi(grp.substr(grp.find(
"/") + 1));
484 meta->set_progress(t[
"percentage"].get<int>());
485 meta->set_transmission_speed(t[
"speedAvg"].get<int32_t>());
496 char* input_request =
new char[100];
497 sprintf(input_request,
503 auto res =
requestRPC(
"job/status", input_request);
504 delete[] input_request;
506 if (res.has_value()) {
507 if (res.value()[
"finished"] !=
nullptr && res.value()[
"finished"].get<
bool>()) {
508 if (res.value()[
"success"].get<
bool>()) {
510 meta->set_progress(100);
513 meta->set_error_code(res.value()[
"error"].get<std::string>());
515 meta->set_transmission_speed(0);
521 std::this_thread::sleep_for(std::chrono::seconds(
m_params.refresh_rate));
TransferInterfaceAbstract(GroupMetadata &config)
std::filesystem::path m_work_dir
bool cancel_file(TransferMetadata &f_meta) override
bool download_file(TransferMetadata &f_meta, std::filesystem::path dest) override
bool pause_file(TransferMetadata &f_meta) override
struct dunedaq::snbmodules::TransferInterfaceRClone::parameters m_params
std::optional< nlohmann::json > requestRPC(const std::string &method, const std::string &input)
void do_work(std::atomic< bool > &running)
virtual ~TransferInterfaceRClone()
std::map< TransferMetadata *, int > m_jobs_id
dunedaq::utilities::WorkerThread m_thread
bool hash_file(TransferMetadata &f_meta) override
bool resume_file(TransferMetadata &f_meta) override
bool upload_file(TransferMetadata &f_meta) override
TransferInterfaceRClone(GroupMetadata &config, const std::filesystem::path &work_dir)
WorkerThread contains a thread which runs the do_work() function.
void start_working_thread(const std::string &name="noname")
Start the working thread (which executes the do_work() function).
#define TLOG_DEBUG(lvl,...)
Unknown serialization type<< t,((char) t)) template< typename T > inline std::string datatype_to_string() { return "Unknown";} namespace serialization { template< typename T > struct is_serializable :std::false_type {};enum SerializationType { kMsgPack };inline SerializationType from_string(const std::string s) { if(s=="msgpack") return kMsgPack;throw UnknownSerializationTypeString(ERS_HERE, s);} constexpr uint8_t serialization_type_byte(SerializationType stype) { switch(stype) { case kMsgPack:return 'M';default:throw UnknownSerializationTypeEnum(ERS_HERE);} } constexpr SerializationType DEFAULT_SERIALIZATION_TYPE=kMsgPack;template< class T > std::vector< uint8_t > serialize(const T &obj, SerializationType stype=DEFAULT_SERIALIZATION_TYPE) { switch(stype) { case kMsgPack:{ msgpack::sbuffer buf;msgpack::pack(buf, obj);std::vector< uint8_t > ret(buf.size()+1);ret[0]=serialization_type_byte(stype);std::copy(buf.data(), buf.data()+buf.size(), ret.begin()+1);return ret;} default:throw UnknownSerializationTypeEnum(ERS_HERE);} } template< class T, typename CharType=unsigned char > T deserialize(const std::vector< CharType > &v) { switch(v[0]) { case serialization_type_byte(kMsgPack):{ try { msgpack::object_handle oh=msgpack::unpack(const_cast< char * >(reinterpret_cast< const char * >(v.data()+1)), v.size() - 1,[](msgpack::type::object_type, std::size_t, void *) -> bool
FELIX Initialization std::string initerror FELIX queue timed out
void warning(const Issue &issue)
std::filesystem::path root_folder