LCOV - code coverage report
Current view: top level - kafkaopmon/src - OpMonPublisher.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 44.6 % 56 25
Test Date: 2025-12-21 13:07:08 Functions: 66.7 % 3 2

            Line data    Source code
       1              : /**
       2              :  * @file OpMonPublisher.cpp OpMonPublisher Class Implementation
       3              :  *  
       4              :  * This is part of the DUNE DAQ Software Suite, copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
       9              : 
      10              : #include "kafkaopmon/OpMonPublisher.hpp"
      11              : 
      12              : using namespace dunedaq::kafkaopmon;
      13              : 
      14            3 : OpMonPublisher::OpMonPublisher( const nlohmann::json& conf) {
      15              : 
      16            3 :   RdKafka::Conf * k_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
      17            3 :   std::string errstr;
      18              : 
      19              :   // Good observations on threadsafety here
      20              :   // https://www.confluent.io/blog/modern-cpp-kafka-api-for-safe-easy-messaging/
      21              :   
      22            3 :   auto it = conf.find("bootstrap");
      23            3 :   if ( it == conf.end() ) {
      24            1 :     throw MissingParameter(ERS_HERE,
      25              :                            "bootstrap",
      26            2 :                            nlohmann::to_string(conf) );
      27              :   }
      28              :   
      29            2 :   k_conf->set("bootstrap.servers", *it, errstr);
      30            2 :   if( ! errstr.empty() ) {
      31            0 :     throw FailedConfiguration(ERS_HERE,
      32              :                               "bootstrap.servers",
      33            0 :                               errstr);
      34              :   }
      35              :   
      36            2 :   std::string client_id;
      37            2 :   it = conf.find( "cliend_id" );
      38            2 :   if ( it != conf.end() )
      39            0 :     client_id = *it;
      40              :   else
      41            2 :     client_id = "kafkaopmon_default_producer";
      42              :   
      43            2 :   k_conf->set("client.id", client_id, errstr);    
      44            2 :   if ( ! errstr.empty() ) {
      45            0 :     ers::error( FailedConfiguration(ERS_HERE, "client.id", errstr ) );
      46              :   }
      47              : 
      48              :   // Create producer instance
      49            2 :   m_producer.reset(RdKafka::Producer::create(k_conf, errstr));
      50              :     
      51            2 :   if( ! m_producer ){
      52            0 :     throw FailedProducerCreation(ERS_HERE, errstr);
      53              :   }
      54              :     
      55            2 :   it = conf.find("default_topic");
      56            2 :   if (it != conf.end()) m_default_topic = "monitoring." + it->get<std::string>();
      57              :   
      58            5 : }
      59              : 
      60              : 
      61            2 : OpMonPublisher::~OpMonPublisher() {
      62              : 
      63            2 :   int timeout_ms = 500;
      64            2 :   RdKafka::ErrorCode err = m_producer -> flush( timeout_ms );
      65              : 
      66            2 :   if ( err == RdKafka::ERR__TIMED_OUT ) {
      67            0 :     ers::warning( TimeoutReachedWhileFlushing( ERS_HERE, timeout_ms ) );
      68              :   }
      69              :   
      70            2 : }
      71              : 
      72              : 
      73            0 : void OpMonPublisher::publish( dunedaq::opmon::OpMonEntry && entry ) const {
      74              : 
      75            0 :   std::string binary;
      76            0 :   entry.SerializeToString( & binary );
      77              : 
      78            0 :   auto topic = extract_topic( entry );
      79            0 :   auto key   = extract_key( entry );
      80              : 
      81            0 :   RdKafka::ErrorCode err = m_producer -> produce( topic,
      82              :                                                   RdKafka::Topic::PARTITION_UA,
      83              :                                                   RdKafka::Producer::RK_MSG_COPY,
      84            0 :                                                   const_cast<char *>(binary.c_str()), binary.size(),
      85            0 :                                                   key.c_str(), key.size(),
      86              :                                                   0,
      87              :                                                   nullptr
      88              :                                                   );
      89              : 
      90            0 :   if ( err == RdKafka::ERR_NO_ERROR ) return ;
      91              : 
      92            0 :   std::string err_cause;
      93              :   
      94            0 :   switch( err ) {
      95            0 :   case RdKafka::ERR__QUEUE_FULL :
      96            0 :     err_cause = "maximum number of outstanding messages reached";
      97              :     break;
      98            0 :   case RdKafka::ERR_MSG_SIZE_TOO_LARGE :
      99            0 :     err_cause = "message too large";
     100              :     break;
     101            0 :   case RdKafka::ERR__UNKNOWN_PARTITION :
     102            0 :     err_cause = "Unknown partition";
     103              :     break;
     104            0 :   case RdKafka::ERR__UNKNOWN_TOPIC :
     105            0 :     err_cause = "Unknown topic (" ;
     106            0 :     err_cause += topic;
     107            0 :     err_cause += ')';
     108              :     break;
     109            0 :   default:
     110            0 :     err_cause = "unknown";
     111              :     break;
     112              :   }
     113              : 
     114            0 :   throw FailedProduce(ERS_HERE, key, err_cause) ;
     115              : 
     116            0 : }
        

Generated by: LCOV version 2.0-1