Line data Source code
1 : /**
2 : * @file CTBModule.cpp CTBModule class
3 : * implementation
4 : *
5 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
6 : * Licensing/copyright details are in the COPYING file that you should have
7 : * received with this code.
8 : */
9 :
10 : #include "confmodel/GeoId.hpp"
11 :
12 : #include "appmodel/CTBConf.hpp"
13 : #include "appmodel/CTBCalibrationStream.hpp"
14 : #include "appmodel/CTBoardConf.hpp"
15 : #include "appmodel/CTBMisc.hpp"
16 : #include "appmodel/CTBRandomTrigger.hpp"
17 : #include "appmodel/CTBHLT.hpp"
18 : #include "appmodel/CTBLLT.hpp"
19 : #include "appmodel/CTBCountLLT.hpp"
20 : #include "appmodel/CTBSockets.hpp"
21 : #include "appmodel/CTBReceiverSocket.hpp"
22 :
23 : #include "CTBModule.hpp"
24 : #include "CTBModuleIssues.hpp"
25 :
26 : #include "iomanager/IOManager.hpp"
27 : #include "logging/Logging.hpp"
28 :
29 : #include <chrono>
30 : #include <string>
31 : #include <thread>
32 : #include <vector>
33 : #include <memory>
34 : #include <map>
35 : #include <queue>
36 : #include <utility>
37 :
/**
 * @brief Name used by TRACE TLOG calls from this source file
 */
#define TRACE_NAME "CTBModule" // NOLINT

// Debug verbosity levels passed to TLOG_DEBUG in this file
constexpr int TLVL_ENTER_EXIT_METHODS = 10; // method entry/exit tracing
constexpr int TLVL_CTB_MODULE = 15;         // worker-loop diagnostics (word sizes, end of loop)

