Line data Source code
1 : /**
2 : *
3 : * Implementations of DaphneV2Interface's functions
4 : *
5 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
6 : * Licensing/copyright details are in the COPYING file that you should have received with this code.
7 : */
8 :
9 : #include "DaphneV2Interface.hpp"
10 : #include "logging/Logging.hpp"
11 : #include <sys/time.h>
12 :
13 : using namespace dunedaq::daphnemodules;
14 :
15 1 : DaphneV2Interface::DaphneV2Interface( const char* ipaddr, int port,
16 1 : std::chrono::milliseconds timeout)
17 1 : : m_timeout(timeout) {
18 :
19 1 : m_connection_id = socket(AF_INET, SOCK_DGRAM, 0);
20 :
21 1 : if ( m_connection_id < 0 )
22 0 : throw SocketCreationError(ERS_HERE);
23 :
24 1 : m_target.sin_family = AF_INET;
25 1 : m_target.sin_port = htons(port);
26 1 : auto ret = inet_pton(AF_INET, ipaddr, &(m_target.sin_addr));
27 1 : if ( ret <= 0 )
28 1 : throw InvalidIPAddress(ERS_HERE, ipaddr);
29 :
30 0 : m_ip = ipaddr;
31 :
32 0 : if ( ! validate_connection() )
33 0 : throw FailedPing(ERS_HERE, ipaddr, port );
34 :
35 1 : }
36 :
37 :
38 0 : void DaphneV2Interface::close() {
39 :
40 0 : ::close( m_connection_id );
41 0 : }
42 :
43 :
44 0 : bool DaphneV2Interface::validate_connection() const {
45 :
46 0 : auto ret = read_register( 0xaa55, 1);
47 :
48 0 : static const uint64_t good_value = 0xdeadbeef;
49 0 : return ret[0] == good_value ;
50 :
51 0 : }
52 :
53 :
54 0 : command_result DaphneV2Interface::send_command_retry( std::string cmd,
55 : size_t retry ) const {
56 :
57 0 : do {
58 0 : try {
59 0 : auto ret = send_command(cmd);
60 0 : return ret;
61 0 : }
62 0 : catch ( const CommandTimeout & e ) {
63 0 : ers::warning( e );
64 0 : --retry;
65 0 : }
66 0 : catch ( const ers::Issue & e ) {
67 0 : throw FailedSocketInteraction(ERS_HERE, cmd, e);
68 0 : }
69 :
70 0 : } while (retry>0);
71 :
72 0 : throw FailedSocketInteraction(ERS_HERE, cmd);
73 :
74 : }
75 :
76 :
77 0 : command_result DaphneV2Interface::send_command_interruptible( std::string cmd,
78 : std::function<bool()> can_retry ) const {
79 :
80 0 : do {
81 0 : try {
82 0 : auto ret = send_command(cmd);
83 0 : return ret;
84 0 : }
85 0 : catch ( const CommandTimeout & e ) {
86 0 : ers::warning( e );
87 0 : }
88 0 : catch ( const ers::Issue & e ) {
89 0 : throw FailedSocketInteraction(ERS_HERE, cmd, e);
90 0 : }
91 :
92 0 : } while (can_retry());
93 :
94 : }
95 :
96 :
97 :
98 0 : command_result DaphneV2Interface::send_command( std::string cmd) const {
99 :
100 0 : TLOG() << "Board: " << m_ip << ", sending command " << cmd;
101 0 : std::vector<uint64_t> bytes;
102 0 : for (char ch : cmd) {
103 0 : bytes.push_back(static_cast<uint64_t>(ch));
104 : }
105 0 : bytes.push_back(0x0d); // dedicated command flag
106 :
107 0 : const std::lock_guard<std::mutex> lock(m_command_mutex);
108 :
109 : // we send the bytes in chunks of 50 words
110 0 : for (size_t i = 0; i < (bytes.size() + 49) / 50; ++i) {
111 0 : std::vector<uint64_t> part(bytes.begin() + i*50,
112 0 : bytes.begin() + std::min((i+1)*50, bytes.size()));
113 0 : write_buffer(0x90000000, std::move(part));
114 0 : }
115 :
116 0 : TLOG() << "Board: " << m_ip << ", Command sent, waiting for result";
117 :
118 :
119 0 : command_result res;
120 0 : std::string * writing_pointer = nullptr;
121 :
122 0 : auto start_time = std::chrono::high_resolution_clock::now();
123 :
124 0 : int more = 40;
125 0 : while (more > 0) {
126 0 : auto data_block = read_buffer(0x90000000, 50);
127 0 : for (size_t i = 0; i < data_block.size(); ++i) {
128 0 : if (data_block[i] == 255) {
129 : break;
130 0 : } else if (data_block[i] == 1) {
131 : // the following data are returning the command that was issued
132 0 : writing_pointer = & res.command;
133 0 : } else if (data_block[i] == 2) {
134 : // the following data are the immediate command response
135 0 : writing_pointer = & res.result;
136 0 : } else if (data_block[i] == 3) {
137 : // this is the message end
138 : writing_pointer = nullptr;
139 0 : } else if (isprint(static_cast<int>(data_block[i]))) {
140 0 : more = 40;
141 0 : char c = static_cast<char>(data_block[i]);
142 0 : if ( writing_pointer ) {
143 0 : *writing_pointer += static_cast<char>(data_block[i]);
144 : }
145 : else {
146 0 : TLOG() << "Failed adding charachter " << c;
147 : }
148 : }
149 : }
150 0 : auto now = std::chrono::high_resolution_clock::now();
151 :
152 0 : auto delay = now - start_time;
153 :
154 0 : if ( delay > m_timeout ) {
155 0 : TLOG() << "Details of timeout";
156 0 : for ( size_t i = 0; i < data_block.size(); ++i ) {
157 0 : TLOG() << i << "\t" << std::hex << data_block[i] << std::dec;
158 : }
159 0 : TLOG() << "Received so far: " << res.result;
160 0 : auto delay_us = std::chrono::duration_cast<std::chrono::microseconds>(delay);
161 0 : throw CommandTimeout(ERS_HERE, cmd, delay_us.count());
162 : }
163 :
164 0 : std::this_thread::sleep_for(std::chrono::milliseconds(1));
165 0 : --more;
166 0 : }
167 :
168 0 : return res;
169 0 : }
170 :
171 :
172 0 : std::vector<uint64_t> DaphneV2Interface::read(uint8_t command_id,
173 : uint64_t addr, uint8_t size) const {
174 :
175 0 : const std::lock_guard<std::mutex> lock(m_access_mutex);
176 :
177 0 : uint8_t cmd[10];
178 0 : cmd[0] = command_id;
179 0 : cmd[1] = size;
180 0 : memcpy(cmd + 2, &addr, sizeof(uint64_t));
181 0 : auto result = sendto(m_connection_id, cmd, sizeof(cmd), 0, (struct sockaddr*)&m_target, sizeof(m_target));
182 :
183 0 : if ( result < 0 ) throw FailedSocketInteraction(ERS_HERE, "sendto") ;
184 :
185 :
186 0 : struct timeval timeout;
187 0 : timeout.tv_sec = m_timeout.count() / 1000 ;
188 0 : timeout.tv_usec = (m_timeout.count() % 1000) * 1000 ;
189 0 : fd_set readfds, masterfds;
190 :
191 0 : FD_ZERO(&masterfds);
192 0 : FD_SET(m_connection_id, &masterfds);
193 :
194 0 : memcpy(&readfds, &masterfds, sizeof(fd_set));
195 :
196 0 : auto start_time = std::chrono::high_resolution_clock::now();
197 :
198 0 : if (select(m_connection_id+1, &readfds, NULL, NULL, &timeout) < 0) {
199 0 : throw FailedSocketInteraction(ERS_HERE, "select") ;
200 : }
201 :
202 0 : std::vector<uint64_t> ret_value;
203 :
204 0 : if (FD_ISSET(m_connection_id, &readfds)) {
205 0 : uint8_t buffer[2 + (8 * size)];
206 0 : socklen_t addrlen = sizeof(m_target);
207 0 : result = recvfrom(m_connection_id, buffer, sizeof(buffer), 0, (struct sockaddr*)&m_target, &addrlen);
208 :
209 0 : if ( result <= 0 ) throw FailedSocketInteraction(ERS_HERE, "recvfrom") ;
210 :
211 0 : uint8_t fmt[4 + size];
212 0 : fmt[0] = '<';
213 0 : fmt[1] = 'B';
214 0 : fmt[2] = 'B';
215 0 : fmt[3] = size;
216 0 : for (int i = 0; i < size; i++) {
217 0 : fmt[4 + i] = 'Q';
218 : }
219 :
220 0 : for (int i = 0; i < size; i++) {
221 0 : uint64_t value;
222 0 : memcpy(&value, buffer + 2 + (8 * i), sizeof(uint64_t));
223 0 : ret_value.push_back(value);
224 : }
225 :
226 0 : }
227 : else {
228 : // the socket timedout
229 0 : auto end_time = std::chrono::high_resolution_clock::now();
230 0 : auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
231 :
232 0 : throw SocketTimeout(ERS_HERE, duration.count() );
233 : }
234 :
235 0 : return ret_value;
236 0 : }
237 :
238 :
239 0 : void DaphneV2Interface::write(uint8_t command_id, uint64_t addr, std::vector<uint64_t> && data) const {
240 :
241 0 : const std::lock_guard<std::mutex> lock(m_access_mutex);
242 :
243 0 : uint8_t cmd[10 + (8 * data.size())];
244 0 : cmd[0] = command_id;
245 0 : cmd[1] = data.size();
246 0 : memcpy(cmd + 2, &addr, sizeof(uint64_t));
247 0 : for (size_t i = 0; i < data.size(); i++) {
248 0 : memcpy(cmd + 10 + (8 * i), &(data[i]), sizeof(uint64_t));
249 : }
250 :
251 0 : auto result = sendto(m_connection_id, cmd, sizeof(cmd), 0, (struct sockaddr*)&m_target, sizeof(m_target));
252 0 : if ( result < 0 ) throw FailedSocketInteraction(ERS_HERE, "sendto") ;
253 0 : }
254 :
255 :
256 :
257 :
258 :
|