LCOV - code coverage report
Current view: top level - ctbmodules/plugins - CTBModule.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 468 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 61 0

            Line data    Source code
       1              : /**
       2              :  * @file CTBModule.cpp CTBModule class
       3              :  * implementation
       4              :  *
       5              :  * This is part of the DUNE DAQ Software Suite, copyright 2020.
       6              :  * Licensing/copyright details are in the COPYING file that you should have
       7              :  * received with this code.
       8              :  */
       9              : 
      10              : #include "confmodel/GeoId.hpp"
      11              : 
      12              : #include "appmodel/CTBConf.hpp"
      13              : #include "appmodel/CTBCalibrationStream.hpp"
      14              : #include "appmodel/CTBoardConf.hpp"
      15              : #include "appmodel/CTBMisc.hpp"
      16              : #include "appmodel/CTBRandomTrigger.hpp"
      17              : #include "appmodel/CTBHLT.hpp"
      18              : #include "appmodel/CTBLLT.hpp"
      19              : #include "appmodel/CTBCountLLT.hpp"
      20              : #include "appmodel/CTBSockets.hpp"
      21              : #include "appmodel/CTBReceiverSocket.hpp"
      22              : 
      23              : #include "CTBModule.hpp"
      24              : #include "CTBModuleIssues.hpp"
      25              : 
      26              : #include "iomanager/IOManager.hpp"
      27              : #include "logging/Logging.hpp"
      28              : 
      29              : #include <chrono>
      30              : #include <string>
      31              : #include <thread>
      32              : #include <vector>
      33              : #include <memory>
      34              : #include <map>
      35              : #include <queue>
      36              : #include <utility>
      37              : 
      38              : /**
      39              :  * @brief Name used by TRACE TLOG calls from this source file
      40              :  */
      41              : #define TRACE_NAME "CTBModule" // NOLINT
      42              : enum
      43              : {
      44              :   TLVL_ENTER_EXIT_METHODS = 10,
      45              :   TLVL_CTB_MODULE = 15
      46              : };
      47              :   
      48              : constexpr uint16_t CTB_HSI_FRAME_VERSION = 0x1;  // NOLINT
      49              : 
      50              : using namespace dunedaq;
      51              : using namespace ctbmodules;
      52              : 
      53            0 : CTBModule::CTBModule(const std::string& name)
      54              :   : hsilibs::HSIEventSender(name)
      55            0 :   , m_is_running(false)
      56            0 :   , m_stop_requested(false)
      57            0 :   , m_is_configured(false)
      58            0 :   , m_error_state(false)
      59            0 :   , m_total_hlt_counter(0)
      60            0 :   , m_ts_word_counter(0) 
      61            0 :   , m_hlt_trigger_counter()
      62            0 :   , m_llt_trigger_counter()
      63            0 :   , m_control_ios()
      64            0 :   , m_receiver_ios()
      65            0 :   , m_control_socket(m_control_ios)
      66            0 :   , m_receiver_socket(m_receiver_ios)
      67            0 :   , m_thread_(std::bind(&CTBModule::do_hsi_work, this, std::placeholders::_1))
      68            0 :   , m_has_calibration_stream( false )
      69            0 :   , m_run_HLT_counter(0)
      70            0 :   , m_run_LLT_counter(0)
      71            0 :   , m_run_channel_status_counter(0)
      72            0 :   , m_num_control_messages_sent(0)
      73            0 :   , m_num_control_responses_received(0)
      74            0 :   , m_last_readout_hlt_timestamp(0)
      75              : {
      76            0 :   register_command("conf", &CTBModule::do_configure);
      77            0 :   register_command("start", &CTBModule::do_start);
      78            0 :   register_command("stop", &CTBModule::do_stop);
      79            0 : }
      80              : 
      81            0 : CTBModule::~CTBModule(){
      82              :     //check if running. and in case stop the run
      83            0 :   if(m_is_running){
      84            0 :     const CommandData_t stopobj;
      85            0 :     do_stop(stopobj);
      86            0 :   } 
      87            0 :   m_control_socket.close() ;
      88              : 
      89            0 : }
      90              : 
      91              : void
      92            0 : CTBModule::init(std::shared_ptr<appfwk::ConfigurationManager> cfgMgr)
      93              : {
      94            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
      95              : 
      96            0 :   HSIEventSender::init(cfgMgr);
      97              : 
      98            0 :   m_cfg = cfgMgr;
      99              :   
     100            0 :   auto mdal = cfgMgr->get_dal<appmodel::CTBModule>(get_name()); 
     101              : 
     102            0 :   if (! mdal) {
     103            0 :     throw ctbmodules::CTBConfigFailure(ERS_HERE, "Missing Module configuration for " + get_name());
     104              :   }
     105              : 
     106            0 :   m_module = mdal;
     107              : 
     108              :   // setting up connections
     109            0 :   auto iom = iomanager::IOManager::get();
     110              : 
     111            0 :   using hsi_frame_t = dunedaq::hsilibs::HSI_FRAME_STRUCT;
     112            0 :   for ( auto con : m_module->get_outputs() ) {
     113            0 :     if ( con->get_data_type() == datatype_to_string<hsi_frame_t>() ) {
     114            0 :       if ( con->UID().find("HLT")!=std::string::npos
     115            0 :            || con->UID().find("hlt")!=std::string::npos ) {
     116            0 :         m_hlt_hsi_data_sender = iom->get_sender<hsi_frame_t>(con->UID());
     117              :       }
     118            0 :       if ( con->UID().find("LLT")!=std::string::npos
     119            0 :            || con->UID().find("llt")!=std::string::npos ) {
     120            0 :         m_llt_hsi_data_sender = iom->get_sender<hsi_frame_t>(con->UID());
     121              :       } 
     122              :     } // if data type is HSI Frame
     123              :   } // loop over outputs
     124              :     
     125            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
     126            0 : }
     127              : 
     128              : void
     129            0 : CTBModule::do_configure(const CommandData_t&)
     130              : {
     131              : 
     132            0 :   TLOG_DEBUG(0) << get_name() << ": Configuring CTB";
     133              : 
     134            0 :   auto conf = m_module ->get_configuration();
     135              : 
     136            0 :   m_receiver_port = m_module->get_board()->get_sockets()->get_receiver()->get_port();
     137            0 :   m_timeout = std::chrono::milliseconds( conf->get_connection_timeout_ms() ) ;
     138              : 
     139            0 :   auto hostname = conf->get_hostname();
     140            0 :   TLOG() << get_name() << ": Board receiver network location "
     141            0 :          << hostname << ':' << m_receiver_port << std::endl;
     142              : 
     143              :   // Initialise monitoring variables
     144            0 :   m_num_control_messages_sent = 0;
     145            0 :   m_num_control_responses_received = 0;
     146            0 :   m_ts_word_counter = 0;
     147              : 
     148            0 :   std::map<std::string, size_t> id_to_idx;
     149            0 :   for(size_t i = 0; i < m_hlt_range; i++) id_to_idx["HLT_" + std::to_string(i)] = i;
     150            0 :   for(size_t i = 0; i < m_llt_range; i++) id_to_idx["LLT_" + std::to_string(i)] = i;
     151              : 
     152              :   // configuring the board   
     153            0 :   auto board = m_module->get_board();
     154            0 :   auto geo_id = board->get_geo_id();
     155            0 :   m_det = geo_id->get_detector_id();
     156            0 :   m_crate = geo_id->get_crate_id();
     157            0 :   m_slot = geo_id->get_slot_id();
     158              :   
     159            0 :   const auto & misc = board->get_misc();
     160            0 :   auto session = m_cfg->get_session();
     161              :   // HLTs
     162              :   // 0th HLT is random trigger that's not in HLT array
     163            0 :   if (! misc->get_randomtrigger_1()->is_disabled( *session ) ) m_hlt_trigger_counter[0] = 0;
     164              : 
     165            0 :   auto hlts = board->get_HLTs();
     166            0 :   for (const auto& hlt : hlts) { if (! hlt->is_disabled(*session) ) m_hlt_trigger_counter[id_to_idx[hlt->UID()]] = 0; }
     167              : 
     168              :   // LLTs: Beam and CRT
     169              :   // 0th LLT is random trigger that's not in HLT array
     170            0 :   if (! misc->get_randomtrigger_2()->is_disabled( *session ) ) m_llt_trigger_counter[0] = 0;
     171              : 
     172            0 :   auto beam_llts = board->get_beam_LLTs();
     173            0 :   for (const auto& llt : beam_llts) { if (! llt->is_disabled(*session)) m_llt_trigger_counter[id_to_idx[llt->UID()]] = 0; }
     174              : 
     175            0 :   auto crt_llts = board->get_CRT_LLTs();
     176            0 :   for (const auto& llt : crt_llts) { if (! llt->is_disabled(*session)) m_llt_trigger_counter[id_to_idx[llt->UID()]] = 0; }
     177              : 
     178              :   // network connection to ctb hardware control
     179            0 :   boost::asio::ip::tcp::resolver resolver( m_control_ios ); 
     180            0 :   boost::asio::ip::tcp::resolver::query query( hostname,
     181            0 :                                                std::to_string(conf->get_control_connection_port()) ) ; //"np04-ctb-1", 8991
     182            0 :   boost::asio::ip::tcp::resolver::iterator iter = resolver.resolve(query) ;
     183              : 
     184            0 :   m_endpoint = iter->endpoint(); 
     185            0 :   m_control_socket.connect( m_endpoint );
     186              : 
     187              :   // if necessary, set the calibration stream
     188            0 :   auto stream_conf = conf->get_calibration_stream();
     189            0 :   if ( stream_conf ) {
     190            0 :     m_has_calibration_stream = true ; 
     191            0 :     m_calibration_dir = stream_conf->get_directory();
     192            0 :     m_calibration_file_interval = std::chrono::duration_cast<decltype(m_calibration_file_interval)>(std::chrono::seconds(stream_conf->get_update_period_s()));
     193            0 :                                                        ; 
     194              :   }
     195              : 
     196              :   // at this point we have to find the hostname to tell the board what to get
     197            0 :   boost::asio::ip::tcp::resolver::query query_for_local(boost::asio::ip::host_name(), "");
     198            0 :   iter = resolver.resolve(query_for_local);
     199              :   
     200              :   // create the json string
     201            0 :   auto json_conf = m_module->get_board()->get_ctb_json(*session, iter->endpoint().address().to_string());
     202              : 
     203            0 :   auto json_dump = json_conf.dump();
     204              : 
     205            0 :   TLOG() << "Sending configuration: " << json_dump;
     206              : 
     207            0 :   send_config(json_dump);
     208            0 : }
     209              : 
     210              : void
     211            0 : CTBModule::do_start(const CommandData_t& startobj)
     212              : {
     213              : 
     214            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
     215              : 
     216              :   // Set this to false early so it doesn't interfere with the start
     217            0 :   m_stop_requested.store(false);
     218              : 
     219            0 :   m_run_number.store(startobj.at("run").get<daqdataformats::run_number_t>());
     220              : 
     221            0 :   m_total_hlt_counter.store(0);
     222              : 
     223            0 :   TLOG_DEBUG(0) << get_name() << ": Sending start of run command";
     224            0 :   m_thread_.start_working_thread();
     225              : 
     226            0 :   if ( m_has_calibration_stream ) {
     227            0 :     std::stringstream run;
     228            0 :     run << "run" << m_run_number.load();
     229            0 :     SetCalibrationStream(run.str()) ;
     230            0 :   }
     231              : 
     232            0 :   if ( send_message( "{\"command\":\"StartRun\"}" )  ) {
     233            0 :     m_is_running.store(true);
     234            0 :     TLOG_DEBUG(1) << get_name() << ": successfully started";
     235              :   } else{
     236            0 :     throw CTBCommunicationError(ERS_HERE, "Unable to start CTB");
     237              :   }
     238              : 
     239            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
     240            0 : }
     241              : 
     242              : void
     243            0 : CTBModule::do_stop(const CommandData_t& /*stopobj*/)
     244              : {
     245              : 
     246            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
     247              : 
     248            0 :   TLOG_DEBUG(0) << get_name() << ": Sending stop run command" << std::endl;
     249              :     
     250              :   // Give the do_work thread a chance to stop before stopping the CTB,
     251              :   // otherwise we end up reading from an empty buffer
     252            0 :   m_stop_requested.store(true);
     253            0 :   std::this_thread::sleep_for(std::chrono::milliseconds(2));
     254              : 
     255            0 :   if(send_message( "{\"command\":\"StopRun\"}" ) ){
     256            0 :     TLOG_DEBUG(1) << get_name() << ": successfully stopped";
     257            0 :     m_is_running.store( false ) ;
     258              :   } else {
     259            0 :     throw CTBCommunicationError(ERS_HERE, "Unable to stop CTB");
     260              :   } 
     261            0 :   m_thread_.stop_working_thread();
     262              : 
     263            0 :   m_run_HLT_counter=0;
     264            0 :   m_run_LLT_counter=0;
     265            0 :   m_run_channel_status_counter=0;
     266              : 
     267            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
     268            0 : }
     269              : 
     270              : void
     271            0 : CTBModule::do_hsi_work(std::atomic<bool>& running_flag)
     272              : {
     273            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_work() method";
     274              : 
     275            0 :   std::size_t n_bytes = 0 ;
     276            0 :   std::size_t n_words = 0 ;
     277              : 
     278            0 :   const size_t header_size = sizeof( content::tcp_header_t ) ;
     279            0 :   const size_t word_size = content::word::word_t::size_bytes ;
     280              : 
     281            0 :   TLOG_DEBUG(TLVL_CTB_MODULE) << get_name() <<  ": Header size: " << header_size << std::endl << "Word size: " << word_size << std::endl;
     282              : 
     283              :   //connect to socket
     284            0 :   boost::asio::ip::tcp::acceptor acceptor(m_receiver_ios, boost::asio::ip::tcp::endpoint( boost::asio::ip::tcp::v4(), m_receiver_port ) );
     285            0 :   TLOG_DEBUG(0) << get_name() << ": Waiting for an incoming connection on port " << m_receiver_port << std::endl;
     286              : 
     287            0 :   std::future<void> accepting = async( std::launch::async, [&]{ acceptor.accept(m_receiver_socket) ; } ) ;
     288              : 
     289            0 :   while ( running_flag.load() && !m_stop_requested.load() ) {
     290            0 :     if ( accepting.wait_for( m_timeout ) == std::future_status::ready ){
     291              :       break ;
     292              :     }
     293              :   }
     294              : 
     295            0 :   TLOG_DEBUG(0) << get_name() <<  ": Connection received: start reading" << std::endl;
     296              : 
     297            0 :   content::tcp_header_t head ;
     298            0 :   head.packet_size = 0;
     299            0 :   content::word::word_t temp_word ;
     300            0 :   bool connection_closed = false ;
     301            0 :   uint64_t ch_stat_beam, ch_stat_crt, ch_stat_pds;  // NOLINT
     302            0 :   uint64_t prev_timestamp = 0;  // NOLINT
     303            0 :   ts_payload prev_hlt, prev_llt, prev_ch_stat;
     304            0 :   ts_payload curr_hlt, curr_llt, curr_ch_stat;
     305              :   // buffers for word matching. buf_a are the trigger words, buf_b are corresponding payloads
     306            0 :   std::queue<content::word::trigger_t> match_buf_a_hlts, match_buf_a_llts;
     307            0 :   std::queue<ts_payload> match_buf_b_llts, match_buf_b_chstatus;
     308              : 
     309            0 :   while (running_flag.load() && !m_stop_requested.load()) {
     310              : 
     311            0 :     update_calibration_file();
     312              : 
     313            0 :     if ( ! read( head ) ) {
     314            0 :       connection_closed = true ;
     315              :       break;
     316              :     }
     317              : 
     318            0 :     n_bytes = head.packet_size ;
     319              :     // extract n_words
     320              : 
     321            0 :     n_words = n_bytes / word_size ;
     322              :     // read n words as requested from the header
     323              :     
     324            0 :     update_buffer_counts(n_words);
     325              : 
     326            0 :     for ( unsigned int i = 0 ; i < n_words ; ++i ) {
     327              :       
     328            0 :       if (!running_flag.load() || m_stop_requested.load()) {
     329              :         break;
     330              :       }
     331              : 
     332              :       //read a word
     333            0 :       if ( ! read( temp_word ) ) {
     334              :         connection_closed = true ;
     335              :         break ;
     336              :       }
     337              :       // put it in the calibration stream
     338            0 :       if ( m_has_calibration_stream ) {
     339            0 :         m_calibration_file.write( reinterpret_cast<const char*>( & temp_word ), word_size ) ;   // NOLINT
     340            0 :         m_calibration_file.flush() ;
     341              :       }          // word printing in calibration stream
     342              :       
     343              :       //check if it is a TS word and increment the counter
     344            0 :       if ( IsTSWord( temp_word ) ) {
     345            0 :         ++m_ts_word_counter;
     346            0 :         TLOG_DEBUG(9) << "Received timestamp word! TS: "+temp_word.timestamp;
     347            0 :         prev_timestamp = temp_word.timestamp;
     348            0 :       } else if ( IsFeedbackWord( temp_word ) ) {
     349            0 :         m_error_state.store( true ) ;
     350            0 :         content::word::feedback_t * feedback = reinterpret_cast<content::word::feedback_t*>( & temp_word ) ;  // NOLINT
     351            0 :         TLOG_DEBUG(7) << "Received feedback word!";
     352              : 
     353            0 :         TLOG_DEBUG(8) << get_name() << ": Feedback word: " << std::endl
     354            0 :                                                   << std::hex 
     355            0 :                                                   << " \t Type -> " << feedback -> word_type << std::endl 
     356            0 :                                                   << " \t TS -> " << feedback -> timestamp << std::endl
     357            0 :                                                   << " \t Code -> " << feedback -> code << std::endl
     358            0 :                                                   << " \t Source -> " << feedback -> source << std::endl
     359            0 :                                                   << " \t Padding -> " << feedback -> padding << std::dec << std::endl ;
     360            0 :       } else if (temp_word.word_type == content::word::t_gt) {
     361            0 :         TLOG_DEBUG(3) << "Received HLT word! TS: " + temp_word.timestamp;
     362            0 :         content::word::trigger_t * hlt_word = reinterpret_cast<content::word::trigger_t*>( & temp_word );   //NOLINT
     363            0 :         curr_hlt = {hlt_word->timestamp, (hlt_word->trigger_word & 0x1FFFFFFFFFFFFFFF)};
     364            0 :         if (check_repeated_word(curr_hlt, prev_hlt, temp_word.word_type)) continue;
     365            0 :         match_buf_a_hlts.push(*hlt_word);
     366              :         // Count the total HLTs and each specific one
     367            0 :         ++m_run_HLT_counter;
     368            0 :         ++m_total_hlt_counter;
     369            0 :         for (auto &hlt : m_hlt_trigger_counter) {
     370            0 :           if( (hlt_word->trigger_word >> hlt.first) & 0x1 )
     371            0 :             ++hlt.second;
     372              :         }
     373            0 :         m_last_readout_hlt_timestamp = temp_word.timestamp;
     374            0 :         prev_hlt = curr_hlt;
     375            0 :       } else if (temp_word.word_type == content::word::t_lt)  {
     376            0 :         TLOG_DEBUG(5) << "Received LLT word! TS: " + temp_word.timestamp;
     377            0 :         content::word::trigger_t * llt_word = reinterpret_cast<content::word::trigger_t*>( & temp_word ) ;   //NOLINT
     378            0 :         curr_llt = {llt_word->timestamp, (llt_word->trigger_word & 0xFFFFFFFF)};
     379            0 :         if (check_repeated_word(curr_llt, prev_llt, temp_word.word_type)) continue;
     380            0 :         match_buf_a_llts.push(*llt_word);
     381            0 :         match_buf_b_llts.push(curr_llt);
     382              : 
     383            0 :         ++m_run_LLT_counter;
     384            0 :         for (auto &llt : m_llt_trigger_counter) {
     385            0 :           if( (llt_word->trigger_word >> llt.first) & 0x1 )
     386            0 :             ++llt.second;
     387              :         }
     388            0 :         prev_llt = curr_llt;
     389            0 :       } else if (temp_word.word_type == content::word::t_ch) {
     390              : 
     391            0 :         content::word::ch_status_t * ch_stat_word = reinterpret_cast<content::word::ch_status_t*>( & temp_word ) ;  // NOLINT 
     392              :         // The channel status only has 60b TS so complete the upper 4b from the TS Word. (fyi 60b rolls over >500yr @ 62.5MHz) 
     393            0 :         uint64_t corrected_ts = ((prev_timestamp & 0xF000000000000000) | ch_stat_word->timestamp);  // NOLINT
     394            0 :         TLOG_DEBUG(6) << "Received Channel Status word! TS: " + corrected_ts;
     395            0 :         ch_stat_beam = ch_stat_word->get_beam();
     396            0 :         ch_stat_crt  = ch_stat_word->get_crt();
     397            0 :         ch_stat_pds  = ch_stat_word->get_pds();
     398            0 :         curr_ch_stat = {
     399              :             corrected_ts, 
     400            0 :             ((ch_stat_pds << 48) | (ch_stat_crt << 16) | ch_stat_beam)
     401            0 :         };
     402            0 :         if (check_repeated_word(curr_ch_stat, prev_ch_stat, temp_word.word_type)) continue;
     403            0 :         match_buf_b_chstatus.push(curr_ch_stat);
     404            0 :         prev_ch_stat = curr_ch_stat;
     405              : 
     406            0 :         ++m_run_channel_status_counter;
     407              :       }  // else if on word types
     408              :       // do matching
     409            0 :       match_between_buffers(match_buf_a_hlts, match_buf_b_llts, 
     410              :           prev_hlt.first, content::word::word_type::t_gt);
     411            0 :       match_between_buffers(match_buf_a_llts, match_buf_b_chstatus, 
     412              :           prev_llt.first, content::word::word_type::t_lt);
     413              : 
     414              :     } // n_words loop
     415              : 
     416            0 :     if ( connection_closed ){
     417              :       break ;
     418              :     }
     419              :   }
     420              : 
     421              :   // Make sure CTB run stops before closing socket
     422            0 :   while ( m_is_running.load() ) {
     423            0 :     std::this_thread::sleep_for(std::chrono::microseconds(100));
     424              :   } 
     425              : 
     426            0 :   boost::system::error_code closing_error;
     427              : 
     428            0 :   if ( m_error_state.load() ) {
     429              : 
     430            0 :     m_receiver_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_send, closing_error);
     431              : 
     432            0 :     if ( closing_error ) {
     433            0 :       std::stringstream msg;
     434            0 :       msg << "Error in shutdown " << closing_error.message();
     435            0 :       ers::error(CTBCommunicationError(ERS_HERE,msg.str())) ;
     436            0 :     }
     437              : 
     438              :   }
     439              :   
     440            0 :   m_receiver_socket.close(closing_error) ;
     441              : 
     442            0 :   if ( closing_error ) {
     443            0 :     std::stringstream msg;
     444            0 :     msg << "Socket closing failed:: " << closing_error.message();
     445            0 :     ers::error(CTBCommunicationError(ERS_HERE,msg.str()));
     446            0 :   }
     447              : 
     448              : 
     449            0 :   TLOG_DEBUG(TLVL_CTB_MODULE) << get_name() << ": End of do_work loop: stop receiving data from the CTB";
     450              :   
     451            0 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_work() method";
     452              : 
     453            0 : } // NOLINT
     454              : 
     455            0 : bool CTBModule::check_repeated_word(ts_payload& curr_word, ts_payload& prev_word, uint64_t wtype){  // NOLINT
     456            0 :   if (curr_word.first == prev_word.first) { // words with repeated timestamp. Not good!
     457            0 :     std::stringstream msg;
     458            0 :     msg << "Multiple words have the same timestamp, Using the first one. Word type: ";
     459            0 :     if (wtype == content::word::word_type::t_gt) msg << "HLT";
     460            0 :     else if (wtype == content::word::word_type::t_lt) msg << "LLT";
     461            0 :     else if (wtype == content::word::word_type::t_ch) msg << "Channel Status";
     462            0 :     else msg << wtype;
     463            0 :     msg << ", TS: "<< curr_word.first << ".";
     464            0 :     if (curr_word.second != prev_word.second) { // not only do we have repeated timestamp, they have different payload...
     465            0 :       msg << " Different payload!! Previous payload: 0x" << std::hex << prev_word.second
     466            0 :           << " Current payload: 0x" << curr_word.second;
     467            0 :       ers::warning(CTBRepeatedTimestampWarning(ERS_HERE, msg.str()));
     468              :     } else {
     469            0 :       msg << " Both have payload 0x" << std::hex << prev_word.second;
     470            0 :       TLOG() << msg.str();
     471              :     }
     472            0 :     return true;
     473            0 :   }
     474              :   return false;
     475              : }
     476              : 
     477            0 :   void CTBModule::send_matched_trigger_word(const content::word::trigger_t& word, uint64_t payload) {  // NOLINT
     478              :   // Send HSI data to a DLH
     479            0 :   std::array<uint32_t, 7> hsi_struct;   // NOLINT
     480            0 :   bool is_hlt = word.IsHLT();
     481            0 :   hsi_struct[0] = (is_hlt << 26)            |  // link
     482            0 :                   (m_slot << 22)   |  
     483            0 :                   (m_crate << 12)  | 
     484            0 :                   (m_det << 6)     |
     485              :                   CTB_HSI_FRAME_VERSION
     486              :                   ;
     487            0 :   hsi_struct[1] = word.timestamp;        // ts low
     488            0 :   hsi_struct[2] = word.timestamp >> 32;  // ts high
     489            0 :   hsi_struct[3] = payload;                // lower 32b
     490            0 :   hsi_struct[4] = payload >> 32;          // upper 32b (will be 0x0 for llt payloads)
     491            0 :   hsi_struct[5] = word.trigger_word;     // trigger_map;
     492            0 :   hsi_struct[6] = is_hlt ? m_run_HLT_counter : m_run_LLT_counter; // m_generated_counter;
     493            0 :   int dbg_lvl = is_hlt ? 4 : 6;
     494            0 :   TLOG_DEBUG(dbg_lvl) << get_name() << ": Formed HSI_FRAME_STRUCT for " << (is_hlt? "HLT" : "LLT")
     495            0 :       << std::hex 
     496            0 :       << "0x"   << hsi_struct[0]
     497            0 :       << ", 0x" << hsi_struct[1]
     498            0 :       << ", 0x" << hsi_struct[2]
     499            0 :       << ", 0x" << hsi_struct[3]
     500            0 :       << ", 0x" << hsi_struct[4]
     501            0 :       << ", 0x" << hsi_struct[5]
     502            0 :       << ", 0x" << hsi_struct[6]
     503            0 :       << "\n";
     504            0 :   if (is_hlt) {
     505            0 :     send_raw_hsi_data(hsi_struct, m_hlt_hsi_data_sender.get());
     506            0 :     dfmessages::HSIEvent event(m_det, word.trigger_word, word.timestamp, m_run_HLT_counter, m_run_number);
     507            0 :     send_hsi_event(event);
     508              :   } else {
     509            0 :     send_raw_hsi_data(hsi_struct, m_llt_hsi_data_sender.get());
     510              :   }
     511              :   
     512            0 : }
     513              : 
     514            0 : void CTBModule::match_between_buffers(std::queue<content::word::trigger_t>& buf_a, std::queue<ts_payload>& buf_b,
     515              :                                       uint64_t timeout_reference, content::word::word_type buf_a_wtype) {  // NOLINT
     516            0 :   bool is_hlt = (buf_a_wtype == content::word::word_type::t_gt);
     517            0 :   while (buf_a.size() > 0) {
     518            0 :     content::word::trigger_t trigger = buf_a.front();
     519            0 :     auto trigger_ts = trigger.timestamp;
     520            0 :     auto trigger_word = trigger.trigger_word;
     521            0 :     if (timeout_reference > (trigger_ts + 100)) {
     522            0 :       std::stringstream msg;
     523            0 :       msg << "Time out while waiting for a match for the "<< (is_hlt? "HLT" : "LLT")
     524            0 :           << ": TS = " << trigger_ts << ", trigger word = " << std::hex << "0x"<< trigger_word
     525            0 :           << " Timeout reference: " << std::dec << timeout_reference;
     526            0 :       ers::warning(CTBWordMatchWarning(ERS_HERE, msg.str()));
     527            0 :       buf_a.pop();
     528            0 :       continue;
     529            0 :     }
     530            0 :     if (buf_b.size() == 0) break; // no input to match yet. Return for now, wait for matching input to come
     531            0 :     while (buf_b.size() > 0) {
     532            0 :       auto input_ts = buf_b.front().first;
     533            0 :       if (input_ts < (trigger_ts - 1)) { // word is too early. No longer needed
     534            0 :         if (is_hlt) last_popped_llt = buf_b.front();
     535            0 :         else last_popped_chstatus = buf_b.front();
     536            0 :         buf_b.pop();
     537            0 :       } else if (input_ts == (trigger_ts - 1)) { // match is found
     538            0 :         send_matched_trigger_word(trigger, buf_b.front().second);
     539            0 :         buf_a.pop();
     540            0 :         if (is_hlt) last_popped_llt = buf_b.front();
     541            0 :         else last_popped_chstatus = buf_b.front();
     542            0 :         buf_b.pop();
     543            0 :         break;
     544              :       } else { // buf_b is already past the match window. No matching is found, error!
     545            0 :         if (is_hlt && (trigger_word == 0x1 || trigger_word == (0x1 << 16))) { // Fake HLTs, no matching is OK
     546            0 :           send_matched_trigger_word(trigger, 0);
     547              :         } else{
     548            0 :           std::stringstream msg;
     549            0 :           msg << "No match found for " << (is_hlt? "HLT" : "LLT")
     550            0 :               << ": TS = " << trigger_ts << ", trigger word = 0x" << std::hex << trigger_word
     551            0 :               << " Adjacent input ts: " << std::dec << (is_hlt? last_popped_llt.first : last_popped_chstatus.first) << " "
     552            0 :               << input_ts;
     553            0 :           ers::warning(CTBWordMatchWarning(ERS_HERE, msg.str()));
     554            0 :         }
     555            0 :         buf_a.pop();
     556            0 :         break;
     557              :       } 
     558              :     } // end loop buf_b
     559              :   } // end loop buf_a
     560              :   // Don't let buf_b get too long (e.g. when LLT rate is high but HLT rate is low)
     561            0 :   while (buf_b.size() > 32) {
     562            0 :     if (is_hlt) last_popped_llt = buf_b.front();
     563            0 :     else last_popped_chstatus = buf_b.front();
     564            0 :     buf_b.pop();
     565              :   }
     566            0 : }
     567              : 
     568              : 
     569              : template<typename T>
     570            0 : bool CTBModule::read( T &obj) {
     571              : 
     572            0 :   boost::system::error_code receiving_error;
     573            0 :   boost::asio::read( m_receiver_socket, boost::asio::buffer( &obj, sizeof(T) ), receiving_error ) ;
     574              : 
     575            0 :   if ( ! receiving_error ) {
     576              :     return true ;
     577              :   }
     578              : 
     579            0 :   if ( receiving_error == boost::asio::error::eof) {
     580            0 :     std::string error_message = "Socket closed: " + receiving_error.message();
     581            0 :     ers::error(CTBCommunicationError(ERS_HERE, error_message));
     582              :     return false ;
     583            0 :   }
     584              : 
     585              :   if ( receiving_error ) {
     586            0 :     std::string error_message = "Read failure: " + receiving_error.message();
     587            0 :     ers::error(CTBCommunicationError(ERS_HERE, error_message));
     588              :     return false ;
     589            0 :   }
     590              : 
     591              :   return true ;
     592              : }
     593              : 
     594            0 : bool CTBModule::IsTSWord( const content::word::word_t &w ) noexcept {
     595              : 
     596            0 :   if ( w.word_type == content::word::t_ts ) {
     597            0 :     return true;
     598              :   }
     599              :   return false;
     600              : 
     601              : }
     602              : 
     603            0 : bool CTBModule::IsFeedbackWord( const content::word::word_t &w ) noexcept {
     604              : 
     605            0 :   if ( w.word_type == content::word::t_fback ) {
     606            0 :     return true;
     607              :   }
     608              :   return false;
     609              : 
     610              : }
     611              : 
     612            0 : void CTBModule::init_calibration_file() {
     613              : 
     614            0 :   if ( ! m_has_calibration_stream ){
     615            0 :     return ;
     616              :   } 
     617            0 :   std::array<char, 200> file_name;
     618            0 :   time_t rawtime = time( nullptr ) ;
     619            0 :   struct tm timeinfo;
     620            0 :   localtime_r( & rawtime, & timeinfo) ;
     621            0 :   strftime( file_name.data(), file_name.size(), "%F_%H.%M.%S.calib", & timeinfo );
     622            0 :   std::string global_name = m_calibration_dir + m_calibration_prefix + file_name.data() ;
     623            0 :   m_calibration_file.open( global_name, std::ofstream::binary ) ;
     624            0 :   m_last_calibration_file_update = std::chrono::steady_clock::now();
     625              :   // _calibration_file.setf ( std::ios::hex, std::ios::basefield );
     626              :   // _calibration_file.unsetf ( std::ios::showbase );
     627            0 :   TLOG_DEBUG(0) << get_name() << ": New Calibration Stream file: " << global_name << std::endl ;
     628              : 
     629            0 : }
     630              : 
     631            0 : void CTBModule::update_calibration_file() {
     632              : 
     633            0 :   if ( ! m_has_calibration_stream ) {
     634            0 :     return ;
     635              :   }
     636              :   
     637            0 :   std::chrono::steady_clock::time_point check_point = std::chrono::steady_clock::now();
     638              :   
     639            0 :   if ( check_point - m_last_calibration_file_update < m_calibration_file_interval ) {
     640              :     return ;
     641              :   }
     642              : 
     643            0 :   m_calibration_file.close() ;
     644            0 :   init_calibration_file() ;
     645              : 
     646              : }
     647              : 
     648            0 : bool CTBModule::SetCalibrationStream( const std::string & prefix ) {
     649              : 
     650            0 :   if ( m_calibration_dir.back() != '/' ){
     651            0 :     m_calibration_dir += '/' ;
     652              :   }
     653            0 :   m_calibration_prefix = prefix ; 
     654            0 :   if ( prefix.size() > 0 ){ 
     655            0 :     m_calibration_prefix += '_' ;
     656              :   } 
     657              :   // possibly we could check here if the directory is valid and  writable before assuming the calibration stream is valid
     658            0 :   return true ;
     659              : 
     660              : }
     661              : 
     662              : 
     663            0 : void CTBModule::send_config( const std::string & config ) {
     664              : 
     665            0 :   if ( m_is_configured.load() ) {
     666              : 
     667            0 :     TLOG_DEBUG(1) << get_name() << ": Resetting before configuring" << std::endl;
     668            0 :     send_reset();
     669              : 
     670              :   }
     671              : 
     672            0 :   TLOG_DEBUG(1) << get_name() << ": Sending config" << std::endl;
     673              : 
     674            0 :   if ( send_message( config ) ) {
     675              : 
     676            0 :     m_is_configured.store(true) ;
     677              :     
     678              :   } else {
     679            0 :     throw CTBCommunicationError(ERS_HERE, "Unable to configure CTB");
     680              :   }
     681            0 : }
     682              : 
     683            0 : void CTBModule::send_reset() {
     684              : 
     685            0 :   TLOG_DEBUG(1) << get_name() << ": Sending a reset" << std::endl;
     686              : 
     687            0 :   if(send_message( "{\"command\":\"HardReset\"}" )){
     688              : 
     689            0 :     m_is_running.store(false);
     690            0 :     m_is_configured.store(false);
     691              : 
     692              :   } else {
     693            0 :     ers::error(CTBCommunicationError(ERS_HERE, "Unable to reset CTB"));
     694              :   }
     695              :   
     696            0 : }
     697              : 
     698            0 : bool CTBModule::send_message( const std::string & msg ) {
     699              : 
     700              :   //add error options                                                                                                
     701              : 
     702            0 :   boost::system::error_code error;
     703            0 :   TLOG_DEBUG(1) << get_name() << ": Sending message: " << msg;
     704              : 
     705            0 :   m_num_control_messages_sent++;
     706              : 
     707            0 :   boost::asio::write( m_control_socket, boost::asio::buffer( msg ), error ) ;
     708            0 :   boost::array<char, 4096> reply_buf{" "} ;
     709            0 :   m_control_socket.read_some( boost::asio::buffer(reply_buf ), error);
     710            0 :   std::stringstream raw_answer( std::string(reply_buf .begin(), reply_buf .end() ) ) ;
     711            0 :   TLOG_DEBUG(1) << get_name() << ": Unformatted answer: " << raw_answer.str(); 
     712              : 
     713            0 :   nlohmann::json answer ;
     714            0 :   raw_answer >> answer ;
     715            0 :   nlohmann::json & messages = answer["feedback"] ;
     716            0 :   TLOG_DEBUG(1) << get_name() << ": Received messages: " << messages.size();
     717              : 
     718            0 :   bool ret = true ;
     719            0 :   for (nlohmann::json::size_type i = 0; i != messages.size(); ++i ) {
     720              :     
     721            0 :     m_num_control_responses_received++;
     722              : 
     723            0 :     std::string type = messages[i]["type"].dump() ;
     724            0 :     if ( type.find("error") != std::string::npos || type.find("Error") != std::string::npos || type.find("ERROR") != std::string::npos ) {
     725            0 :       ers::error(CTBMessage(ERS_HERE, messages[i]["message"].dump()));
     726            0 :       ret = false ;
     727            0 :     } else if ( type.find("warning") != std::string::npos || type.find("Warning") != std::string::npos || type.find("WARNING") != std::string::npos ) {
     728            0 :       ers::warning(CTBMessage(ERS_HERE, messages[i]["message"].dump()));
     729            0 :     } else if ( type.find("info") != std::string::npos || type.find("Info") != std::string::npos || type.find("INFO") != std::string::npos) {
     730            0 :       TLOG() << "Message from the board: " << messages[i]["message"].dump();
     731              :     } else {
     732            0 :       std::stringstream blob;
     733            0 :       blob << messages[i] ;
     734            0 :       TLOG() << get_name() << ": Unformatted from the board: " << blob.str();
     735            0 :     }
     736            0 :   }
     737              : 
     738            0 :   return ret;
     739              :   
     740            0 : }
     741              : 
     742              : void
     743            0 : CTBModule::update_buffer_counts(uint new_count) // NOLINT(build/unsigned)
     744              : {
     745            0 :   std::unique_lock mon_data_lock(m_buffer_counts_mutex);
     746            0 :   if (m_buffer_counts.size() > 1000)
     747            0 :     m_buffer_counts.pop_front();
     748            0 :   m_buffer_counts.push_back(new_count);
     749            0 : }
     750              : 
     751              : double
     752            0 : CTBModule::read_average_buffer_counts()
     753              : {
     754            0 :   std::unique_lock mon_data_lock(m_buffer_counts_mutex);
     755              : 
     756            0 :   double total_counts = 0;;
     757            0 :   uint32_t number_of_counts = m_buffer_counts.size(); // NOLINT(build/unsigned)
     758              : 
     759            0 :   if (number_of_counts) {
     760            0 :     for (uint i = 0; i < number_of_counts; ++i) { // NOLINT(build/unsigned)
     761            0 :       total_counts = total_counts + m_buffer_counts.at(i);
     762              :     }
     763            0 :     return total_counts / number_of_counts;
     764              :   } else {
     765              :     return 0;
     766              :   }
     767            0 : }
     768              : 
     769            0 : void CTBModule::generate_opmon_data() 
     770              : {
     771            0 :   dunedaq::ctbmodules::opmon::CTBModuleInfo module_info;
     772              : 
     773            0 :   module_info.set_num_control_messages_sent(m_num_control_messages_sent.load());
     774            0 :   module_info.set_num_control_responses_received(m_num_control_responses_received.load());
     775            0 :   module_info.set_ctb_hardware_running(m_is_running.load()); 
     776            0 :   module_info.set_ctb_hardware_configured(m_is_configured.load());
     777              :     
     778            0 :   module_info.set_last_readout_timestamp(m_last_readout_hlt_timestamp.load());
     779            0 :   module_info.set_failed_to_send_hsi_events_counter( m_failed_to_send_counter.load() );
     780            0 :   module_info.set_last_sent_timestamp(m_last_sent_timestamp.load());
     781            0 :   module_info.set_average_buffer_occupancy( read_average_buffer_counts() );
     782              : 
     783            0 :   module_info.set_total_hlt_count(m_total_hlt_counter.load() );
     784            0 :   module_info.set_ts_word_count(m_ts_word_counter.exchange(0));
     785              : 
     786            0 :   publish( std::move(module_info) );
     787              : 
     788            0 :   for (auto &hlt : m_hlt_trigger_counter) {
     789            0 :     dunedaq::ctbmodules::opmon::TriggerInfo ti;
     790            0 :     ti.set_count(hlt.second.exchange(0));
     791            0 :     publish( std::move(ti), {{ "trigger", "HLT_" + std::to_string(hlt.first)}} );
     792            0 :   }
     793              : 
     794            0 :   for (auto &llt : m_llt_trigger_counter) {
     795            0 :     dunedaq::ctbmodules::opmon::TriggerInfo ti;
     796            0 :     ti.set_count(llt.second.exchange(0));
     797            0 :     publish( std::move(ti), {{ "trigger", "LLT_" + std::to_string(llt.first)}} );
     798            0 :   }
     799              : 
     800            0 : }
     801              : 
     802            0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::ctbmodules::CTBModule)
     803              : 
     804              : // Local Variables:
     805              : // c-basic-offset: 2
     806              : // End:
        

Generated by: LCOV version 2.0-1