LCOV - code coverage report
Current view: top level - cibmodules/plugins - CIBModule.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 443 0
Test Date: 2026-03-29 15:29:34 Functions: 0.0 % 61 0

            Line data    Source code
       1              : /**
       2              :  * @file CIBModule.cpp
       3              :  *
       4              :  * Implementations of CIBModule's functions
       5              :  *
       6              :  * This is part of the DUNE DAQ Software Suite, copyright 2020.
       7              :  * Licensing/copyright details are in the COPYING file that you should have
       8              :  * received with this code.
       9              :  */
      10              : #include "confmodel/GeoId.hpp"
      11              : 
      12              : // these are the data types in the application model
      13              : #include "appmodel/CIBConf.hpp"
      14              : #include "appmodel/CIBCalibrationStream.hpp"
      15              : #include "appmodel/CIBoardConf.hpp"
      16              : #include "appmodel/CIBTrigger.hpp"
      17              : 
      18              : #include "CIBModule.hpp"
      19              : #include "CIBModuleIssues.hpp"
      20              : 
      21              : #include "iomanager/IOManager.hpp"
      22              : #include "logging/Logging.hpp"
      23              : 
      24              : #include <chrono>
      25              : #include <string>
      26              : #include <thread>
      27              : #include <vector>
      28              : #include <map>
      29              : #include <queue>
      30              : #include <utility>
      31              : #include <charconv>
      32              : #include <cstdint>
      33              : #include <string_view>
      34              : 
      35              : // since cib_data_fmt is shared with the board
      36              : // we need this macro to avoid clashes with the namespaces
      37              : // not the cleanest solution, but it works and avoids maintaining two separate versions of the same struct
      38              : #define CIB_DUNEDAQ 1
      39              : // we may need this to help parse the data format arriving from the CIB
      40              : #include <cib_data_fmt.h>
      41              : #include <cib_utilities.h>
/**
 * @brief Name used by TRACE TLOG calls from this source file
 */
#define TRACE_NAME "CIBModule" // NOLINT
// TLOG_DEBUG verbosity levels used throughout this file (higher = more verbose).
// NOTE(review): TLVL_ENTER_EXIT_METHODS and TLVL_CIB_DEBUG share level 10, so they
// cannot be enabled independently of each other.
#define TLVL_ENTER_EXIT_METHODS 10
#define TLVL_CIB_INFO 5
#define TLVL_CIB_DEBUG 10
#define TLVL_CIB_TRACE 15

