Line data Source code
1 : /**
2 : * @file CIBModule.cpp
3 : *
4 : * Implementations of CIBModule's functions
5 : *
6 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
7 : * Licensing/copyright details are in the COPYING file that you should have
8 : * received with this code.
9 : */
10 : #include "confmodel/GeoId.hpp"
11 :
12 : // these are the data types in the application model
13 : #include "appmodel/CIBConf.hpp"
14 : #include "appmodel/CIBCalibrationStream.hpp"
15 : #include "appmodel/CIBoardConf.hpp"
16 : #include "appmodel/CIBTrigger.hpp"
17 :
18 : #include "CIBModule.hpp"
19 : #include "CIBModuleIssues.hpp"
20 :
21 : #include "iomanager/IOManager.hpp"
22 : #include "logging/Logging.hpp"
23 :
24 : #include <chrono>
25 : #include <string>
26 : #include <thread>
27 : #include <vector>
28 : #include <map>
29 : #include <queue>
30 : #include <utility>
31 : #include <charconv>
32 : #include <cstdint>
33 : #include <string_view>
34 :
35 : // since cib_data_fmt is shared with the board
36 : // we need this macro to avoid clashes with the namespaces
37 : // not the cleanest solution, but it works and avoids maintaining two separate versions of the same struct
38 : #define CIB_DUNEDAQ 1
39 : // we may need this to help parse the data format arriving from the CIB
40 : #include <cib_data_fmt.h>
41 : #include <cib_utilities.h>
/**
* @brief Name used by TRACE TLOG calls from this source file
*/
#define TRACE_NAME "CIBModule" // NOLINT

// Verbosity levels handed to TLOG_DEBUG throughout this file.
// Method entry/exit tracing and general debug output deliberately share level 10.
#define TLVL_CIB_INFO 5
#define TLVL_ENTER_EXIT_METHODS 10
#define TLVL_CIB_DEBUG 10
#define TLVL_CIB_TRACE 15