// Frame version ORed into the lowest bits (below the detector id at bit 6) of word 0 of each HSI frame
constexpr uint16_t CTB_HSI_FRAME_VERSION = 0x1; // NOLINT
49 :
50 : using namespace dunedaq;
51 : using namespace ctbmodules;
52 :
53 0 : CTBModule::CTBModule(const std::string& name)
54 : : hsilibs::HSIEventSender(name)
55 0 : , m_is_running(false)
56 0 : , m_stop_requested(false)
57 0 : , m_is_configured(false)
58 0 : , m_error_state(false)
59 0 : , m_total_hlt_counter(0)
60 0 : , m_ts_word_counter(0)
61 0 : , m_hlt_trigger_counter()
62 0 : , m_llt_trigger_counter()
63 0 : , m_control_ios()
64 0 : , m_receiver_ios()
65 0 : , m_control_socket(m_control_ios)
66 0 : , m_receiver_socket(m_receiver_ios)
67 0 : , m_thread_(std::bind(&CTBModule::do_hsi_work, this, std::placeholders::_1))
68 0 : , m_has_calibration_stream( false )
69 0 : , m_run_HLT_counter(0)
70 0 : , m_run_LLT_counter(0)
71 0 : , m_run_channel_status_counter(0)
72 0 : , m_num_control_messages_sent(0)
73 0 : , m_num_control_responses_received(0)
74 0 : , m_last_readout_hlt_timestamp(0)
75 : {
76 0 : register_command("conf", &CTBModule::do_configure);
77 0 : register_command("start", &CTBModule::do_start);
78 0 : register_command("stop", &CTBModule::do_stop);
79 0 : }
80 :
81 0 : CTBModule::~CTBModule(){
82 : //check if running. and in case stop the run
83 0 : if(m_is_running){
84 0 : const CommandData_t stopobj;
85 0 : do_stop(stopobj);
86 0 : }
87 0 : m_control_socket.close() ;
88 :
89 0 : }
90 :
91 : void
92 0 : CTBModule::init(std::shared_ptr<appfwk::ConfigurationManager> cfgMgr)
93 : {
94 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
95 :
96 0 : HSIEventSender::init(cfgMgr);
97 :
98 0 : m_cfg = cfgMgr;
99 :
100 0 : auto mdal = cfgMgr->get_dal<appmodel::CTBModule>(get_name());
101 :
102 0 : if (! mdal) {
103 0 : throw ctbmodules::CTBConfigFailure(ERS_HERE, "Missing Module configuration for " + get_name());
104 : }
105 :
106 0 : m_module = mdal;
107 :
108 : // setting up connections
109 0 : auto iom = iomanager::IOManager::get();
110 :
111 0 : using hsi_frame_t = dunedaq::hsilibs::HSI_FRAME_STRUCT;
112 0 : for ( auto con : m_module->get_outputs() ) {
113 0 : if ( con->get_data_type() == datatype_to_string<hsi_frame_t>() ) {
114 0 : if ( con->UID().find("HLT")!=std::string::npos
115 0 : || con->UID().find("hlt")!=std::string::npos ) {
116 0 : m_hlt_hsi_data_sender = iom->get_sender<hsi_frame_t>(con->UID());
117 : }
118 0 : if ( con->UID().find("LLT")!=std::string::npos
119 0 : || con->UID().find("llt")!=std::string::npos ) {
120 0 : m_llt_hsi_data_sender = iom->get_sender<hsi_frame_t>(con->UID());
121 : }
122 : } // if data type is HSI Frame
123 : } // loop over outputs
124 :
125 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
126 0 : }
127 :
128 : void
129 0 : CTBModule::do_configure(const CommandData_t&)
130 : {
131 :
132 0 : TLOG_DEBUG(0) << get_name() << ": Configuring CTB";
133 :
134 0 : auto conf = m_module ->get_configuration();
135 :
136 0 : m_receiver_port = m_module->get_board()->get_sockets()->get_receiver()->get_port();
137 0 : m_timeout = std::chrono::milliseconds( conf->get_connection_timeout_ms() ) ;
138 :
139 0 : auto hostname = conf->get_hostname();
140 0 : TLOG() << get_name() << ": Board receiver network location "
141 0 : << hostname << ':' << m_receiver_port << std::endl;
142 :
143 : // Initialise monitoring variables
144 0 : m_num_control_messages_sent = 0;
145 0 : m_num_control_responses_received = 0;
146 0 : m_ts_word_counter = 0;
147 :
148 0 : std::map<std::string, size_t> id_to_idx;
149 0 : for(size_t i = 0; i < m_hlt_range; i++) id_to_idx["HLT_" + std::to_string(i)] = i;
150 0 : for(size_t i = 0; i < m_llt_range; i++) id_to_idx["LLT_" + std::to_string(i)] = i;
151 :
152 : // configuring the board
153 0 : auto board = m_module->get_board();
154 0 : auto geo_id = board->get_geo_id();
155 0 : m_det = geo_id->get_detector_id();
156 0 : m_crate = geo_id->get_crate_id();
157 0 : m_slot = geo_id->get_slot_id();
158 :
159 0 : const auto & misc = board->get_misc();
160 0 : auto session = m_cfg->get_session();
161 : // HLTs
162 : // 0th HLT is random trigger that's not in HLT array
163 0 : if (! misc->get_randomtrigger_1()->is_disabled( *session ) ) m_hlt_trigger_counter[0] = 0;
164 :
165 0 : auto hlts = board->get_HLTs();
166 0 : for (const auto& hlt : hlts) { if (! hlt->is_disabled(*session) ) m_hlt_trigger_counter[id_to_idx[hlt->UID()]] = 0; }
167 :
168 : // LLTs: Beam and CRT
169 : // 0th LLT is random trigger that's not in HLT array
170 0 : if (! misc->get_randomtrigger_2()->is_disabled( *session ) ) m_llt_trigger_counter[0] = 0;
171 :
172 0 : auto beam_llts = board->get_beam_LLTs();
173 0 : for (const auto& llt : beam_llts) { if (! llt->is_disabled(*session)) m_llt_trigger_counter[id_to_idx[llt->UID()]] = 0; }
174 :
175 0 : auto crt_llts = board->get_CRT_LLTs();
176 0 : for (const auto& llt : crt_llts) { if (! llt->is_disabled(*session)) m_llt_trigger_counter[id_to_idx[llt->UID()]] = 0; }
177 :
178 : // network connection to ctb hardware control
179 0 : boost::asio::ip::tcp::resolver resolver( m_control_ios );
180 0 : boost::asio::ip::tcp::resolver::query query( hostname,
181 0 : std::to_string(conf->get_control_connection_port()) ) ; //"np04-ctb-1", 8991
182 0 : boost::asio::ip::tcp::resolver::iterator iter = resolver.resolve(query) ;
183 :
184 0 : m_endpoint = iter->endpoint();
185 0 : m_control_socket.connect( m_endpoint );
186 :
187 : // if necessary, set the calibration stream
188 0 : auto stream_conf = conf->get_calibration_stream();
189 0 : if ( stream_conf ) {
190 0 : m_has_calibration_stream = true ;
191 0 : m_calibration_dir = stream_conf->get_directory();
192 0 : m_calibration_file_interval = std::chrono::duration_cast<decltype(m_calibration_file_interval)>(std::chrono::seconds(stream_conf->get_update_period_s()));
193 0 : ;
194 : }
195 :
196 : // at this point we have to find the hostname to tell the board what to get
197 0 : boost::asio::ip::tcp::resolver::query query_for_local(boost::asio::ip::host_name(), "");
198 0 : iter = resolver.resolve(query_for_local);
199 :
200 : // create the json string
201 0 : auto json_conf = m_module->get_board()->get_ctb_json(*session, iter->endpoint().address().to_string());
202 :
203 0 : auto json_dump = json_conf.dump();
204 :
205 0 : TLOG() << "Sending configuration: " << json_dump;
206 :
207 0 : send_config(json_dump);
208 0 : }
209 :
210 : void
211 0 : CTBModule::do_start(const CommandData_t& startobj)
212 : {
213 :
214 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
215 :
216 : // Set this to false early so it doesn't interfere with the start
217 0 : m_stop_requested.store(false);
218 :
219 0 : m_run_number.store(startobj.at("run").get<daqdataformats::run_number_t>());
220 :
221 0 : m_total_hlt_counter.store(0);
222 :
223 0 : TLOG_DEBUG(0) << get_name() << ": Sending start of run command";
224 0 : m_thread_.start_working_thread();
225 :
226 0 : if ( m_has_calibration_stream ) {
227 0 : std::stringstream run;
228 0 : run << "run" << m_run_number.load();
229 0 : SetCalibrationStream(run.str()) ;
230 0 : }
231 :
232 0 : if ( send_message( "{\"command\":\"StartRun\"}" ) ) {
233 0 : m_is_running.store(true);
234 0 : TLOG_DEBUG(1) << get_name() << ": successfully started";
235 : } else{
236 0 : throw CTBCommunicationError(ERS_HERE, "Unable to start CTB");
237 : }
238 :
239 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
240 0 : }
241 :
242 : void
243 0 : CTBModule::do_stop(const CommandData_t& /*stopobj*/)
244 : {
245 :
246 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
247 :
248 0 : TLOG_DEBUG(0) << get_name() << ": Sending stop run command" << std::endl;
249 :
250 : // Give the do_work thread a chance to stop before stopping the CTB,
251 : // otherwise we end up reading from an empty buffer
252 0 : m_stop_requested.store(true);
253 0 : std::this_thread::sleep_for(std::chrono::milliseconds(2));
254 :
255 0 : if(send_message( "{\"command\":\"StopRun\"}" ) ){
256 0 : TLOG_DEBUG(1) << get_name() << ": successfully stopped";
257 0 : m_is_running.store( false ) ;
258 : } else {
259 0 : throw CTBCommunicationError(ERS_HERE, "Unable to stop CTB");
260 : }
261 0 : m_thread_.stop_working_thread();
262 :
263 0 : m_run_HLT_counter=0;
264 0 : m_run_LLT_counter=0;
265 0 : m_run_channel_status_counter=0;
266 :
267 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
268 0 : }
269 :
270 : void
271 0 : CTBModule::do_hsi_work(std::atomic<bool>& running_flag)
272 : {
273 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_work() method";
274 :
275 0 : std::size_t n_bytes = 0 ;
276 0 : std::size_t n_words = 0 ;
277 :
278 0 : const size_t header_size = sizeof( content::tcp_header_t ) ;
279 0 : const size_t word_size = content::word::word_t::size_bytes ;
280 :
281 0 : TLOG_DEBUG(TLVL_CTB_MODULE) << get_name() << ": Header size: " << header_size << std::endl << "Word size: " << word_size << std::endl;
282 :
283 : //connect to socket
284 0 : boost::asio::ip::tcp::acceptor acceptor(m_receiver_ios, boost::asio::ip::tcp::endpoint( boost::asio::ip::tcp::v4(), m_receiver_port ) );
285 0 : TLOG_DEBUG(0) << get_name() << ": Waiting for an incoming connection on port " << m_receiver_port << std::endl;
286 :
287 0 : std::future<void> accepting = async( std::launch::async, [&]{ acceptor.accept(m_receiver_socket) ; } ) ;
288 :
289 0 : while ( running_flag.load() && !m_stop_requested.load() ) {
290 0 : if ( accepting.wait_for( m_timeout ) == std::future_status::ready ){
291 : break ;
292 : }
293 : }
294 :
295 0 : TLOG_DEBUG(0) << get_name() << ": Connection received: start reading" << std::endl;
296 :
297 0 : content::tcp_header_t head ;
298 0 : head.packet_size = 0;
299 0 : content::word::word_t temp_word ;
300 0 : bool connection_closed = false ;
301 0 : uint64_t ch_stat_beam, ch_stat_crt, ch_stat_pds; // NOLINT
302 0 : uint64_t prev_timestamp = 0; // NOLINT
303 0 : ts_payload prev_hlt, prev_llt, prev_ch_stat;
304 0 : ts_payload curr_hlt, curr_llt, curr_ch_stat;
305 : // buffers for word matching. buf_a are the trigger words, buf_b are corresponding payloads
306 0 : std::queue<content::word::trigger_t> match_buf_a_hlts, match_buf_a_llts;
307 0 : std::queue<ts_payload> match_buf_b_llts, match_buf_b_chstatus;
308 :
309 0 : while (running_flag.load() && !m_stop_requested.load()) {
310 :
311 0 : update_calibration_file();
312 :
313 0 : if ( ! read( head ) ) {
314 0 : connection_closed = true ;
315 : break;
316 : }
317 :
318 0 : n_bytes = head.packet_size ;
319 : // extract n_words
320 :
321 0 : n_words = n_bytes / word_size ;
322 : // read n words as requested from the header
323 :
324 0 : update_buffer_counts(n_words);
325 :
326 0 : for ( unsigned int i = 0 ; i < n_words ; ++i ) {
327 :
328 0 : if (!running_flag.load() || m_stop_requested.load()) {
329 : break;
330 : }
331 :
332 : //read a word
333 0 : if ( ! read( temp_word ) ) {
334 : connection_closed = true ;
335 : break ;
336 : }
337 : // put it in the calibration stream
338 0 : if ( m_has_calibration_stream ) {
339 0 : m_calibration_file.write( reinterpret_cast<const char*>( & temp_word ), word_size ) ; // NOLINT
340 0 : m_calibration_file.flush() ;
341 : } // word printing in calibration stream
342 :
343 : //check if it is a TS word and increment the counter
344 0 : if ( IsTSWord( temp_word ) ) {
345 0 : ++m_ts_word_counter;
346 0 : TLOG_DEBUG(9) << "Received timestamp word! TS: "+temp_word.timestamp;
347 0 : prev_timestamp = temp_word.timestamp;
348 0 : } else if ( IsFeedbackWord( temp_word ) ) {
349 0 : m_error_state.store( true ) ;
350 0 : content::word::feedback_t * feedback = reinterpret_cast<content::word::feedback_t*>( & temp_word ) ; // NOLINT
351 0 : TLOG_DEBUG(7) << "Received feedback word!";
352 :
353 0 : TLOG_DEBUG(8) << get_name() << ": Feedback word: " << std::endl
354 0 : << std::hex
355 0 : << " \t Type -> " << feedback -> word_type << std::endl
356 0 : << " \t TS -> " << feedback -> timestamp << std::endl
357 0 : << " \t Code -> " << feedback -> code << std::endl
358 0 : << " \t Source -> " << feedback -> source << std::endl
359 0 : << " \t Padding -> " << feedback -> padding << std::dec << std::endl ;
360 0 : } else if (temp_word.word_type == content::word::t_gt) {
361 0 : TLOG_DEBUG(3) << "Received HLT word! TS: " + temp_word.timestamp;
362 0 : content::word::trigger_t * hlt_word = reinterpret_cast<content::word::trigger_t*>( & temp_word ); //NOLINT
363 0 : curr_hlt = {hlt_word->timestamp, (hlt_word->trigger_word & 0x1FFFFFFFFFFFFFFF)};
364 0 : if (check_repeated_word(curr_hlt, prev_hlt, temp_word.word_type)) continue;
365 0 : match_buf_a_hlts.push(*hlt_word);
366 : // Count the total HLTs and each specific one
367 0 : ++m_run_HLT_counter;
368 0 : ++m_total_hlt_counter;
369 0 : for (auto &hlt : m_hlt_trigger_counter) {
370 0 : if( (hlt_word->trigger_word >> hlt.first) & 0x1 )
371 0 : ++hlt.second;
372 : }
373 0 : m_last_readout_hlt_timestamp = temp_word.timestamp;
374 0 : prev_hlt = curr_hlt;
375 0 : } else if (temp_word.word_type == content::word::t_lt) {
376 0 : TLOG_DEBUG(5) << "Received LLT word! TS: " + temp_word.timestamp;
377 0 : content::word::trigger_t * llt_word = reinterpret_cast<content::word::trigger_t*>( & temp_word ) ; //NOLINT
378 0 : curr_llt = {llt_word->timestamp, (llt_word->trigger_word & 0xFFFFFFFF)};
379 0 : if (check_repeated_word(curr_llt, prev_llt, temp_word.word_type)) continue;
380 0 : match_buf_a_llts.push(*llt_word);
381 0 : match_buf_b_llts.push(curr_llt);
382 :
383 0 : ++m_run_LLT_counter;
384 0 : for (auto &llt : m_llt_trigger_counter) {
385 0 : if( (llt_word->trigger_word >> llt.first) & 0x1 )
386 0 : ++llt.second;
387 : }
388 0 : prev_llt = curr_llt;
389 0 : } else if (temp_word.word_type == content::word::t_ch) {
390 :
391 0 : content::word::ch_status_t * ch_stat_word = reinterpret_cast<content::word::ch_status_t*>( & temp_word ) ; // NOLINT
392 : // The channel status only has 60b TS so complete the upper 4b from the TS Word. (fyi 60b rolls over >500yr @ 62.5MHz)
393 0 : uint64_t corrected_ts = ((prev_timestamp & 0xF000000000000000) | ch_stat_word->timestamp); // NOLINT
394 0 : TLOG_DEBUG(6) << "Received Channel Status word! TS: " + corrected_ts;
395 0 : ch_stat_beam = ch_stat_word->get_beam();
396 0 : ch_stat_crt = ch_stat_word->get_crt();
397 0 : ch_stat_pds = ch_stat_word->get_pds();
398 0 : curr_ch_stat = {
399 : corrected_ts,
400 0 : ((ch_stat_pds << 48) | (ch_stat_crt << 16) | ch_stat_beam)
401 0 : };
402 0 : if (check_repeated_word(curr_ch_stat, prev_ch_stat, temp_word.word_type)) continue;
403 0 : match_buf_b_chstatus.push(curr_ch_stat);
404 0 : prev_ch_stat = curr_ch_stat;
405 :
406 0 : ++m_run_channel_status_counter;
407 : } // else if on word types
408 : // do matching
409 0 : match_between_buffers(match_buf_a_hlts, match_buf_b_llts,
410 : prev_hlt.first, content::word::word_type::t_gt);
411 0 : match_between_buffers(match_buf_a_llts, match_buf_b_chstatus,
412 : prev_llt.first, content::word::word_type::t_lt);
413 :
414 : } // n_words loop
415 :
416 0 : if ( connection_closed ){
417 : break ;
418 : }
419 : }
420 :
421 : // Make sure CTB run stops before closing socket
422 0 : while ( m_is_running.load() ) {
423 0 : std::this_thread::sleep_for(std::chrono::microseconds(100));
424 : }
425 :
426 0 : boost::system::error_code closing_error;
427 :
428 0 : if ( m_error_state.load() ) {
429 :
430 0 : m_receiver_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_send, closing_error);
431 :
432 0 : if ( closing_error ) {
433 0 : std::stringstream msg;
434 0 : msg << "Error in shutdown " << closing_error.message();
435 0 : ers::error(CTBCommunicationError(ERS_HERE,msg.str())) ;
436 0 : }
437 :
438 : }
439 :
440 0 : m_receiver_socket.close(closing_error) ;
441 :
442 0 : if ( closing_error ) {
443 0 : std::stringstream msg;
444 0 : msg << "Socket closing failed:: " << closing_error.message();
445 0 : ers::error(CTBCommunicationError(ERS_HERE,msg.str()));
446 0 : }
447 :
448 :
449 0 : TLOG_DEBUG(TLVL_CTB_MODULE) << get_name() << ": End of do_work loop: stop receiving data from the CTB";
450 :
451 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_work() method";
452 :
453 0 : } // NOLINT
454 :
455 0 : bool CTBModule::check_repeated_word(ts_payload& curr_word, ts_payload& prev_word, uint64_t wtype){ // NOLINT
456 0 : if (curr_word.first == prev_word.first) { // words with repeated timestamp. Not good!
457 0 : std::stringstream msg;
458 0 : msg << "Multiple words have the same timestamp, Using the first one. Word type: ";
459 0 : if (wtype == content::word::word_type::t_gt) msg << "HLT";
460 0 : else if (wtype == content::word::word_type::t_lt) msg << "LLT";
461 0 : else if (wtype == content::word::word_type::t_ch) msg << "Channel Status";
462 0 : else msg << wtype;
463 0 : msg << ", TS: "<< curr_word.first << ".";
464 0 : if (curr_word.second != prev_word.second) { // not only do we have repeated timestamp, they have different payload...
465 0 : msg << " Different payload!! Previous payload: 0x" << std::hex << prev_word.second
466 0 : << " Current payload: 0x" << curr_word.second;
467 0 : ers::warning(CTBRepeatedTimestampWarning(ERS_HERE, msg.str()));
468 : } else {
469 0 : msg << " Both have payload 0x" << std::hex << prev_word.second;
470 0 : TLOG() << msg.str();
471 : }
472 0 : return true;
473 0 : }
474 : return false;
475 : }
476 :
477 0 : void CTBModule::send_matched_trigger_word(const content::word::trigger_t& word, uint64_t payload) { // NOLINT
478 : // Send HSI data to a DLH
479 0 : std::array<uint32_t, 7> hsi_struct; // NOLINT
480 0 : bool is_hlt = word.IsHLT();
481 0 : hsi_struct[0] = (is_hlt << 26) | // link
482 0 : (m_slot << 22) |
483 0 : (m_crate << 12) |
484 0 : (m_det << 6) |
485 : CTB_HSI_FRAME_VERSION
486 : ;
487 0 : hsi_struct[1] = word.timestamp; // ts low
488 0 : hsi_struct[2] = word.timestamp >> 32; // ts high
489 0 : hsi_struct[3] = payload; // lower 32b
490 0 : hsi_struct[4] = payload >> 32; // upper 32b (will be 0x0 for llt payloads)
491 0 : hsi_struct[5] = word.trigger_word; // trigger_map;
492 0 : hsi_struct[6] = is_hlt ? m_run_HLT_counter : m_run_LLT_counter; // m_generated_counter;
493 0 : int dbg_lvl = is_hlt ? 4 : 6;
494 0 : TLOG_DEBUG(dbg_lvl) << get_name() << ": Formed HSI_FRAME_STRUCT for " << (is_hlt? "HLT" : "LLT")
495 0 : << std::hex
496 0 : << "0x" << hsi_struct[0]
497 0 : << ", 0x" << hsi_struct[1]
498 0 : << ", 0x" << hsi_struct[2]
499 0 : << ", 0x" << hsi_struct[3]
500 0 : << ", 0x" << hsi_struct[4]
501 0 : << ", 0x" << hsi_struct[5]
502 0 : << ", 0x" << hsi_struct[6]
503 0 : << "\n";
504 0 : if (is_hlt) {
505 0 : send_raw_hsi_data(hsi_struct, m_hlt_hsi_data_sender.get());
506 0 : dfmessages::HSIEvent event(m_det, word.trigger_word, word.timestamp, m_run_HLT_counter, m_run_number);
507 0 : send_hsi_event(event);
508 : } else {
509 0 : send_raw_hsi_data(hsi_struct, m_llt_hsi_data_sender.get());
510 : }
511 :
512 0 : }
513 :
514 0 : void CTBModule::match_between_buffers(std::queue<content::word::trigger_t>& buf_a, std::queue<ts_payload>& buf_b,
515 : uint64_t timeout_reference, content::word::word_type buf_a_wtype) { // NOLINT
516 0 : bool is_hlt = (buf_a_wtype == content::word::word_type::t_gt);
517 0 : while (buf_a.size() > 0) {
518 0 : content::word::trigger_t trigger = buf_a.front();
519 0 : auto trigger_ts = trigger.timestamp;
520 0 : auto trigger_word = trigger.trigger_word;
521 0 : if (timeout_reference > (trigger_ts + 100)) {
522 0 : std::stringstream msg;
523 0 : msg << "Time out while waiting for a match for the "<< (is_hlt? "HLT" : "LLT")
524 0 : << ": TS = " << trigger_ts << ", trigger word = " << std::hex << "0x"<< trigger_word
525 0 : << " Timeout reference: " << std::dec << timeout_reference;
526 0 : ers::warning(CTBWordMatchWarning(ERS_HERE, msg.str()));
527 0 : buf_a.pop();
528 0 : continue;
529 0 : }
530 0 : if (buf_b.size() == 0) break; // no input to match yet. Return for now, wait for matching input to come
531 0 : while (buf_b.size() > 0) {
532 0 : auto input_ts = buf_b.front().first;
533 0 : if (input_ts < (trigger_ts - 1)) { // word is too early. No longer needed
534 0 : if (is_hlt) last_popped_llt = buf_b.front();
535 0 : else last_popped_chstatus = buf_b.front();
536 0 : buf_b.pop();
537 0 : } else if (input_ts == (trigger_ts - 1)) { // match is found
538 0 : send_matched_trigger_word(trigger, buf_b.front().second);
539 0 : buf_a.pop();
540 0 : if (is_hlt) last_popped_llt = buf_b.front();
541 0 : else last_popped_chstatus = buf_b.front();
542 0 : buf_b.pop();
543 0 : break;
544 : } else { // buf_b is already past the match window. No matching is found, error!
545 0 : if (is_hlt && (trigger_word == 0x1 || trigger_word == (0x1 << 16))) { // Fake HLTs, no matching is OK
546 0 : send_matched_trigger_word(trigger, 0);
547 : } else{
548 0 : std::stringstream msg;
549 0 : msg << "No match found for " << (is_hlt? "HLT" : "LLT")
550 0 : << ": TS = " << trigger_ts << ", trigger word = 0x" << std::hex << trigger_word
551 0 : << " Adjacent input ts: " << std::dec << (is_hlt? last_popped_llt.first : last_popped_chstatus.first) << " "
552 0 : << input_ts;
553 0 : ers::warning(CTBWordMatchWarning(ERS_HERE, msg.str()));
554 0 : }
555 0 : buf_a.pop();
556 0 : break;
557 : }
558 : } // end loop buf_b
559 : } // end loop buf_a
560 : // Don't let buf_b get too long (e.g. when LLT rate is high but HLT rate is low)
561 0 : while (buf_b.size() > 32) {
562 0 : if (is_hlt) last_popped_llt = buf_b.front();
563 0 : else last_popped_chstatus = buf_b.front();
564 0 : buf_b.pop();
565 : }
566 0 : }
567 :
568 :
569 : template<typename T>
570 0 : bool CTBModule::read( T &obj) {
571 :
572 0 : boost::system::error_code receiving_error;
573 0 : boost::asio::read( m_receiver_socket, boost::asio::buffer( &obj, sizeof(T) ), receiving_error ) ;
574 :
575 0 : if ( ! receiving_error ) {
576 : return true ;
577 : }
578 :
579 0 : if ( receiving_error == boost::asio::error::eof) {
580 0 : std::string error_message = "Socket closed: " + receiving_error.message();
581 0 : ers::error(CTBCommunicationError(ERS_HERE, error_message));
582 : return false ;
583 0 : }
584 :
585 : if ( receiving_error ) {
586 0 : std::string error_message = "Read failure: " + receiving_error.message();
587 0 : ers::error(CTBCommunicationError(ERS_HERE, error_message));
588 : return false ;
589 0 : }
590 :
591 : return true ;
592 : }
593 :
594 0 : bool CTBModule::IsTSWord( const content::word::word_t &w ) noexcept {
595 :
596 0 : if ( w.word_type == content::word::t_ts ) {
597 0 : return true;
598 : }
599 : return false;
600 :
601 : }
602 :
603 0 : bool CTBModule::IsFeedbackWord( const content::word::word_t &w ) noexcept {
604 :
605 0 : if ( w.word_type == content::word::t_fback ) {
606 0 : return true;
607 : }
608 : return false;
609 :
610 : }
611 :
612 0 : void CTBModule::init_calibration_file() {
613 :
614 0 : if ( ! m_has_calibration_stream ){
615 0 : return ;
616 : }
617 0 : std::array<char, 200> file_name;
618 0 : time_t rawtime = time( nullptr ) ;
619 0 : struct tm timeinfo;
620 0 : localtime_r( & rawtime, & timeinfo) ;
621 0 : strftime( file_name.data(), file_name.size(), "%F_%H.%M.%S.calib", & timeinfo );
622 0 : std::string global_name = m_calibration_dir + m_calibration_prefix + file_name.data() ;
623 0 : m_calibration_file.open( global_name, std::ofstream::binary ) ;
624 0 : m_last_calibration_file_update = std::chrono::steady_clock::now();
625 : // _calibration_file.setf ( std::ios::hex, std::ios::basefield );
626 : // _calibration_file.unsetf ( std::ios::showbase );
627 0 : TLOG_DEBUG(0) << get_name() << ": New Calibration Stream file: " << global_name << std::endl ;
628 :
629 0 : }
630 :
631 0 : void CTBModule::update_calibration_file() {
632 :
633 0 : if ( ! m_has_calibration_stream ) {
634 0 : return ;
635 : }
636 :
637 0 : std::chrono::steady_clock::time_point check_point = std::chrono::steady_clock::now();
638 :
639 0 : if ( check_point - m_last_calibration_file_update < m_calibration_file_interval ) {
640 : return ;
641 : }
642 :
643 0 : m_calibration_file.close() ;
644 0 : init_calibration_file() ;
645 :
646 : }
647 :
648 0 : bool CTBModule::SetCalibrationStream( const std::string & prefix ) {
649 :
650 0 : if ( m_calibration_dir.back() != '/' ){
651 0 : m_calibration_dir += '/' ;
652 : }
653 0 : m_calibration_prefix = prefix ;
654 0 : if ( prefix.size() > 0 ){
655 0 : m_calibration_prefix += '_' ;
656 : }
657 : // possibly we could check here if the directory is valid and writable before assuming the calibration stream is valid
658 0 : return true ;
659 :
660 : }
661 :
662 :
663 0 : void CTBModule::send_config( const std::string & config ) {
664 :
665 0 : if ( m_is_configured.load() ) {
666 :
667 0 : TLOG_DEBUG(1) << get_name() << ": Resetting before configuring" << std::endl;
668 0 : send_reset();
669 :
670 : }
671 :
672 0 : TLOG_DEBUG(1) << get_name() << ": Sending config" << std::endl;
673 :
674 0 : if ( send_message( config ) ) {
675 :
676 0 : m_is_configured.store(true) ;
677 :
678 : } else {
679 0 : throw CTBCommunicationError(ERS_HERE, "Unable to configure CTB");
680 : }
681 0 : }
682 :
683 0 : void CTBModule::send_reset() {
684 :
685 0 : TLOG_DEBUG(1) << get_name() << ": Sending a reset" << std::endl;
686 :
687 0 : if(send_message( "{\"command\":\"HardReset\"}" )){
688 :
689 0 : m_is_running.store(false);
690 0 : m_is_configured.store(false);
691 :
692 : } else {
693 0 : ers::error(CTBCommunicationError(ERS_HERE, "Unable to reset CTB"));
694 : }
695 :
696 0 : }
697 :
698 0 : bool CTBModule::send_message( const std::string & msg ) {
699 :
700 : //add error options
701 :
702 0 : boost::system::error_code error;
703 0 : TLOG_DEBUG(1) << get_name() << ": Sending message: " << msg;
704 :
705 0 : m_num_control_messages_sent++;
706 :
707 0 : boost::asio::write( m_control_socket, boost::asio::buffer( msg ), error ) ;
708 0 : boost::array<char, 4096> reply_buf{" "} ;
709 0 : m_control_socket.read_some( boost::asio::buffer(reply_buf ), error);
710 0 : std::stringstream raw_answer( std::string(reply_buf .begin(), reply_buf .end() ) ) ;
711 0 : TLOG_DEBUG(1) << get_name() << ": Unformatted answer: " << raw_answer.str();
712 :
713 0 : nlohmann::json answer ;
714 0 : raw_answer >> answer ;
715 0 : nlohmann::json & messages = answer["feedback"] ;
716 0 : TLOG_DEBUG(1) << get_name() << ": Received messages: " << messages.size();
717 :
718 0 : bool ret = true ;
719 0 : for (nlohmann::json::size_type i = 0; i != messages.size(); ++i ) {
720 :
721 0 : m_num_control_responses_received++;
722 :
723 0 : std::string type = messages[i]["type"].dump() ;
724 0 : if ( type.find("error") != std::string::npos || type.find("Error") != std::string::npos || type.find("ERROR") != std::string::npos ) {
725 0 : ers::error(CTBMessage(ERS_HERE, messages[i]["message"].dump()));
726 0 : ret = false ;
727 0 : } else if ( type.find("warning") != std::string::npos || type.find("Warning") != std::string::npos || type.find("WARNING") != std::string::npos ) {
728 0 : ers::warning(CTBMessage(ERS_HERE, messages[i]["message"].dump()));
729 0 : } else if ( type.find("info") != std::string::npos || type.find("Info") != std::string::npos || type.find("INFO") != std::string::npos) {
730 0 : TLOG() << "Message from the board: " << messages[i]["message"].dump();
731 : } else {
732 0 : std::stringstream blob;
733 0 : blob << messages[i] ;
734 0 : TLOG() << get_name() << ": Unformatted from the board: " << blob.str();
735 0 : }
736 0 : }
737 :
738 0 : return ret;
739 :
740 0 : }
741 :
742 : void
743 0 : CTBModule::update_buffer_counts(uint new_count) // NOLINT(build/unsigned)
744 : {
745 0 : std::unique_lock mon_data_lock(m_buffer_counts_mutex);
746 0 : if (m_buffer_counts.size() > 1000)
747 0 : m_buffer_counts.pop_front();
748 0 : m_buffer_counts.push_back(new_count);
749 0 : }
750 :
751 : double
752 0 : CTBModule::read_average_buffer_counts()
753 : {
754 0 : std::unique_lock mon_data_lock(m_buffer_counts_mutex);
755 :
756 0 : double total_counts = 0;;
757 0 : uint32_t number_of_counts = m_buffer_counts.size(); // NOLINT(build/unsigned)
758 :
759 0 : if (number_of_counts) {
760 0 : for (uint i = 0; i < number_of_counts; ++i) { // NOLINT(build/unsigned)
761 0 : total_counts = total_counts + m_buffer_counts.at(i);
762 : }
763 0 : return total_counts / number_of_counts;
764 : } else {
765 : return 0;
766 : }
767 0 : }
768 :
769 0 : void CTBModule::generate_opmon_data()
770 : {
771 0 : dunedaq::ctbmodules::opmon::CTBModuleInfo module_info;
772 :
773 0 : module_info.set_num_control_messages_sent(m_num_control_messages_sent.load());
774 0 : module_info.set_num_control_responses_received(m_num_control_responses_received.load());
775 0 : module_info.set_ctb_hardware_running(m_is_running.load());
776 0 : module_info.set_ctb_hardware_configured(m_is_configured.load());
777 :
778 0 : module_info.set_last_readout_timestamp(m_last_readout_hlt_timestamp.load());
779 0 : module_info.set_failed_to_send_hsi_events_counter( m_failed_to_send_counter.load() );
780 0 : module_info.set_last_sent_timestamp(m_last_sent_timestamp.load());
781 0 : module_info.set_average_buffer_occupancy( read_average_buffer_counts() );
782 :
783 0 : module_info.set_total_hlt_count(m_total_hlt_counter.load() );
784 0 : module_info.set_ts_word_count(m_ts_word_counter.exchange(0));
785 :
786 0 : publish( std::move(module_info) );
787 :
788 0 : for (auto &hlt : m_hlt_trigger_counter) {
789 0 : dunedaq::ctbmodules::opmon::TriggerInfo ti;
790 0 : ti.set_count(hlt.second.exchange(0));
791 0 : publish( std::move(ti), {{ "trigger", "HLT_" + std::to_string(hlt.first)}} );
792 0 : }
793 :
794 0 : for (auto &llt : m_llt_trigger_counter) {
795 0 : dunedaq::ctbmodules::opmon::TriggerInfo ti;
796 0 : ti.set_count(llt.second.exchange(0));
797 0 : publish( std::move(ti), {{ "trigger", "LLT_" + std::to_string(llt.first)}} );
798 0 : }
799 :
800 0 : }
801 :
802 0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::ctbmodules::CTBModule)
803 :
804 : // Local Variables:
805 : // c-basic-offset: 2
806 : // End:
|