// Frame version associated with the HSI frames of this module.
// NOTE(review): presumably written into the version field of the HSI frames built
// from CIB triggers; the usage is not visible in this chunk - confirm.
// As it happens, the frame structure has remained unchanged since NP04.
constexpr uint16_t CIB_HSI_FRAME_VERSION = 0x1; // NOLINT
      53              : namespace dunedaq::cibmodules {
      54              : 
      55            0 :   CIBModule::CIBModule(const std::string& name)
      56              :               : hsilibs::HSIEventSender(name)
      57            0 :                 , m_is_running(false)
      58            0 :                 , m_is_configured(false)
      59            0 :                 , m_stop_requested(false)
      60            0 :                 , m_receiver_port(8871)
      61            0 :                 , m_receiver_timeout(70000) // 70 ms - we know that triggers will come at 10 Hz max
      62            0 :                 , m_error_state(false)
      63            0 :                 , m_control_ios()
      64            0 :                 , m_control_socket(m_control_ios)
      65            0 :                 , m_control_endpoint()
      66            0 :                 , m_receiver_ios()
      67            0 :                 , m_receiver_socket(m_receiver_ios)
      68            0 :                 , m_thread_(std::bind(&CIBModule::do_hsi_work, this, std::placeholders::_1))
      69            0 :                 , m_calibration_stream_enable(false)
      70            0 :                 , m_calibration_dir("")
      71            0 :                 , m_calibration_prefix("")
      72            0 :                 , m_calibration_file_interval(std::chrono::minutes(15))
      73              :                 // metric utilities
      74            0 :                 , m_num_control_messages_sent(0)
      75            0 :                 , m_num_control_responses_received(0)
      76            0 :                 , m_num_total_triggers_received(0)
      77            0 :                 , m_num_run_triggers_received(0)
      78              : 
      79            0 :                 , m_trigger_bit(0)
      80            0 :                 , m_receiver_ready(false)
      81              :   {
      82              :     // we can infer the instance from the name
      83            0 :     TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Instantiating a cibmodule with argument [" << name << "]";
      84            0 :     register_command("conf", &CIBModule::do_configure);
      85            0 :     register_command("start", &CIBModule::do_start);
      86            0 :     register_command("stop", &CIBModule::do_stop);
      87            0 :     TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Leaving [" << name << "]constructor.";
      88            0 :   }
      89              : 
      90            0 :   CIBModule::~CIBModule()
      91              :   {
      92            0 :     if(m_is_running.load())
      93              :     {
      94            0 :       const CommandData_t stopobj;
      95              : 
      96              :       // const nlohmann::json stopobj;
      97              :       // this should also take care of closing the streaming socket
      98            0 :       do_stop(stopobj);
      99            0 :     }
     100            0 :     TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": Closing the control socket " << std::endl;
     101            0 :     m_control_socket.close() ;
     102              : 
     103            0 :   }
     104              : 
     105              :   void
     106            0 :   CIBModule::init(std::shared_ptr<appfwk::ConfigurationManager> cfgMgr)
     107              :   {
     108            0 :     TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
     109              : 
     110              :     // init the sender
     111            0 :     HSIEventSender::init(cfgMgr);
     112              : 
     113              :     // assign the local configuration manager to the argument
     114            0 :     m_cfg = cfgMgr;
     115              : 
     116              :     // get the configuration fragment for this module
     117            0 :     auto mdal = cfgMgr->get_dal<appmodel::CIBModule>(get_name());
     118            0 :     if (! mdal)
     119              :     {
     120            0 :       throw cibmodules::CIBConfigFailure(ERS_HERE, "Missing Module configuration for " + get_name());
     121              :     }
     122              : 
     123              :     // assign the module configuration
     124            0 :     m_module = mdal;
     125              : 
     126              :     // setting up connections
     127            0 :     auto iom = iomanager::IOManager::get();
     128              : 
     129            0 :     using hsi_frame_t = dunedaq::hsilibs::HSI_FRAME_STRUCT;
     130            0 :     for ( auto con : m_module->get_outputs() )
     131              :     {
     132            0 :       if ( con->get_data_type() == datatype_to_string<hsi_frame_t>() )
     133              :       {
     134              :         // Filter connections by UID: only process those with "CIB" or "cib" in the name
     135              :         // This string comes from the HSISignalWindow UID in the appmodel configuration
     136            0 :         if ( (con->UID().find("CIB")!=std::string::npos) || 
     137            0 :              (con->UID().find("cib")!=std::string::npos) )
     138              :         {
     139            0 :           TLOG() << get_name() << ": Setting up HSI Frame output : " << con->UID() << std::endl;
     140            0 :           m_cib_hsi_data_sender = iom->get_sender<hsi_frame_t>(con->UID());
     141              :         }
     142              :         else
     143              :         {
     144            0 :           TLOG_DEBUG(5) << get_name() << ": Skipping output : " << con->UID();
     145              :         }
     146              : 
     147              :       } // if data type is HSI Frame
     148              :     } // loop over outputs
     149              : 
     150            0 :     TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
     151            0 :   }
     152              : 
     153              :   void
     154            0 :   CIBModule::do_configure(const CommandData_t &)
     155              :   {
     156              : 
     157            0 :     TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering CIB do_configure()";
     158              : 
     159              :     // this returns the structure of the CIBConf object
     160            0 :     auto conf = m_module ->get_configuration();
     161              :     // this gets the CIBoardConf object
     162            0 :     auto board = m_module -> get_board();
     163              : 
     164            0 :     auto trigger_conf = conf->get_cib_trigger();
     165              : 
     166              :     // identify the trigger bit that this receiver is assigned to
     167              :     // We need this to construct the HSI frame, right?
     168            0 :     if (!dunedaq::cibmodules::util::parse_hex(trigger_conf->get_trigger_bit(), m_trigger_bit))
     169              :     {
     170            0 :       std::ostringstream msg("");
     171            0 :       msg << get_name() << ": Unable to parse trigger bit hex string : " << trigger_conf->get_trigger_bit();
     172            0 :       throw CIBModuleError(ERS_HERE, msg.str());
     173            0 :     }
     174              :     else
     175              :     {
     176            0 :       TLOG_DEBUG(TLVL_CIB_DEBUG) << get_name() << ": Parsed trigger bit hex string "
     177            0 :                                 << trigger_conf->get_trigger_bit() << " to 0x"
     178            0 :                                 << std::hex << m_trigger_bit << std::dec
     179            0 :                                 << "[" << trigger_conf->get_trigger_id() << "]";
     180              :     }
     181              :     
     182              :     // init monitoring variables
     183            0 :     m_num_control_messages_sent = 0;
     184            0 :     m_num_control_responses_received = 0;
     185              : 
     186              :     // figure out the identifier of the CIB
     187              :     // this is set in the configuration
     188            0 :     auto geo_id = board->get_geo_id();
     189            0 :     m_det = geo_id->get_detector_id();
     190            0 :     m_crate = geo_id->get_crate_id();
     191            0 :     m_slot = geo_id->get_slot_id();
     192              : 
     193            0 :     auto session = m_cfg->get_session();
     194              : 
     195              :     // init trigger counters
     196            0 :     m_num_run_triggers_received.store(0);
     197              : 
     198            0 :     auto cib_host = conf->get_cib_host();
     199            0 :     auto cib_port = conf->get_cib_port();
     200              : 
     201              :     // network connection to the CIB module
     202            0 :     boost::asio::ip::tcp::resolver resolver( m_control_ios );
     203            0 :     boost::asio::ip::tcp::resolver::query query( cib_host,std::to_string(cib_port) ) ; //"np04-iols-cib-02", 8992
     204            0 :     boost::asio::ip::tcp::resolver::iterator iter = resolver.resolve(query) ;
     205              : 
     206            0 :     m_control_endpoint = iter->endpoint();
     207              :     // attempt the connection.
     208            0 :     try
     209              :     {
     210              : 
     211            0 :       m_control_socket.connect(m_control_endpoint);
     212            0 :       m_control_socket.set_option(boost::asio::ip::tcp::no_delay(true));
     213              :     }
     214            0 :     catch (std::exception &e)
     215              :     {
     216            0 :       std::ostringstream msg("");
     217            0 :       msg << get_name() << "Exception caught while establishing connection to CIB : " << e.what();
     218              :       // do nothing more. Just exist
     219            0 :       m_is_configured.store(false);
     220            0 :       throw CIBCommunicationError(ERS_HERE, msg.str());
     221            0 :     }
     222            0 :     TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": Successfully connected to CIB endpoint "
     223            0 :                               << cib_host << ':' << cib_port << std::endl;
     224              : 
     225              :     // if necessary, set the calibration stream
     226            0 :     auto stream_conf = conf->get_calibration_stream();
     227            0 :     if (stream_conf)
     228              :     {
     229            0 :       TLOG() << "Calibration stream enabled";
     230            0 :       m_calibration_stream_enable = true;
     231            0 :       m_calibration_dir = stream_conf->get_output_directory();
     232            0 :       m_calibration_file_interval = std::chrono::duration_cast<decltype(m_calibration_file_interval)>(std::chrono::seconds(stream_conf->get_update_period_s()));
     233              :     }
     234              : 
     235              :     // these are for the local receiver operation
     236            0 :     m_receiver_port = board->get_receiver_port();
     237            0 :     m_receiver_timeout = std::chrono::milliseconds(conf->get_connection_timeout_ms());
     238            0 :     m_receiver_host = board->get_receiver_host();
     239            0 :     TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": Default board receiver network location (PRELIMINARY) "
     240            0 :                               << m_receiver_host << ':' << m_receiver_port << std::endl;
     241              : 
     242              : 
     243              :     // config fragment to be sent to the CIB
     244            0 :     nlohmann::json config;
     245              :     //
     246              :     // if the host is localhost, we need to resolve it to the actual hostname
     247              :     // and also check whether the port is available
     248              :     //
     249            0 :     if (m_receiver_host == "localhost")
     250              :     {
     251            0 :       boost::asio::ip::tcp::resolver resolver(m_receiver_ios);
     252              :       // Check if port is already in use, to try to avoid future conflicts
     253            0 :       unsigned short port = m_receiver_port;
     254            0 :       while (check_port_in_use(port))
     255              :       {
     256              :         // the problem is that if the port is in use, the CIB will be sending the data to the wrong place
     257              :         // there is little point in continuing
     258            0 :         std::ostringstream msg("");
     259            0 :         msg << "Listener port [" << port << "] is in use by someone else. Trying another.";
     260            0 :         ers::warning(CIBMessage(ERS_HERE, msg.str()));
     261            0 :         port++;
     262            0 :       }
     263            0 :       if (port != m_receiver_port)
     264              :       {
     265            0 :         std::ostringstream msg("");
     266            0 :         msg << "Listener port [" << m_receiver_port << "] is in use. Relocating to port [" << port << "]";
     267            0 :         ers::warning(CIBMessage(ERS_HERE, msg.str()));
     268            0 :         m_receiver_port = port;
     269            0 :       }
     270              :       else
     271              :       {
     272            0 :         TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": Listener port " << m_receiver_port << " is available." << std::endl;
     273              :       }
     274              : 
     275              :       // at this we have to find the hostname to tell the board where to send the data
     276            0 :       boost::asio::ip::tcp::resolver::query query_for_local(boost::asio::ip::host_name(), "");
     277            0 :       iter = resolver.resolve(query_for_local);
     278              : 
     279            0 :       TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": Resolved localhost to "
     280            0 :                                 << iter->endpoint().address().to_string() << std::endl;
     281            0 :       m_receiver_host = iter->endpoint().address().to_string();
     282              : 
     283              :       // create the json string out of the config fragment
     284              :       // replacing the receiver address with the one that we just calculated
     285            0 :       try
     286              :       {
     287            0 :         config = board->get_cib_json(*session, m_receiver_host, m_receiver_port);
     288              :       }
     289            0 :       catch (nlohmann::json::exception &e)
     290              :       {
     291            0 :         std::ostringstream msg("");
     292            0 :         msg << get_name() << "Caught a JSON exception converting config fragment : " << e.what();
     293            0 :         m_is_configured.store(false);
     294            0 :         throw CIBModuleError(ERS_HERE, msg.str());
     295            0 :       }
     296            0 :       catch (std::exception &e)
     297              :       {
     298            0 :         std::ostringstream msg("");
     299            0 :         msg << get_name() << "Caught STD exception while converting config fragment : " << e.what();
     300              :         // do nothing more. Just exist
     301            0 :         m_is_configured.store(false);
     302            0 :         throw CIBModuleError(ERS_HERE, msg.str());
     303            0 :       }
     304            0 :     }
     305              :     else
     306              :     {
     307              :       /* just use the configuration information */
     308              :       // nlohmann::to_json(config, board->get_cib_json(*session));
     309            0 :       config = board->get_cib_json(*session);
     310              :     }
     311            0 :     auto json_dump = config.dump();
     312            0 :     TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << "Sending configuration: [" << json_dump << "] to CIB board";
     313            0 :     send_config(config.dump());
     314            0 :     m_is_configured.store(true);
     315            0 :   }
     316              : 
     317              :     void
     318            0 :     CIBModule::do_start(const CommandData_t &startobj)
     319              :     {
     320              : 
     321            0 :       TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
     322              :       // actually, the first thing to check is whether the CIB has been configured
     323              :       // if not, this won't work
     324            0 :       if (!m_is_configured.load())
     325              :       {
     326            0 :         throw CIBWrongState(ERS_HERE, "CIB has not been successfully configured.");
     327              :       }
     328              : 
     329              :       // Set this to false early so it doesn't interfere with the start
     330            0 :       m_stop_requested.store(false);
     331            0 :       m_run_number.store(startobj.at("run").get<daqdataformats::run_number_t>());
     332              :       // reset the metrics/counters
     333            0 :       m_num_run_triggers_received.store(0);
     334              : 
     335            0 :       TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": Sending start of run command with run number " << m_run_number.load();
     336            0 :       m_thread_.start_working_thread();
     337              : 
     338              :       // NFB: There was a potential race condition here: the socket in the working thread
     339              :       // needs to be in place before the CIB receives order to send data, or we risk having a connection
     340              :       // failure, if for some reason the CIB attempts to connect before the working thread is ready to receive.
     341            0 :       if (m_calibration_stream_enable)
     342              :       {
     343            0 :         std::stringstream run;
     344            0 :         run << "run" << m_run_number.load();
     345            0 :         set_calibration_stream(run.str());
     346            0 :       }
     347              :       int cnt = 0;
     348            0 :       while (!m_receiver_ready.load())
     349              :       {
     350            0 :         std::this_thread::sleep_for(std::chrono::milliseconds(10));
     351            0 :         cnt++;
     352            0 :         if (cnt > 50)
     353              :         {
     354              :           // the socket didn't get ready on time
     355            0 :           throw CIBModuleError(ERS_HERE, "Receiver socket timed out before becoming ready.");
     356              :         }
     357              :       }
     358            0 :       TLOG_DEBUG(TLVL_CIB_DEBUG) << get_name() << ": All ready to signal the CIB to start";
     359              : 
     360            0 :       nlohmann::json cmd;
     361            0 :       cmd["command"] = "start_run";
     362            0 :       cmd["run_number"] = m_run_number.load();
     363              : 
     364            0 :       if (send_message(cmd.dump()))
     365              :       {        
     366            0 :         m_is_running.store(true);
     367            0 :         TLOG() << get_name() << ": CIB run started successfully";
     368              :       }
     369              :       else
     370              :       {
     371            0 :         throw CIBCommunicationError(ERS_HERE, "Unable to start CIB run");
     372              :       }
     373              : 
     374            0 :       TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
     375            0 :     }
     376              : 
     377              :     void
     378            0 :     CIBModule::do_stop(const CommandData_t & /*stopobj*/)
     379              :     {
     380            0 :       TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
     381            0 :       TLOG_DEBUG(TLVL_CIB_DEBUG) << get_name() << ": Sending stop run command" << std::endl;
     382              :       
     383              :       // Set stop flag BEFORE sending command so receiver thread knows to expect EOF
     384            0 :       m_stop_requested.store(true);
     385              : 
     386            0 :       if (send_message("{\"command\":\"stop_run\"}"))
     387              :       {
     388              :         // Response arrival means CIB has closed its data socket (see Handler::stop_run())
     389              :         // Receiver thread will detect EOF and exit cleanly
     390            0 :         std::this_thread::sleep_for(std::chrono::milliseconds(2));
     391            0 :         TLOG() << get_name() << ": CIB run stopped successfully";
     392            0 :         m_is_running.store(false);
     393              :       }
     394              :       else
     395              :       {
     396              :         // failed to sent the message to stop the run.
     397              :         // stop the collecting thread and then throw, since that
     398              :         // attempts a cleaner exit
     399            0 :         m_thread_.stop_working_thread();
     400              : 
     401            0 :         throw CIBCommunicationError(ERS_HERE, "Unable to stop CIB");
     402              :       }
     403              :       //
     404            0 :       m_thread_.stop_working_thread();
     405              : 
     406              :       // -- print the counters for local info
     407            0 :       TLOG() << get_name() << ": CIB trigger counter summary after run [" << m_run_number << "]:\n\n"
     408            0 :              << "IOLS trigger counter in run : " << m_num_run_triggers_received << "\n"
     409            0 :              << "Global IOLS trigger count   : " << m_num_total_triggers_received << std::endl;
     410              : 
     411              :       // reset counters
     412            0 :       m_num_run_triggers_received = 0;
     413            0 :       TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
     414            0 :     }
     415              : 
     416              :   // this is where most of the work is really done
     417              :   void
     418            0 :   CIBModule::do_hsi_work(std::atomic<bool>& running_flag)
     419              :   {
     420            0 :     TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_work() method";
     421              : 
     422            0 :     std::size_t n_bytes = 0 ;
     423            0 :     std::size_t n_words = 0 ;
     424            0 :     std::size_t prev_seq = 0 ;
     425            0 :     bool first = true;
     426              : 
     427              :     //connect to socket
     428              :     // should we keep everything local or under the class?
     429            0 :     boost::system::error_code ec;
     430              : 
     431              :     // check that this port is still available
     432            0 :     if (check_port_in_use(m_receiver_port))
     433              :     {
     434              :       // the problem is that, at this stage, if the port is in use, 
     435              :       // the CIB will be sending the data to the wrong place
     436              :       // there is little point in continuing
     437            0 :       std::ostringstream msg("");
     438            0 :       msg << "Listener port [" << m_receiver_port << "] is in use by someone else. Failing.";
     439            0 :       ers::error(CIBMessage(ERS_HERE, msg.str()));
     440            0 :     }
     441              : 
     442            0 :     TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": Will set up the listener on port " << m_receiver_port << std::endl;
     443            0 :     boost::asio::ip::tcp::acceptor acceptor(m_receiver_ios,boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(),m_receiver_port ));
     444              : 
     445            0 :     acceptor.listen(boost::asio::ip::tcp::socket::max_connections, ec);
     446            0 :     if (ec)
     447              :     {
     448            0 :       std::ostringstream msg("");
     449            0 :       msg << get_name() << ": CIB got an error listening on socket: :" << m_receiver_port << " -- reason: '" << ec << "'";
     450            0 :       throw CIBCommunicationError(ERS_HERE,msg.str());
     451              :       return;
     452            0 :     }
     453              :     else
     454              :     {
     455            0 :       TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": Waiting for an incoming connection on port " << m_receiver_port << std::endl;
     456              :     }
     457              : 
     458            0 :     std::future<void> accepting = async( std::launch::async, [&]{ acceptor.accept(m_receiver_socket,ec) ; } ) ;
     459            0 :     if (ec)
     460              :     {
     461            0 :       std::stringstream msg;
     462            0 :       msg << "Socket opening failed:: " << ec.message();
     463            0 :       ers::error(CIBCommunicationError(ERS_HERE,msg.str()));
     464            0 :       return;
     465            0 :     }
     466              :     //
     467            0 :     m_receiver_ready.store(true);
     468              : 
     469            0 :     while ( running_flag.load()  && !m_stop_requested.load() )
     470              :     {
     471            0 :       if ( accepting.wait_for( m_receiver_timeout ) == std::future_status::ready )
     472              :       {
     473              :         break ;
     474              :       }
     475              :     }
     476              : 
     477            0 :     TLOG() << get_name() <<  ": Connection received: start reading" << std::endl;
     478              : 
     479              :     // -- A couple of variables to help in the data parsing
     480              :     /**
     481              :      * The structure is a bit different than in the CTB
     482              :      * The TCP packet contains a single word (the trigger)
     483              :      * But other than that, everything are triggers
     484              :      */
     485              : 
     486            0 :     dunedaq::cib::daq::iols_tcp_packet_t tcp_packet;
     487            0 :     TLOG_DEBUG(TLVL_CIB_TRACE) << "Checking expected sizes: "
     488            0 :                                << " sizeof(iols_tcp_packet_t)=" << sizeof(dunedaq::cib::daq::iols_tcp_packet_t)
     489            0 :                                << " sizeof(iols_trigger_t)=" << sizeof(dunedaq::cib::daq::iols_trigger_t)
     490            0 :                                << " sizeof(tcp_header_t)=" << sizeof(dunedaq::cib::daq::tcp_header_t)
     491            0 :                                << std::endl;
     492              : 
     493              :     //boost::system::error_code receiving_error;
     494            0 :     bool connection_closed = false ;
     495              : 
     496            0 :     while (running_flag.load() && !m_stop_requested.load())
     497              :     {
     498            0 :       update_calibration_file();
     499              : 
     500            0 :       if ( ! read(m_receiver_socket, tcp_packet ) )
     501              :       {
     502            0 :         connection_closed = true ;
     503            0 :         break;
     504              :       }
     505              : 
     506            0 :       n_bytes = tcp_packet.header.packet_size ;
     507            0 :       n_words = n_bytes/sizeof(dunedaq::cib::daq::iols_trigger_t);
     508              : 
     509            0 :       if (n_words != 1)
     510              :       {
     511            0 :         std::ostringstream msg("");
     512            0 :         msg << "Received more than one IoLS trigger word at once! This should never happen. Got "
     513            0 :             << n_bytes << " (expected " << sizeof(dunedaq::cib::daq::iols_trigger_t) << ")";
     514            0 :         ers::warning(CIBMessage(ERS_HERE, msg.str()));
     515            0 :       }
     516              :       // the CIB only ships one word per packet....so this error should be impossible
     517              :       // check continuity of the sequence numbers
     518            0 :       if (first)
     519              :       {
     520              :         // first word being fetched. The sequence number should be zero
     521            0 :         if (tcp_packet.header.sequence_id != 0)
     522              :         {
     523            0 :           std::ostringstream msg("");
     524            0 :           msg << "Missing sequence. First word should have sequence number 0. Got " << static_cast<int>(tcp_packet.header.sequence_id );
     525            0 :           ers::warning(CIBMessage(ERS_HERE, msg.str()));
     526            0 :         }
     527              :         first = false;
     528              :       }
     529              :       else
     530              :       {
     531            0 :         bool failed = false;
     532              :         // in case it rolled over, compare to 255
     533            0 :         if (tcp_packet.header.sequence_id == 0)
     534              :         {
     535            0 :           if (prev_seq != 255)
     536              :           {
     537              :             failed = true;
     538              :           }
     539              :         }
     540              :         else
     541              :         {
     542            0 :           if (tcp_packet.header.sequence_id != (prev_seq+1))
     543              :           {
     544              :             failed = true;
     545              :           }
     546              :         }
     547              :         if (failed)
     548              :         {
     549            0 :           std::ostringstream msg("");
     550            0 :           msg << "Skipped CIB word sequence. Prev word " << prev_seq << " current word " << static_cast<int>(tcp_packet.header.sequence_id );
     551            0 :           ers::warning(CIBMessage(ERS_HERE, msg.str()));
     552            0 :         }
     553              :       }
     554            0 :       prev_seq = tcp_packet.header.sequence_id;
     555              : 
     556            0 :       update_buffer_counts(n_words);
     557              : 
     558              :       // temporarily print the trigger
     559              :       // I leave this here for debugging purposes, but don“t even keep it in the logs, since it is too verbose. We can always add it back if we need to debug something
     560              :       // TLOG_DEBUG(TLVL_CIB_TRACE) << "TRIGGER : ts " << tcp_packet.word.timestamp
     561              :       //                            << " pos_m1 " << util::get_m1(tcp_packet.word)
     562              :       //                            << " pos_m2 " << util::get_m2(tcp_packet.word)
     563              :       //                            << " pos_m3 " << util::get_m3(tcp_packet.word);
     564              : 
     565            0 :       if ( m_calibration_stream_enable )
     566              :       {
     567            0 :         m_calibration_file.write( reinterpret_cast<const char*>( & tcp_packet.word ), sizeof(tcp_packet.word) ) ; // NOLINT
     568            0 :         m_calibration_file.flush() ;
     569              :       } // word printing in calibration stream
     570              : 
     571            0 :       TLOG_DEBUG(TLVL_CIB_DEBUG) << get_name() << "Received IoLS trigger word!";
     572            0 :       ++m_num_total_triggers_received;
     573            0 :       ++m_num_run_triggers_received;
     574              : 
     575              : 
     576              :       // we do not need to know anything else
     577              :       // ideally, one could add other information such as the direction
     578              :       // this should be coming packed in the trigger word
     579              :       // note, however, that to reconstruct the trace direction we also would need the source position
     580              :       // and that we cannot afford to send, so we can just make it up out of the  IoLS system
     581              :       //
     582              :       // Send HSI data to a DLH
     583            0 :       std::array<uint32_t, 7> hsi_struct;
     584            0 :       hsi_struct[0] = (0x1 << 26)     |  // some random bit that could indicate the type of frame
     585            0 :                       (m_slot << 22)  |  // slot number
     586            0 :                       (m_crate << 12) |  // crate number
     587            0 :                       (m_det << 6)    |  // detector number
     588              :                       CIB_HSI_FRAME_VERSION; 
     589              :       // timestamps -  I like the explicit masking here to mistakes
     590            0 :       hsi_struct[1] = tcp_packet.word.timestamp & 0xFFFFFFFF;       // ts low
     591            0 :       hsi_struct[2] = (tcp_packet.word.timestamp >> 32) & 0xFFFFFFFF; // ts high
     592              : 
     593              :       // we shall use these 2 sets of 32 bits to define the periscope position
     594              :       // pos_m3 == linear stage
     595            0 :       hsi_struct[3] = dunedaq::cibmodules::util::get_m3(tcp_packet.word); // lower 32b 0
     596              :       // pos_m3 == RNN600
     597            0 :       hsi_struct[4] = dunedaq::cibmodules::util::get_m2(tcp_packet.word); // upper 32b
     598              :       /**
     599              :        * A note about the 5th entry
     600              :        * The trigger bit is actually mapped into a single bit, that is then remapped back
     601              :        * into an index
     602              :        */
     603            0 :       hsi_struct[5] = m_trigger_bit;            // trigger_map;
     604            0 :       hsi_struct[6] = m_num_run_triggers_received.load();    // m_generated_counter;
     605              : 
     606              :       // Same thing here. If something really bad happens, this code can be very useful, but we do not want to print this in the logs, unless strictly necessary, since it is too verbose. 
     607              :       // TLOG_DEBUG(TLVL_CIB_TRACE) << "CIB HSI Frame: "
     608              :       //                            << "0x" << std::hex << hsi_struct[0]
     609              :       //                            << ", 0x" << hsi_struct[1]
     610              :       //                            << ", 0x" << hsi_struct[2]
     611              :       //                            << ", 0x" << hsi_struct[3]
     612              :       //                            << ", 0x" << hsi_struct[4]
     613              :       //                            << ", 0x" << hsi_struct[5]
     614              :       //                            << ", 0x" << hsi_struct[6]
     615              :       //                            << std::dec << std::endl;
     616              : 
     617            0 :       if (!m_cib_hsi_data_sender)
     618              :       {
     619            0 :         std::ostringstream msg("");
     620            0 :         msg << "HSI Data Sender not properly configured! This will go down in flames.";
     621            0 :         ers::error(CIBCommunicationError(ERS_HERE, msg.str()));
     622              :         // we cannot continue
     623            0 :         break ;
     624            0 :       }
     625              : 
     626            0 :       send_raw_hsi_data(hsi_struct, m_cib_hsi_data_sender.get());
     627              : 
     628              :       // TODO Nuno Barros Apr-02-2024 : properly fill device id when someone explains me
     629              :       // how to get it
     630              :       // in fact, this may disappear in the future where the CIB is decoupled from 
     631              :       // the HSI infrastructure
     632            0 :       dfmessages::HSIEvent event(m_det,
     633              :                                  m_trigger_bit,
     634              :                                  tcp_packet.word.timestamp,
     635              :                                  m_num_run_triggers_received.load(),
     636            0 :                                  m_run_number);
     637              : 
     638            0 :       send_hsi_event(event);
     639            0 :      if (connection_closed)
     640              :       {
     641              :         break ;
     642              :       }
     643              :     }
     644              : 
     645              :     // Wait for stop signal (which only comes after CIB closed its sender socket)
     646            0 :     while (m_is_running.load())
     647              :     {
     648            0 :       std::this_thread::sleep_for(std::chrono::microseconds(100));
     649              :     }
     650              : 
     651            0 :     boost::system::error_code closing_error;
     652              :     // if the system is already in an error state
     653              :     // we should call for a socket shutdown to force
     654              :     // the connection to close
     655            0 :     if (m_error_state.load())
     656              :     {
     657              : 
     658            0 :       m_receiver_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_send, closing_error);
     659              : 
     660            0 :       if (closing_error)
     661              :       {
     662            0 :         std::stringstream msg;
     663            0 :         msg << "Error in shutdown " << closing_error.message();
     664            0 :         ers::error(CIBCommunicationError(ERS_HERE, msg.str()));
     665            0 :       }
     666              :     }
     667              : 
     668            0 :     m_receiver_socket.close(closing_error) ;
     669              : 
     670            0 :     if ( closing_error )
     671              :     {
     672            0 :       std::stringstream msg;
     673            0 :       msg << "Socket closing failed:: " << closing_error.message();
     674            0 :       ers::error(CIBCommunicationError(ERS_HERE,msg.str()));
     675            0 :     }
     676              : 
     677            0 :     m_receiver_ready.store(false);
     678              : 
     679            0 :     TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": End of do_work loop: stop receiving data from the CIB";
     680              : 
     681            0 :     TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_work() method";
     682            0 :   }
     683              : 
     684              :   template<typename T>
     685            0 :   bool CIBModule::read(boost::asio::ip::tcp::socket &socket, T &obj)
     686              :   {
     687              : 
     688            0 :     boost::system::error_code receiving_error;
     689            0 :     boost::asio::read( socket,
     690            0 :                        boost::asio::buffer( &obj, sizeof(T) ),
     691              :                        receiving_error ) ;
     692              : 
     693            0 :     if ( ! receiving_error )
     694              :     {
     695              :       return true ;
     696              :     }
     697              : 
     698            0 :     if ( receiving_error == boost::asio::error::eof)
     699              :     {
     700            0 :       if (m_stop_requested.load())
     701              :       {
     702            0 :         TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": Socket closed after stop request." << std::endl;
     703              :       }
     704              :       else
     705              :       {
     706            0 :         std::string error_message = "Socket closed: " + receiving_error.message();
     707            0 :         ers::error(CIBCommunicationError(ERS_HERE, error_message));
     708            0 :       }
     709              :       
     710            0 :       return false ;
     711              :     }
     712              : 
     713              :     if ( receiving_error )
     714              :     {
     715            0 :       std::string error_message = "Read failure: " + receiving_error.message();
     716            0 :       ers::error(CIBCommunicationError(ERS_HERE, error_message));
     717              :       return false ;
     718            0 :     }
     719              : 
     720              :     return true ;
     721              :   }
     722              : 
     723            0 :   void CIBModule::init_calibration_file()
     724              :   {
     725            0 :     if ( ! m_calibration_stream_enable )
     726              :     {
     727            0 :       return ;
     728              :     }
     729            0 :     char file_name[200] = "" ;
     730            0 :     time_t rawtime;
     731            0 :     time( & rawtime ) ;
     732            0 :     struct tm local_tm;
     733            0 :     struct tm * timeinfo = localtime_r( & rawtime , &local_tm) ;
     734            0 :     strftime( file_name, sizeof(file_name), "%F_%H.%M.%S.iols.calib", timeinfo );
     735            0 :     std::string global_name = m_calibration_dir + m_calibration_prefix + file_name ;
     736            0 :     m_calibration_file.open( global_name, std::ofstream::binary ) ;
     737            0 :     if ( ! m_calibration_file.is_open() )
     738              :     {
     739            0 :       std::ostringstream msg ;
     740            0 :       msg << get_name() << ": Unable to open calibration stream file: " << global_name ;
     741            0 :       ers::warning(CIBMessage(ERS_HERE, msg.str()));
     742            0 :       m_calibration_stream_enable = false ;
     743            0 :       return ;
     744            0 :     }
     745            0 :     m_last_calibration_file_update = std::chrono::steady_clock::now();
     746            0 :     TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": New Calibration Stream file: " << global_name << std::endl ;
     747            0 :   }
     748              : 
     749            0 :   void CIBModule::update_calibration_file()
     750              :   {
     751              : 
     752            0 :     if ( ! m_calibration_stream_enable )
     753              :     {
     754            0 :       return ;
     755              :     }
     756              : 
     757            0 :     std::chrono::steady_clock::time_point check_point = std::chrono::steady_clock::now();
     758              : 
     759            0 :     if ( check_point - m_last_calibration_file_update < m_calibration_file_interval )
     760              :     {
     761              :       return ;
     762              :     }
     763              : 
     764            0 :     m_calibration_file.close() ;
     765            0 :     init_calibration_file() ;
     766              : 
     767              :   }
     768              : 
     769            0 :   bool CIBModule::set_calibration_stream( const std::string & prefix )
     770              :   {
     771              : 
     772            0 :     if ( m_calibration_dir.back() != '/' )
     773              :     {
     774            0 :       m_calibration_dir += '/' ;
     775              :     }
     776            0 :     m_calibration_prefix = prefix ;
     777            0 :     if ( prefix.size() > 0 )
     778              :     {
     779            0 :       m_calibration_prefix += '_' ;
     780              :     }
     781              :     // possibly we could check here if the directory is valid and  writable before assuming the calibration stream is valid
     782            0 :     return true ;
     783              :   }
     784              : 
     785            0 :   void CIBModule::send_config( const std::string & config ) {
     786              : 
     787            0 :     TLOG_DEBUG(TLVL_CIB_INFO) << get_name() << ": Sending config" << std::endl;
     788              : 
     789              :     // structure the message to have a common management structure
     790            0 :     nlohmann::json conf;
     791            0 :     conf["command"] = "config";
     792            0 :     conf["config"] = nlohmann::json::parse(config);
     793              : 
     794            0 :     TLOG_DEBUG(TLVL_CIB_DEBUG) << get_name() << ": Shipped config : " << conf.dump() << std::endl;
     795              : 
     796            0 :     if ( send_message( conf.dump() ) )
     797              :     {
     798            0 :       m_is_configured.store(true) ;
     799              :     }
     800              :     else
     801              :     {
     802            0 :       throw CIBCommunicationError(ERS_HERE, "Unable to configure CIB");
     803              :     }
     804            0 :   }
     805              : 
     806            0 :   bool CIBModule::send_message( const std::string & msg )
     807              :   {
     808              : 
     809              :     //add error options
     810            0 :     boost::system::error_code error;
     811            0 :     TLOG_DEBUG(1) << get_name() << ": Sending message: " << msg;
     812              : 
     813            0 :     m_num_control_messages_sent++;
     814              : 
     815            0 :     boost::asio::write( m_control_socket, boost::asio::buffer( msg ), error ) ;
     816            0 :     boost::array<char, 1024> reply_buf{" "} ;
     817            0 :     m_control_socket.read_some( boost::asio::buffer(reply_buf ), error);
     818            0 :     std::stringstream raw_answer( std::string(reply_buf .begin(), reply_buf .end() ) ) ;
     819            0 :     TLOG_DEBUG(1) << get_name() << ": Unformatted answer: " << raw_answer.str();
     820              : 
     821            0 :     nlohmann::json answer ;
     822            0 :     raw_answer >> answer ;
     823            0 :     nlohmann::json & messages = answer["feedback"] ;
     824            0 :     TLOG_DEBUG(1) << get_name() << ": Received messages: " << messages.size();
     825              : 
     826            0 :     bool ret = true ;
     827            0 :     for (nlohmann::json::size_type i = 0; i != messages.size(); ++i )
     828              :     {
     829              : 
     830            0 :       m_num_control_responses_received++;
     831              : 
     832            0 :       std::string type = messages[i]["type"].dump() ;
     833            0 :       if ( type.find("error") != std::string::npos || type.find("Error") != std::string::npos || type.find("ERROR") != std::string::npos )
     834              :       {
     835            0 :         ers::error(CIBMessage(ERS_HERE, messages[i]["message"].dump()));
     836            0 :         ret = false ;
     837              :       }
     838            0 :       else if ( type.find("warning") != std::string::npos || type.find("Warning") != std::string::npos || type.find("WARNING") != std::string::npos )
     839              :       {
     840            0 :         ers::warning(CIBMessage(ERS_HERE, messages[i]["message"].dump()));
     841              :       }
     842            0 :       else if ( type.find("info") != std::string::npos || type.find("Info") != std::string::npos || type.find("INFO") != std::string::npos)
     843              :       {
     844            0 :         TLOG() << "Message from the CIB : " << messages[i]["message"].dump();
     845              :       }
     846              :       else
     847              :       {
     848            0 :         std::stringstream blob;
     849            0 :         blob << messages[i] ;
     850            0 :         TLOG() << get_name() << ": Unformatted feedback from the board: " << blob.str();
     851            0 :       }
     852            0 :     }
     853            0 :     return ret;
     854            0 :   }
     855              : 
     856              :   void
     857            0 :   CIBModule::update_buffer_counts(uint new_count) // NOLINT(build/unsigned)
     858              :   {
     859            0 :     std::unique_lock mon_data_lock(m_buffer_counts_mutex);
     860            0 :     if (m_buffer_counts.size() > 1000)
     861              :     {
     862            0 :       m_buffer_counts.pop_front();
     863              :     }
     864            0 :     m_buffer_counts.push_back(new_count);
     865            0 :   }
     866              : 
     867              :   double
     868            0 :   CIBModule::read_average_buffer_counts()
     869              :   {
     870            0 :     std::unique_lock mon_data_lock(m_buffer_counts_mutex);
     871              : 
     872            0 :     double total_counts;
     873            0 :     uint32_t number_of_counts; // NOLINT(build/unsigned)
     874              : 
     875            0 :     total_counts = 0;
     876            0 :     number_of_counts = m_buffer_counts.size();
     877              : 
     878            0 :     if (number_of_counts) {
     879            0 :       for (uint i = 0; i < number_of_counts; ++i) { // NOLINT(build/unsigned)
     880            0 :         total_counts = total_counts + m_buffer_counts.at(i);
     881              :       }
     882            0 :       return total_counts / number_of_counts;
     883              :     } else {
     884              :       return 0;
     885              :     }
     886            0 :   }
     887              : 
     888            0 :   void CIBModule::generate_opmon_data()
     889              :   {
     890            0 :     dunedaq::cibmodules::opmon::CIBModuleInfo module_info;
     891              : 
     892            0 :     module_info.set_num_control_messages_sent(m_num_control_messages_sent.load());
     893            0 :     module_info.set_num_control_responses_received(m_num_control_responses_received.load());
     894            0 :     module_info.set_hardware_running(m_is_running.load());
     895            0 :     module_info.set_hardware_configured(m_is_configured.load());
     896            0 :     module_info.set_num_total_triggers_received(m_num_total_triggers_received.load());
     897            0 :     module_info.set_num_run_triggers_received(m_num_run_triggers_received.load());
     898              : 
     899              :     // -- need to define these counters (and set the code to update them
     900            0 :     module_info.set_sent_hsi_events_counter(m_sent_counter.load());
     901            0 :     module_info.set_failed_to_send_hsi_events_counter(m_failed_to_send_counter.load());
     902              :     //module_info.set_last_sent_timestamp(m_last_sent_timestamp.load());
     903            0 :     module_info.set_average_buffer_occupancy(read_average_buffer_counts());
     904              : 
     905            0 :     publish(std::move(module_info));
     906              : 
     907              :     // should we also publish specific trigger info? 
     908              :     // doesn't seem necessary at this time
     909              : 
     910            0 :   }
     911              : 
     912            0 :   bool CIBModule::check_port_in_use(unsigned short port)
     913              :   {
     914            0 :     using namespace boost::asio;
     915            0 :     using ip::tcp;
     916              : 
     917            0 :     io_service svc;
     918            0 :     tcp::acceptor a(svc);
     919              : 
     920            0 :     boost::system::error_code ec;
     921            0 :     a.open(tcp::v4(), ec) || a.bind({ tcp::v4(), port }, ec);
     922            0 :     a.close();
     923            0 :     return ec == error::address_in_use;
     924              : 
     925            0 :   }
     926              : } // namespace dunedaq::cibmodules
     927              : 
// Expose CIBModule through the DUNE DAQ module macro.
// NOTE(review): presumably this registers the class so the framework can
// instantiate it as a plugin by name — confirm against the appfwk docs.
DEFINE_DUNE_DAQ_MODULE(dunedaq::cibmodules::CIBModule)
        

Generated by: LCOV version 2.0-1