LCOV - code coverage report
Current view: top level - daphnemodules/src - DaphneV3Interface.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 62.5 % 72 45
Test Date: 2025-12-21 13:07:08 Functions: 62.5 % 8 5

            Line data    Source code
       1              : /** 
       2              :  * @file DaphneV3Interface.cpp
       3              :  *  
       4              :  * Implementations of DaphneV3Interface's functions                                                                    
       5              :  * 
       6              :  * This is part of the DUNE DAQ Software Suite, copyright 2020.                                                      
       7              :  * Licensing/copyright details are in the COPYING file that you should have         
       8              :  * received with this code.
       9              :  *
      10              :  */
      11              : 
      12              : #include "DaphneV3Interface.hpp"
      13              : 
      14              : #include "logging/Logging.hpp"
      15              : #include <fmt/format.h>
      16              : 
      17              : #include <regex>
      18              : #include <string>
      19              : #include <utility>
      20              : 
      21              : using namespace dunedaq::daphnemodules;
      22              : using namespace daphne;
      23              : 
      24            4 : DaphneV3Interface::DaphneV3Interface( std::string address,
      25              :                                       std::string routing, 
      26            4 :                                       std::chrono::milliseconds timeout)
      27            4 :   : m_context(1)
      28            4 :   , m_socket(m_context, zmq::socket_type::dealer)
      29            4 :   , m_timeout(timeout) {
      30              : 
      31              : 
      32            4 :   m_socket.set(zmq::sockopt::routing_id, routing);
      33            4 :   auto value = (int) m_timeout.count();  // NOLINT
      34            8 :   TLOG() << routing << " timeout set to " << value << " ms";
      35            4 :   m_socket.set(zmq::sockopt::rcvtimeo, value);
      36            4 :   m_socket.set(zmq::sockopt::sndtimeo, value);
      37            4 :   m_socket.set(zmq::sockopt::immediate, 1); // Don't queue messages to incomplete connections
      38              :   
      39              :   // find out if the address has a port with a regex
      40            4 :   static const std::regex ip_with_port(R"(^([^\/\s:]+)(?::(\d{1,5}))?$)");
      41            4 :   std::smatch string_values; 
      42            4 :   if (! std::regex_match( address, string_values, ip_with_port ) ) {
      43            2 :     throw InvalidIPAddress(ERS_HERE, address);
      44              :   }
      45              : 
      46            6 :   m_connection = string_values[2].matched ?
      47              :     fmt::format("tcp://{}", address) :
      48            2 :     fmt::format("tcp://{}:{}", address, s_default_control_port) ;
      49            4 :   TLOG() << "Connecting to: " << m_connection;
      50              :   
      51            2 :   m_socket.connect(m_connection);
      52              : 
      53            2 :   auto add = string_values[1];
      54            2 :   auto port = string_values[2].matched ? std::stoi(string_values[2]) : s_default_control_port;
      55              :       
      56            2 :   try {
      57            2 :     if ( ! validate_connection() ) {
      58            0 :       auto add = string_values[1];
      59            0 :       auto port = string_values[2].matched ? std::stoi(string_values[2]) : s_default_control_port;
      60            0 :       throw FailedPing(ERS_HERE, add, port );
      61              :     }
      62            2 :   } catch ( const ers::Issue & e ) {
      63            2 :     throw FailedPing(ERS_HERE, add, port, e );
      64            2 :   }
      65              :     
      66           16 : }
      67              : 
      68              : 
      69            0 : void DaphneV3Interface::close() {
      70              : 
      71            0 :   const std::lock_guard<std::mutex> lock(m_access_mutex);
      72            0 :   m_socket.set(zmq::sockopt::linger, 0);
      73            0 :   m_socket.close();
      74              :   
      75            0 : }
      76              : 
      77              : 
      78            0 : ControlEnvelopeV2 DaphneV3Interface::send( std::string && message, daphne::MessageTypeV2 type) {
      79              : 
      80            0 :   const std::lock_guard<std::mutex> lock(m_access_mutex);
      81              :   
      82            0 :   _send(std::move(message), type);
      83              : 
      84            0 :   return _receive();
      85            0 : }
      86              : 
      87              : 
      88            2 : void DaphneV3Interface::_send( std::string && message, daphne::MessageTypeV2 type) {
      89              : 
      90            2 :   ControlEnvelopeV2 env;
      91            2 :   env.set_version(2);
      92            2 :   env.set_dir(DIR_REQUEST);
      93              : 
      94            2 :   env.set_type(type);
      95            2 :   env.set_payload(message);
      96              : 
      97              :   // additional information
      98            2 :   env.set_msg_id( m_message_counter++ );
      99            2 :   env.set_timestamp_ns(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now().time_since_epoch()).count());
     100              : 
     101              :   
     102            2 :   std::string bytes = env.SerializeAsString();
     103              : 
     104            2 :   if (! m_socket.send(zmq::buffer(bytes),  zmq::send_flags::none)) {
     105            2 :     throw FailedSend(ERS_HERE, MessageTypeV2_Name(type) );
     106              :   }
     107            4 : }
     108              : 
     109            0 : ControlEnvelopeV2 DaphneV3Interface::_receive() {
     110              : 
     111            0 :   zmq::message_t reply;
     112            0 :   if (! m_socket.recv(reply, zmq::recv_flags::none)) {
     113              :     // timeout or EAGAIN
     114            0 :     throw FailedReceive(ERS_HERE, m_connection);
     115              :   }
     116            0 :   if (reply.size() <= 0) {
     117            0 :     throw EmptyPayload(ERS_HERE, m_connection);
     118              :   }
     119              :   
     120            0 :   ControlEnvelopeV2 rep;
     121            0 :   if (!rep.ParseFromArray(reply.data(), static_cast<int>(reply.size()) )) {
     122            0 :     throw FailedDecoding(ERS_HERE, rep.GetTypeName(), reply.to_string());
     123              :   }
     124              :   
     125            0 :   if (rep.version() != 2 || rep.dir() != DIR_RESPONSE) {
     126            0 :     ers::warning(UnexpectedDirection(ERS_HERE, rep.version(), Direction_Name(rep.dir()), reply.to_string() )); 
     127              :   }
     128              : 
     129            0 :   return rep;
     130            0 : }
     131              : 
     132              : 
     133            2 : bool DaphneV3Interface::validate_connection()
     134              : {
     135            2 :   static const uint64_t good_value = 0xdeadbeef;  // NOLINT
     136              : 
     137            2 :   TestRegRequest req; // empty
     138            4 :   auto reply = send<TestRegResponse>( req.SerializeAsString(),
     139              :                                       MessageTypeV2::MT2_READ_TEST_REG_REQ,
     140            2 :                                       MessageTypeV2::MT2_READ_TEST_REG_RESP );
     141              :   
     142            0 :   return reply.value() == good_value;
     143            2 : }
     144              : 
     145              : 
     146              : 
        

Generated by: LCOV version 2.0-1