Line data Source code
1 : /**
2 : * @file SNBFileTransfer.cpp Module to transfer files between SNBFileTransfer modules.
3 : *
4 : * This is part of the DUNE DAQ , copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "SNBFileTransfer.hpp"
10 :
11 : #include "appfwk/cmd/Nljs.hpp"
12 :
13 : #include <memory>
14 : #include <set>
15 : #include <string>
16 :
17 : namespace dunedaq::snbmodules {
18 :
19 0 : SNBFileTransfer::SNBFileTransfer(const std::string& name)
20 0 : : DAQModule(name)
21 : {
22 0 : register_command("conf", &SNBFileTransfer::do_conf);
23 0 : register_command("scrap", &SNBFileTransfer::do_scrap);
24 0 : register_command("start", &SNBFileTransfer::do_start);
25 0 : register_command("stop", &SNBFileTransfer::do_stop);
26 :
27 0 : register_command("new_transfer", &SNBFileTransfer::do_tr_new);
28 0 : register_command("start_transfer", &SNBFileTransfer::do_tr_start);
29 0 : register_command("pause_transfer", &SNBFileTransfer::do_tr_pause);
30 0 : register_command("resume_transfer", &SNBFileTransfer::do_tr_resume);
31 0 : register_command("cancel_transfer", &SNBFileTransfer::do_tr_cancel);
32 :
33 0 : m_name = name;
34 0 : }
35 :
36 : void
37 0 : SNBFileTransfer::do_tr_new(const CommandData_t& args)
38 : {
39 0 : TLOG() << "debug : New transfer request !";
40 :
41 0 : std::string src = "";
42 0 : if (args.contains("src")) {
43 0 : src = args["src"].get<std::string>();
44 : } else {
45 0 : ers::error(ConfigError(ERS_HERE, "src is mandatory to create a new transfer"));
46 0 : return;
47 : }
48 :
49 0 : if (src == m_name) {
50 0 : std::set<std::string> dests = {};
51 0 : std::set<std::filesystem::path> files = {};
52 :
53 0 : if (args.contains("dests") && args.contains("files")) {
54 0 : dests = args["dests"].get<std::set<std::string>>();
55 0 : files = args["files"].get<std::set<std::filesystem::path>>();
56 : } else {
57 0 : ers::error(ConfigError(ERS_HERE, "dests and files are mandatory to create a new transfer"));
58 0 : return;
59 : }
60 :
61 0 : m_client->create_new_transfer(
62 0 : args["transfer_id"].get<std::string>(), args["protocol"].get<std::string>(), dests, files, args["protocol_args"]);
63 0 : } else {
64 0 : ers::error(InvalidSourceCommandRequestError(ERS_HERE, "New Transfer"));
65 : }
66 0 : }
67 : void
68 0 : SNBFileTransfer::do_tr_start(const CommandData_t& args)
69 : {
70 0 : if (args.contains("transfer_id")) {
71 0 : m_client->start_transfer(args["transfer_id"].get<std::string>());
72 : } else {
73 0 : ers::error(ConfigError(ERS_HERE, "transfer_id is mandatory to start a transfer"));
74 : }
75 0 : }
76 : void
77 0 : SNBFileTransfer::do_tr_pause(const CommandData_t& args)
78 : {
79 0 : if (args.contains("transfer_id")) {
80 0 : m_client->pause_transfer(args["transfer_id"].get<std::string>());
81 : } else {
82 0 : ers::error(ConfigError(ERS_HERE, "transfer_id is mandatory to pause a transfer"));
83 : }
84 0 : }
85 : void
86 0 : SNBFileTransfer::do_tr_resume(const CommandData_t& args)
87 : {
88 0 : if (args.contains("transfer_id")) {
89 0 : m_client->resume_transfer(args["transfer_id"].get<std::string>());
90 : } else {
91 0 : ers::error(ConfigError(ERS_HERE, "transfer_id is mandatory to resume a transfer"));
92 : }
93 0 : }
94 : void
95 0 : SNBFileTransfer::do_tr_cancel(const CommandData_t& args)
96 : {
97 0 : if (args.contains("transfer_id")) {
98 0 : m_client->cancel_transfer(args["transfer_id"].get<std::string>());
99 : } else {
100 0 : ers::error(ConfigError(ERS_HERE, "transfer_id is mandatory to cancel a transfer"));
101 : }
102 0 : }
103 :
104 : void
105 0 : SNBFileTransfer::init(std::shared_ptr<dunedaq::appfwk::ConfigurationManager> mcfg)
106 : {
107 0 : auto mdal = mcfg->get_dal<appmodel::SNBFileTransfer>(get_name());
108 0 : if (!mdal) {
109 0 : throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve configuration object");
110 : }
111 :
112 0 : m_snbft_conf = mdal->get_configuration();
113 0 : }
114 :
115 : void
116 0 : SNBFileTransfer::do_conf(const CommandData_t& /*payload*/)
117 : {
118 0 : TLOG() << "FAB " << __LINE__ << " " << m_snbft_conf->get_work_dir();
119 0 : m_client = std::make_shared<TransferClient>(IPFormat(m_snbft_conf->get_client_ip()),
120 0 : m_name,
121 0 : m_snbft_conf->get_work_dir(),
122 0 : m_snbft_conf->get_connection_prefix(),
123 0 : m_snbft_conf->get_timeout_send(),
124 0 : m_snbft_conf->get_timeout_receive());
125 0 : m_thread =
126 0 : std::make_unique<dunedaq::utilities::WorkerThread>([&](std::atomic<bool>& running) { m_client->do_work(running); });
127 0 : }
128 :
129 : void
130 0 : SNBFileTransfer::do_scrap(const CommandData_t& /*payload*/)
131 : {
132 0 : if (m_thread->thread_running()) {
133 0 : m_thread->stop_working_thread();
134 : // wait for thread to stop
135 0 : while (m_thread->thread_running())
136 : ;
137 0 : m_thread.reset();
138 : }
139 :
140 0 : m_client.reset();
141 0 : }
142 :
143 : void
144 0 : SNBFileTransfer::do_start(const CommandData_t& /*payload*/)
145 : {
146 0 : m_client->lookups_connections();
147 0 : m_thread->start_working_thread();
148 0 : }
149 :
150 : void
151 0 : SNBFileTransfer::do_stop(const CommandData_t& /*payload*/)
152 : {
153 0 : m_thread->stop_working_thread();
154 0 : }
155 :
156 : } // namespace dunedaq::snbmodules
157 :
158 0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::snbmodules::SNBFileTransfer)
|