// Version tag ORed into the first word (hsi_struct[0]) of every HSI frame built by this module.
// As it happens, the frame structure remains unchanged since NP04.
constexpr uint16_t CIB_HSI_FRAME_VERSION{0x1}; // NOLINT
53 : namespace dunedaq::cibmodules {
54 :
55 0 : CIBModule::CIBModule(const std::string& name)
56 : : hsilibs::HSIEventSender(name)
57 0 : , m_is_running(false)
58 0 : , m_is_configured(false)
59 0 : , m_stop_requested(false)
60 0 : , m_receiver_port(8871)
61 0 : , m_receiver_timeout(70000) // 70 ms - we know that triggers will come at 10 Hz max
62 0 : , m_error_state(false)
63 0 : , m_control_ios()
64 0 : , m_control_socket(m_control_ios)
65 0 : , m_control_endpoint()
66 0 : , m_receiver_ios()
67 0 : , m_receiver_socket(m_receiver_ios)
68 0 : , m_thread_(std::bind(&CIBModule::do_hsi_work, this, std::placeholders::_1))
69 0 : , m_calibration_stream_enable(false)
70 0 : , m_calibration_dir("")
71 0 : , m_calibration_prefix("")
72 0 : , m_calibration_file_interval(std::chrono::minutes(15))
73 : // metric utilities
74 0 : , m_num_control_messages_sent(0)
75 0 : , m_num_control_responses_received(0)
76 0 : , m_num_total_triggers_received(0)
77 0 : , m_num_run_triggers_received(0)
78 :
79 0 : , m_trigger_bit(0)
80 0 : , m_receiver_ready(false)
81 : {
82 : // we can infer the instance from the name
83 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Instantiating a cibmodule with argument [" << name << "]";
84 0 : register_command("conf", &CIBModule::do_configure);
85 0 : register_command("start", &CIBModule::do_start);
86 0 : register_command("stop", &CIBModule::do_stop);
87 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Leaving [" << name << "]constructor.";
88 0 : }
89 :
90 0 : CIBModule::~CIBModule()
91 : {
92 0 : if(m_is_running.load())
93 : {
94 0 : const CommandData_t stopobj;
95 :
96 : // const nlohmann::json stopobj;
97 : // this should also take care of closing the streaming socket
98 0 : do_stop(stopobj);
99 0 : }
100 0 : TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": Closing the control socket " << std::endl;
101 0 : m_control_socket.close() ;
102 :
103 0 : }
104 :
105 : void
106 0 : CIBModule::init(std::shared_ptr<appfwk::ConfigurationManager> cfgMgr)
107 : {
108 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
109 :
110 : // init the sender
111 0 : HSIEventSender::init(cfgMgr);
112 :
113 : // assign the local configuration manager to the argument
114 0 : m_cfg = cfgMgr;
115 :
116 : // get the configuration fragment for this module
117 0 : auto mdal = cfgMgr->get_dal<appmodel::CIBModule>(get_name());
118 0 : if (! mdal)
119 : {
120 0 : throw cibmodules::CIBConfigFailure(ERS_HERE, "Missing Module configuration for " + get_name());
121 : }
122 :
123 : // assign the module configuration
124 0 : m_module = mdal;
125 :
126 : // setting up connections
127 0 : auto iom = iomanager::IOManager::get();
128 :
129 0 : using hsi_frame_t = dunedaq::hsilibs::HSI_FRAME_STRUCT;
130 0 : for ( auto con : m_module->get_outputs() )
131 : {
132 0 : if ( con->get_data_type() == datatype_to_string<hsi_frame_t>() )
133 : {
134 : // Filter connections by UID: only process those with "CIB" or "cib" in the name
135 : // This string comes from the HSISignalWindow UID in the appmodel configuration
136 0 : if ( (con->UID().find("CIB")!=std::string::npos) ||
137 0 : (con->UID().find("cib")!=std::string::npos) )
138 : {
139 0 : TLOG() << get_name() << ": Setting up HSI Frame output : " << con->UID() << std::endl;
140 0 : m_cib_hsi_data_sender = iom->get_sender<hsi_frame_t>(con->UID());
141 : }
142 : else
143 : {
144 0 : TLOG_DEBUG(5) << get_name() << ": Skipping output : " << con->UID();
145 : }
146 :
147 : } // if data type is HSI Frame
148 : } // loop over outputs
149 :
150 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
151 0 : }
152 :
153 : void
154 0 : CIBModule::do_configure(const CommandData_t &)
155 : {
156 :
157 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering CIB do_configure()";
158 :
159 : // this returns the structure of the CIBConf object
160 0 : auto conf = m_module ->get_configuration();
161 : // this gets the CIBoardConf object
162 0 : auto board = m_module -> get_board();
163 :
164 0 : auto trigger_conf = conf->get_cib_trigger();
165 :
166 : // identify the trigger bit that this receiver is assigned to
167 : // We need this to construct the HSI frame, right?
168 0 : if (!dunedaq::cibmodules::util::parse_hex(trigger_conf->get_trigger_bit(), m_trigger_bit))
169 : {
170 0 : std::ostringstream msg("");
171 0 : msg << get_name() << ": Unable to parse trigger bit hex string : " << trigger_conf->get_trigger_bit();
172 0 : throw CIBModuleError(ERS_HERE, msg.str());
173 0 : }
174 : else
175 : {
176 0 : TLOG_DEBUG(TLVL_CIB_DEBUG) << get_name() << ": Parsed trigger bit hex string "
177 0 : << trigger_conf->get_trigger_bit() << " to 0x"
178 0 : << std::hex << m_trigger_bit << std::dec
179 0 : << "[" << trigger_conf->get_trigger_id() << "]";
180 : }
181 :
182 : // init monitoring variables
183 0 : m_num_control_messages_sent = 0;
184 0 : m_num_control_responses_received = 0;
185 :
186 : // figure out the identifier of the CIB
187 : // this is set in the configuration
188 0 : auto geo_id = board->get_geo_id();
189 0 : m_det = geo_id->get_detector_id();
190 0 : m_crate = geo_id->get_crate_id();
191 0 : m_slot = geo_id->get_slot_id();
192 :
193 0 : auto session = m_cfg->get_session();
194 :
195 : // init trigger counters
196 0 : m_num_run_triggers_received.store(0);
197 :
198 0 : auto cib_host = conf->get_cib_host();
199 0 : auto cib_port = conf->get_cib_port();
200 :
201 : // network connection to the CIB module
202 0 : boost::asio::ip::tcp::resolver resolver( m_control_ios );
203 0 : boost::asio::ip::tcp::resolver::query query( cib_host,std::to_string(cib_port) ) ; //"np04-iols-cib-02", 8992
204 0 : boost::asio::ip::tcp::resolver::iterator iter = resolver.resolve(query) ;
205 :
206 0 : m_control_endpoint = iter->endpoint();
207 : // attempt the connection.
208 0 : try
209 : {
210 :
211 0 : m_control_socket.connect(m_control_endpoint);
212 0 : m_control_socket.set_option(boost::asio::ip::tcp::no_delay(true));
213 : }
214 0 : catch (std::exception &e)
215 : {
216 0 : std::ostringstream msg("");
217 0 : msg << get_name() << "Exception caught while establishing connection to CIB : " << e.what();
218 : // do nothing more. Just exist
219 0 : m_is_configured.store(false);
220 0 : throw CIBCommunicationError(ERS_HERE, msg.str());
221 0 : }
222 0 : TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": Successfully connected to CIB endpoint "
223 0 : << cib_host << ':' << cib_port << std::endl;
224 :
225 : // if necessary, set the calibration stream
226 0 : auto stream_conf = conf->get_calibration_stream();
227 0 : if (stream_conf)
228 : {
229 0 : TLOG() << "Calibration stream enabled";
230 0 : m_calibration_stream_enable = true;
231 0 : m_calibration_dir = stream_conf->get_output_directory();
232 0 : m_calibration_file_interval = std::chrono::duration_cast<decltype(m_calibration_file_interval)>(std::chrono::seconds(stream_conf->get_update_period_s()));
233 : }
234 :
235 : // these are for the local receiver operation
236 0 : m_receiver_port = board->get_receiver_port();
237 0 : m_receiver_timeout = std::chrono::milliseconds(conf->get_connection_timeout_ms());
238 0 : m_receiver_host = board->get_receiver_host();
239 0 : TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": Default board receiver network location (PRELIMINARY) "
240 0 : << m_receiver_host << ':' << m_receiver_port << std::endl;
241 :
242 :
243 : // config fragment to be sent to the CIB
244 0 : nlohmann::json config;
245 : //
246 : // if the host is localhost, we need to resolve it to the actual hostname
247 : // and also check whether the port is available
248 : //
249 0 : if (m_receiver_host == "localhost")
250 : {
251 0 : boost::asio::ip::tcp::resolver resolver(m_receiver_ios);
252 : // Check if port is already in use, to try to avoid future conflicts
253 0 : unsigned short port = m_receiver_port;
254 0 : while (check_port_in_use(port))
255 : {
256 : // the problem is that if the port is in use, the CIB will be sending the data to the wrong place
257 : // there is little point in continuing
258 0 : std::ostringstream msg("");
259 0 : msg << "Listener port [" << port << "] is in use by someone else. Trying another.";
260 0 : ers::warning(CIBMessage(ERS_HERE, msg.str()));
261 0 : port++;
262 0 : }
263 0 : if (port != m_receiver_port)
264 : {
265 0 : std::ostringstream msg("");
266 0 : msg << "Listener port [" << m_receiver_port << "] is in use. Relocating to port [" << port << "]";
267 0 : ers::warning(CIBMessage(ERS_HERE, msg.str()));
268 0 : m_receiver_port = port;
269 0 : }
270 : else
271 : {
272 0 : TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": Listener port " << m_receiver_port << " is available." << std::endl;
273 : }
274 :
275 : // at this we have to find the hostname to tell the board where to send the data
276 0 : boost::asio::ip::tcp::resolver::query query_for_local(boost::asio::ip::host_name(), "");
277 0 : iter = resolver.resolve(query_for_local);
278 :
279 0 : TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": Resolved localhost to "
280 0 : << iter->endpoint().address().to_string() << std::endl;
281 0 : m_receiver_host = iter->endpoint().address().to_string();
282 :
283 : // create the json string out of the config fragment
284 : // replacing the receiver address with the one that we just calculated
285 0 : try
286 : {
287 0 : config = board->get_cib_json(*session, m_receiver_host, m_receiver_port);
288 : }
289 0 : catch (nlohmann::json::exception &e)
290 : {
291 0 : std::ostringstream msg("");
292 0 : msg << get_name() << "Caught a JSON exception converting config fragment : " << e.what();
293 0 : m_is_configured.store(false);
294 0 : throw CIBModuleError(ERS_HERE, msg.str());
295 0 : }
296 0 : catch (std::exception &e)
297 : {
298 0 : std::ostringstream msg("");
299 0 : msg << get_name() << "Caught STD exception while converting config fragment : " << e.what();
300 : // do nothing more. Just exist
301 0 : m_is_configured.store(false);
302 0 : throw CIBModuleError(ERS_HERE, msg.str());
303 0 : }
304 0 : }
305 : else
306 : {
307 : /* just use the configuration information */
308 : // nlohmann::to_json(config, board->get_cib_json(*session));
309 0 : config = board->get_cib_json(*session);
310 : }
311 0 : auto json_dump = config.dump();
312 0 : TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << "Sending configuration: [" << json_dump << "] to CIB board";
313 0 : send_config(config.dump());
314 0 : m_is_configured.store(true);
315 0 : }
316 :
317 : void
318 0 : CIBModule::do_start(const CommandData_t &startobj)
319 : {
320 :
321 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
322 : // actually, the first thing to check is whether the CIB has been configured
323 : // if not, this won't work
324 0 : if (!m_is_configured.load())
325 : {
326 0 : throw CIBWrongState(ERS_HERE, "CIB has not been successfully configured.");
327 : }
328 :
329 : // Set this to false early so it doesn't interfere with the start
330 0 : m_stop_requested.store(false);
331 0 : m_run_number.store(startobj.at("run").get<daqdataformats::run_number_t>());
332 : // reset the metrics/counters
333 0 : m_num_run_triggers_received.store(0);
334 :
335 0 : TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": Sending start of run command with run number " << m_run_number.load();
336 0 : m_thread_.start_working_thread();
337 :
338 : // NFB: There was a potential race condition here: the socket in the working thread
339 : // needs to be in place before the CIB receives order to send data, or we risk having a connection
340 : // failure, if for some reason the CIB attempts to connect before the working thread is ready to receive.
341 0 : if (m_calibration_stream_enable)
342 : {
343 0 : std::stringstream run;
344 0 : run << "run" << m_run_number.load();
345 0 : set_calibration_stream(run.str());
346 0 : }
347 : int cnt = 0;
348 0 : while (!m_receiver_ready.load())
349 : {
350 0 : std::this_thread::sleep_for(std::chrono::milliseconds(10));
351 0 : cnt++;
352 0 : if (cnt > 50)
353 : {
354 : // the socket didn't get ready on time
355 0 : throw CIBModuleError(ERS_HERE, "Receiver socket timed out before becoming ready.");
356 : }
357 : }
358 0 : TLOG_DEBUG(TLVL_CIB_DEBUG) << get_name() << ": All ready to signal the CIB to start";
359 :
360 0 : nlohmann::json cmd;
361 0 : cmd["command"] = "start_run";
362 0 : cmd["run_number"] = m_run_number.load();
363 :
364 0 : if (send_message(cmd.dump()))
365 : {
366 0 : m_is_running.store(true);
367 0 : TLOG() << get_name() << ": CIB run started successfully";
368 : }
369 : else
370 : {
371 0 : throw CIBCommunicationError(ERS_HERE, "Unable to start CIB run");
372 : }
373 :
374 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
375 0 : }
376 :
377 : void
378 0 : CIBModule::do_stop(const CommandData_t & /*stopobj*/)
379 : {
380 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
381 0 : TLOG_DEBUG(TLVL_CIB_DEBUG) << get_name() << ": Sending stop run command" << std::endl;
382 :
383 : // Set stop flag BEFORE sending command so receiver thread knows to expect EOF
384 0 : m_stop_requested.store(true);
385 :
386 0 : if (send_message("{\"command\":\"stop_run\"}"))
387 : {
388 : // Response arrival means CIB has closed its data socket (see Handler::stop_run())
389 : // Receiver thread will detect EOF and exit cleanly
390 0 : std::this_thread::sleep_for(std::chrono::milliseconds(2));
391 0 : TLOG() << get_name() << ": CIB run stopped successfully";
392 0 : m_is_running.store(false);
393 : }
394 : else
395 : {
396 : // failed to sent the message to stop the run.
397 : // stop the collecting thread and then throw, since that
398 : // attempts a cleaner exit
399 0 : m_thread_.stop_working_thread();
400 :
401 0 : throw CIBCommunicationError(ERS_HERE, "Unable to stop CIB");
402 : }
403 : //
404 0 : m_thread_.stop_working_thread();
405 :
406 : // -- print the counters for local info
407 0 : TLOG() << get_name() << ": CIB trigger counter summary after run [" << m_run_number << "]:\n\n"
408 0 : << "IOLS trigger counter in run : " << m_num_run_triggers_received << "\n"
409 0 : << "Global IOLS trigger count : " << m_num_total_triggers_received << std::endl;
410 :
411 : // reset counters
412 0 : m_num_run_triggers_received = 0;
413 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
414 0 : }
415 :
416 : // this is where most of the work is really done
417 : void
418 0 : CIBModule::do_hsi_work(std::atomic<bool>& running_flag)
419 : {
420 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_work() method";
421 :
422 0 : std::size_t n_bytes = 0 ;
423 0 : std::size_t n_words = 0 ;
424 0 : std::size_t prev_seq = 0 ;
425 0 : bool first = true;
426 :
427 : //connect to socket
428 : // should we keep everything local or under the class?
429 0 : boost::system::error_code ec;
430 :
431 : // check that this port is still available
432 0 : if (check_port_in_use(m_receiver_port))
433 : {
434 : // the problem is that, at this stage, if the port is in use,
435 : // the CIB will be sending the data to the wrong place
436 : // there is little point in continuing
437 0 : std::ostringstream msg("");
438 0 : msg << "Listener port [" << m_receiver_port << "] is in use by someone else. Failing.";
439 0 : ers::error(CIBMessage(ERS_HERE, msg.str()));
440 0 : }
441 :
442 0 : TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": Will set up the listener on port " << m_receiver_port << std::endl;
443 0 : boost::asio::ip::tcp::acceptor acceptor(m_receiver_ios,boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(),m_receiver_port ));
444 :
445 0 : acceptor.listen(boost::asio::ip::tcp::socket::max_connections, ec);
446 0 : if (ec)
447 : {
448 0 : std::ostringstream msg("");
449 0 : msg << get_name() << ": CIB got an error listening on socket: :" << m_receiver_port << " -- reason: '" << ec << "'";
450 0 : throw CIBCommunicationError(ERS_HERE,msg.str());
451 : return;
452 0 : }
453 : else
454 : {
455 0 : TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": Waiting for an incoming connection on port " << m_receiver_port << std::endl;
456 : }
457 :
458 0 : std::future<void> accepting = async( std::launch::async, [&]{ acceptor.accept(m_receiver_socket,ec) ; } ) ;
459 0 : if (ec)
460 : {
461 0 : std::stringstream msg;
462 0 : msg << "Socket opening failed:: " << ec.message();
463 0 : ers::error(CIBCommunicationError(ERS_HERE,msg.str()));
464 0 : return;
465 0 : }
466 : //
467 0 : m_receiver_ready.store(true);
468 :
469 0 : while ( running_flag.load() && !m_stop_requested.load() )
470 : {
471 0 : if ( accepting.wait_for( m_receiver_timeout ) == std::future_status::ready )
472 : {
473 : break ;
474 : }
475 : }
476 :
477 0 : TLOG() << get_name() << ": Connection received: start reading" << std::endl;
478 :
479 : // -- A couple of variables to help in the data parsing
480 : /**
481 : * The structure is a bit different than in the CTB
482 : * The TCP packet contains a single word (the trigger)
483 : * But other than that, everything are triggers
484 : */
485 :
486 0 : dunedaq::cib::daq::iols_tcp_packet_t tcp_packet;
487 0 : TLOG_DEBUG(TLVL_CIB_TRACE) << "Checking expected sizes: "
488 0 : << " sizeof(iols_tcp_packet_t)=" << sizeof(dunedaq::cib::daq::iols_tcp_packet_t)
489 0 : << " sizeof(iols_trigger_t)=" << sizeof(dunedaq::cib::daq::iols_trigger_t)
490 0 : << " sizeof(tcp_header_t)=" << sizeof(dunedaq::cib::daq::tcp_header_t)
491 0 : << std::endl;
492 :
493 : //boost::system::error_code receiving_error;
494 0 : bool connection_closed = false ;
495 :
496 0 : while (running_flag.load() && !m_stop_requested.load())
497 : {
498 0 : update_calibration_file();
499 :
500 0 : if ( ! read(m_receiver_socket, tcp_packet ) )
501 : {
502 0 : connection_closed = true ;
503 0 : break;
504 : }
505 :
506 0 : n_bytes = tcp_packet.header.packet_size ;
507 0 : n_words = n_bytes/sizeof(dunedaq::cib::daq::iols_trigger_t);
508 :
509 0 : if (n_words != 1)
510 : {
511 0 : std::ostringstream msg("");
512 0 : msg << "Received more than one IoLS trigger word at once! This should never happen. Got "
513 0 : << n_bytes << " (expected " << sizeof(dunedaq::cib::daq::iols_trigger_t) << ")";
514 0 : ers::warning(CIBMessage(ERS_HERE, msg.str()));
515 0 : }
516 : // the CIB only ships one word per packet....so this error should be impossible
517 : // check continuity of the sequence numbers
518 0 : if (first)
519 : {
520 : // first word being fetched. The sequence number should be zero
521 0 : if (tcp_packet.header.sequence_id != 0)
522 : {
523 0 : std::ostringstream msg("");
524 0 : msg << "Missing sequence. First word should have sequence number 0. Got " << static_cast<int>(tcp_packet.header.sequence_id );
525 0 : ers::warning(CIBMessage(ERS_HERE, msg.str()));
526 0 : }
527 : first = false;
528 : }
529 : else
530 : {
531 0 : bool failed = false;
532 : // in case it rolled over, compare to 255
533 0 : if (tcp_packet.header.sequence_id == 0)
534 : {
535 0 : if (prev_seq != 255)
536 : {
537 : failed = true;
538 : }
539 : }
540 : else
541 : {
542 0 : if (tcp_packet.header.sequence_id != (prev_seq+1))
543 : {
544 : failed = true;
545 : }
546 : }
547 : if (failed)
548 : {
549 0 : std::ostringstream msg("");
550 0 : msg << "Skipped CIB word sequence. Prev word " << prev_seq << " current word " << static_cast<int>(tcp_packet.header.sequence_id );
551 0 : ers::warning(CIBMessage(ERS_HERE, msg.str()));
552 0 : }
553 : }
554 0 : prev_seq = tcp_packet.header.sequence_id;
555 :
556 0 : update_buffer_counts(n_words);
557 :
558 : // temporarily print the trigger
559 : // I leave this here for debugging purposes, but don“t even keep it in the logs, since it is too verbose. We can always add it back if we need to debug something
560 : // TLOG_DEBUG(TLVL_CIB_TRACE) << "TRIGGER : ts " << tcp_packet.word.timestamp
561 : // << " pos_m1 " << util::get_m1(tcp_packet.word)
562 : // << " pos_m2 " << util::get_m2(tcp_packet.word)
563 : // << " pos_m3 " << util::get_m3(tcp_packet.word);
564 :
565 0 : if ( m_calibration_stream_enable )
566 : {
567 0 : m_calibration_file.write( reinterpret_cast<const char*>( & tcp_packet.word ), sizeof(tcp_packet.word) ) ; // NOLINT
568 0 : m_calibration_file.flush() ;
569 : } // word printing in calibration stream
570 :
571 0 : TLOG_DEBUG(TLVL_CIB_DEBUG) << get_name() << "Received IoLS trigger word!";
572 0 : ++m_num_total_triggers_received;
573 0 : ++m_num_run_triggers_received;
574 :
575 :
576 : // we do not need to know anything else
577 : // ideally, one could add other information such as the direction
578 : // this should be coming packed in the trigger word
579 : // note, however, that to reconstruct the trace direction we also would need the source position
580 : // and that we cannot afford to send, so we can just make it up out of the IoLS system
581 : //
582 : // Send HSI data to a DLH
583 0 : std::array<uint32_t, 7> hsi_struct;
584 0 : hsi_struct[0] = (0x1 << 26) | // some random bit that could indicate the type of frame
585 0 : (m_slot << 22) | // slot number
586 0 : (m_crate << 12) | // crate number
587 0 : (m_det << 6) | // detector number
588 : CIB_HSI_FRAME_VERSION;
589 : // timestamps - I like the explicit masking here to mistakes
590 0 : hsi_struct[1] = tcp_packet.word.timestamp & 0xFFFFFFFF; // ts low
591 0 : hsi_struct[2] = (tcp_packet.word.timestamp >> 32) & 0xFFFFFFFF; // ts high
592 :
593 : // we shall use these 2 sets of 32 bits to define the periscope position
594 : // pos_m3 == linear stage
595 0 : hsi_struct[3] = dunedaq::cibmodules::util::get_m3(tcp_packet.word); // lower 32b 0
596 : // pos_m3 == RNN600
597 0 : hsi_struct[4] = dunedaq::cibmodules::util::get_m2(tcp_packet.word); // upper 32b
598 : /**
599 : * A note about the 5th entry
600 : * The trigger bit is actually mapped into a single bit, that is then remapped back
601 : * into an index
602 : */
603 0 : hsi_struct[5] = m_trigger_bit; // trigger_map;
604 0 : hsi_struct[6] = m_num_run_triggers_received.load(); // m_generated_counter;
605 :
606 : // Same thing here. If something really bad happens, this code can be very useful, but we do not want to print this in the logs, unless strictly necessary, since it is too verbose.
607 : // TLOG_DEBUG(TLVL_CIB_TRACE) << "CIB HSI Frame: "
608 : // << "0x" << std::hex << hsi_struct[0]
609 : // << ", 0x" << hsi_struct[1]
610 : // << ", 0x" << hsi_struct[2]
611 : // << ", 0x" << hsi_struct[3]
612 : // << ", 0x" << hsi_struct[4]
613 : // << ", 0x" << hsi_struct[5]
614 : // << ", 0x" << hsi_struct[6]
615 : // << std::dec << std::endl;
616 :
617 0 : if (!m_cib_hsi_data_sender)
618 : {
619 0 : std::ostringstream msg("");
620 0 : msg << "HSI Data Sender not properly configured! This will go down in flames.";
621 0 : ers::error(CIBCommunicationError(ERS_HERE, msg.str()));
622 : // we cannot continue
623 0 : break ;
624 0 : }
625 :
626 0 : send_raw_hsi_data(hsi_struct, m_cib_hsi_data_sender.get());
627 :
628 : // TODO Nuno Barros Apr-02-2024 : properly fill device id when someone explains me
629 : // how to get it
630 : // in fact, this may disappear in the future where the CIB is decoupled from
631 : // the HSI infrastructure
632 0 : dfmessages::HSIEvent event(m_det,
633 : m_trigger_bit,
634 : tcp_packet.word.timestamp,
635 : m_num_run_triggers_received.load(),
636 0 : m_run_number);
637 :
638 0 : send_hsi_event(event);
639 0 : if (connection_closed)
640 : {
641 : break ;
642 : }
643 : }
644 :
645 : // Wait for stop signal (which only comes after CIB closed its sender socket)
646 0 : while (m_is_running.load())
647 : {
648 0 : std::this_thread::sleep_for(std::chrono::microseconds(100));
649 : }
650 :
651 0 : boost::system::error_code closing_error;
652 : // if the system is already in an error state
653 : // we should call for a socket shutdown to force
654 : // the connection to close
655 0 : if (m_error_state.load())
656 : {
657 :
658 0 : m_receiver_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_send, closing_error);
659 :
660 0 : if (closing_error)
661 : {
662 0 : std::stringstream msg;
663 0 : msg << "Error in shutdown " << closing_error.message();
664 0 : ers::error(CIBCommunicationError(ERS_HERE, msg.str()));
665 0 : }
666 : }
667 :
668 0 : m_receiver_socket.close(closing_error) ;
669 :
670 0 : if ( closing_error )
671 : {
672 0 : std::stringstream msg;
673 0 : msg << "Socket closing failed:: " << closing_error.message();
674 0 : ers::error(CIBCommunicationError(ERS_HERE,msg.str()));
675 0 : }
676 :
677 0 : m_receiver_ready.store(false);
678 :
679 0 : TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": End of do_work loop: stop receiving data from the CIB";
680 :
681 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_work() method";
682 0 : }
683 :
684 : template<typename T>
685 0 : bool CIBModule::read(boost::asio::ip::tcp::socket &socket, T &obj)
686 : {
687 :
688 0 : boost::system::error_code receiving_error;
689 0 : boost::asio::read( socket,
690 0 : boost::asio::buffer( &obj, sizeof(T) ),
691 : receiving_error ) ;
692 :
693 0 : if ( ! receiving_error )
694 : {
695 : return true ;
696 : }
697 :
698 0 : if ( receiving_error == boost::asio::error::eof)
699 : {
700 0 : if (m_stop_requested.load())
701 : {
702 0 : TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": Socket closed after stop request." << std::endl;
703 : }
704 : else
705 : {
706 0 : std::string error_message = "Socket closed: " + receiving_error.message();
707 0 : ers::error(CIBCommunicationError(ERS_HERE, error_message));
708 0 : }
709 :
710 0 : return false ;
711 : }
712 :
713 : if ( receiving_error )
714 : {
715 0 : std::string error_message = "Read failure: " + receiving_error.message();
716 0 : ers::error(CIBCommunicationError(ERS_HERE, error_message));
717 : return false ;
718 0 : }
719 :
720 : return true ;
721 : }
722 :
723 0 : void CIBModule::init_calibration_file()
724 : {
725 0 : if ( ! m_calibration_stream_enable )
726 : {
727 0 : return ;
728 : }
729 0 : char file_name[200] = "" ;
730 0 : time_t rawtime;
731 0 : time( & rawtime ) ;
732 0 : struct tm local_tm;
733 0 : struct tm * timeinfo = localtime_r( & rawtime , &local_tm) ;
734 0 : strftime( file_name, sizeof(file_name), "%F_%H.%M.%S.iols.calib", timeinfo );
735 0 : std::string global_name = m_calibration_dir + m_calibration_prefix + file_name ;
736 0 : m_calibration_file.open( global_name, std::ofstream::binary ) ;
737 0 : if ( ! m_calibration_file.is_open() )
738 : {
739 0 : std::ostringstream msg ;
740 0 : msg << get_name() << ": Unable to open calibration stream file: " << global_name ;
741 0 : ers::warning(CIBMessage(ERS_HERE, msg.str()));
742 0 : m_calibration_stream_enable = false ;
743 0 : return ;
744 0 : }
745 0 : m_last_calibration_file_update = std::chrono::steady_clock::now();
746 0 : TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": New Calibration Stream file: " << global_name << std::endl ;
747 0 : }
748 :
749 0 : void CIBModule::update_calibration_file()
750 : {
751 :
752 0 : if ( ! m_calibration_stream_enable )
753 : {
754 0 : return ;
755 : }
756 :
757 0 : std::chrono::steady_clock::time_point check_point = std::chrono::steady_clock::now();
758 :
759 0 : if ( check_point - m_last_calibration_file_update < m_calibration_file_interval )
760 : {
761 : return ;
762 : }
763 :
764 0 : m_calibration_file.close() ;
765 0 : init_calibration_file() ;
766 :
767 : }
768 :
769 0 : bool CIBModule::set_calibration_stream( const std::string & prefix )
770 : {
771 :
772 0 : if ( m_calibration_dir.back() != '/' )
773 : {
774 0 : m_calibration_dir += '/' ;
775 : }
776 0 : m_calibration_prefix = prefix ;
777 0 : if ( prefix.size() > 0 )
778 : {
779 0 : m_calibration_prefix += '_' ;
780 : }
781 : // possibly we could check here if the directory is valid and writable before assuming the calibration stream is valid
782 0 : return true ;
783 : }
784 :
785 0 : void CIBModule::send_config( const std::string & config ) {
786 :
787 0 : TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": Sending config" << std::endl;
788 :
789 : // structure the message to have a common management structure
790 0 : nlohmann::json conf;
791 0 : conf["command"] = "config";
792 0 : conf["config"] = nlohmann::json::parse(config);
793 :
794 0 : TLOG_DEBUG(TLVL_CIB_DEBUG) << get_name() << ": Shipped config : " << conf.dump() << std::endl;
795 :
796 0 : if ( send_message( conf.dump() ) )
797 : {
798 0 : m_is_configured.store(true) ;
799 : }
800 : else
801 : {
802 0 : throw CIBCommunicationError(ERS_HERE, "Unable to configure CIB");
803 : }
804 0 : }
805 :
806 0 : bool CIBModule::send_message( const std::string & msg )
807 : {
808 :
809 : //add error options
810 0 : boost::system::error_code error;
811 0 : TLOG_DEBUG(1) << get_name() << ": Sending message: " << msg;
812 :
813 0 : m_num_control_messages_sent++;
814 :
815 0 : boost::asio::write( m_control_socket, boost::asio::buffer( msg ), error ) ;
816 0 : boost::array<char, 1024> reply_buf{" "} ;
817 0 : m_control_socket.read_some( boost::asio::buffer(reply_buf ), error);
818 0 : std::stringstream raw_answer( std::string(reply_buf .begin(), reply_buf .end() ) ) ;
819 0 : TLOG_DEBUG(1) << get_name() << ": Unformatted answer: " << raw_answer.str();
820 :
821 0 : nlohmann::json answer ;
822 0 : raw_answer >> answer ;
823 0 : nlohmann::json & messages = answer["feedback"] ;
824 0 : TLOG_DEBUG(1) << get_name() << ": Received messages: " << messages.size();
825 :
826 0 : bool ret = true ;
827 0 : for (nlohmann::json::size_type i = 0; i != messages.size(); ++i )
828 : {
829 :
830 0 : m_num_control_responses_received++;
831 :
832 0 : std::string type = messages[i]["type"].dump() ;
833 0 : if ( type.find("error") != std::string::npos || type.find("Error") != std::string::npos || type.find("ERROR") != std::string::npos )
834 : {
835 0 : ers::error(CIBMessage(ERS_HERE, messages[i]["message"].dump()));
836 0 : ret = false ;
837 : }
838 0 : else if ( type.find("warning") != std::string::npos || type.find("Warning") != std::string::npos || type.find("WARNING") != std::string::npos )
839 : {
840 0 : ers::warning(CIBMessage(ERS_HERE, messages[i]["message"].dump()));
841 : }
842 0 : else if ( type.find("info") != std::string::npos || type.find("Info") != std::string::npos || type.find("INFO") != std::string::npos)
843 : {
844 0 : TLOG() << "Message from the CIB : " << messages[i]["message"].dump();
845 : }
846 : else
847 : {
848 0 : std::stringstream blob;
849 0 : blob << messages[i] ;
850 0 : TLOG() << get_name() << ": Unformatted feedback from the board: " << blob.str();
851 0 : }
852 0 : }
853 0 : return ret;
854 0 : }
855 :
856 : void
857 0 : CIBModule::update_buffer_counts(uint new_count) // NOLINT(build/unsigned)
858 : {
859 0 : std::unique_lock mon_data_lock(m_buffer_counts_mutex);
860 0 : if (m_buffer_counts.size() > 1000)
861 : {
862 0 : m_buffer_counts.pop_front();
863 : }
864 0 : m_buffer_counts.push_back(new_count);
865 0 : }
866 :
867 : double
868 0 : CIBModule::read_average_buffer_counts()
869 : {
870 0 : std::unique_lock mon_data_lock(m_buffer_counts_mutex);
871 :
872 0 : double total_counts;
873 0 : uint32_t number_of_counts; // NOLINT(build/unsigned)
874 :
875 0 : total_counts = 0;
876 0 : number_of_counts = m_buffer_counts.size();
877 :
878 0 : if (number_of_counts) {
879 0 : for (uint i = 0; i < number_of_counts; ++i) { // NOLINT(build/unsigned)
880 0 : total_counts = total_counts + m_buffer_counts.at(i);
881 : }
882 0 : return total_counts / number_of_counts;
883 : } else {
884 : return 0;
885 : }
886 0 : }
887 :
888 0 : void CIBModule::generate_opmon_data()
889 : {
890 0 : dunedaq::cibmodules::opmon::CIBModuleInfo module_info;
891 :
892 0 : module_info.set_num_control_messages_sent(m_num_control_messages_sent.load());
893 0 : module_info.set_num_control_responses_received(m_num_control_responses_received.load());
894 0 : module_info.set_hardware_running(m_is_running.load());
895 0 : module_info.set_hardware_configured(m_is_configured.load());
896 0 : module_info.set_num_total_triggers_received(m_num_total_triggers_received.load());
897 0 : module_info.set_num_run_triggers_received(m_num_run_triggers_received.load());
898 :
899 : // -- need to define these counters (and set the code to update them
900 0 : module_info.set_sent_hsi_events_counter(m_sent_counter.load());
901 0 : module_info.set_failed_to_send_hsi_events_counter(m_failed_to_send_counter.load());
902 : //module_info.set_last_sent_timestamp(m_last_sent_timestamp.load());
903 0 : module_info.set_average_buffer_occupancy(read_average_buffer_counts());
904 :
905 0 : publish(std::move(module_info));
906 :
907 : // should we also publish specific trigger info?
908 : // doesn't seem necessary at this time
909 :
910 0 : }
911 :
912 0 : bool CIBModule::check_port_in_use(unsigned short port)
913 : {
914 0 : using namespace boost::asio;
915 0 : using ip::tcp;
916 :
917 0 : io_service svc;
918 0 : tcp::acceptor a(svc);
919 :
920 0 : boost::system::error_code ec;
921 0 : a.open(tcp::v4(), ec) || a.bind({ tcp::v4(), port }, ec);
922 0 : a.close();
923 0 : return ec == error::address_in_use;
924 :
925 0 : }
926 : } // namespace dunedaq::cibmodules
927 :
// Registers CIBModule with the DUNE DAQ application framework's module registry.
DEFINE_DUNE_DAQ_MODULE(dunedaq::cibmodules::CIBModule)
|