LCOV - code coverage report
Current view: top level - erskafka/src - ERSPublisher.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 37 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 2 0

            Line data    Source code
       1              : /**
       2              :  * @file ERSPublisher.cpp ERSPublisher Class Implementation
       3              :  *  
       4              :  * This is part of the DUNE DAQ Software Suite, copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : 
#include "erskafka/ERSPublisher.hpp"

#include <cstdlib>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
      14              : 
      15              : using namespace dunedaq::erskafka;
      16              : 
      17              : 
      18            0 : ERSPublisher::ERSPublisher(const nlohmann::json& conf) {
      19              : 
      20            0 :     RdKafka::Conf * k_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
      21            0 :     std::string errstr;
      22              : 
      23            0 :     auto it = conf.find("bootstrap");
      24            0 :     if ( it == conf.end() ) {
      25            0 :       std::cerr << "Missing bootstrap from json file";
      26            0 :       throw std::runtime_error( "Missing bootstrap from json file" );
      27              :     }
      28              : 
      29            0 :     k_conf->set("bootstrap.servers", *it, errstr);
      30            0 :     if(errstr != ""){
      31            0 :       throw std::runtime_error( errstr );
      32              :     }
      33              : 
      34            0 :     std::string client_id;
      35            0 :     it = conf.find( "cliend_id" );
      36            0 :     if ( it != conf.end() )
      37            0 :         client_id = *it;
      38            0 :     else if(const char* env_p = std::getenv("DUNEDAQ_APPLICATION_NAME")) 
      39            0 :         client_id = env_p;
      40              :     else
      41            0 :         client_id = "erskafkaproducerdefault";
      42              : 
      43            0 :     k_conf->set("client.id", client_id, errstr);    
      44            0 :     if(errstr != ""){
      45            0 :       throw std::runtime_error( errstr );
      46              :     }
      47              : 
      48              :     //Create producer instance
      49            0 :     m_producer.reset(RdKafka::Producer::create(k_conf, errstr));
      50              : 
      51            0 :     if(errstr != ""){
      52            0 :       throw std::runtime_error( errstr );
      53              :     }
      54              : 
      55            0 :     it = conf.find("default_topic");
      56            0 :     if (it != conf.end()) m_default_topic = *it;
      57              : 
      58            0 : }
      59              : 
      60            0 : bool ERSPublisher::publish( ersschema::IssueChain && issue ) const {
      61              : 
      62            0 :   std::string binary;
      63            0 :   issue.SerializeToString( & binary );
      64              :   
      65              :   // get the topic
      66            0 :   auto topic = ERSPublisher::topic(issue);
      67              : 
      68            0 :   auto key = ERSPublisher::key(issue);
      69              :   
      70              :   //      RdKafka::Producer::RK_MSG_COPY to be investigated
      71            0 :   RdKafka::ErrorCode err = m_producer->produce(topic, 
      72              :                                                RdKafka::Topic::PARTITION_UA, 
      73              :                                                RdKafka::Producer::RK_MSG_COPY, 
      74            0 :                                                const_cast<char *>(binary.c_str()), binary.size(), 
      75            0 :                                                key.c_str(),
      76              :                                                key.size(),
      77              :                                                0,
      78              :                                                nullptr);
      79            0 :   if (err != RdKafka::ERR_NO_ERROR) {
      80            0 :     return false;
      81              :   }
      82              : 
      83              :   return true;
      84            0 : }
      85              : 
        

Generated by: LCOV version 2.0-1