LCOV - code coverage report
Current view: top level - erskafka/src - KafkaStream.cpp (source / functions)
Test:         code.result
Test Date:    2025-12-21 13:07:08

                 Coverage    Total    Hit
Lines:              0.0 %       82      0
Functions:          0.0 %        6      0

            Line data    Source code
       1              : /**
       2              : * @file KafkaStream.cpp KafkaStream class implementation
       3              : * 
       4              : * This is part of the DUNE DAQ software, copyright 2020.
       5              : * Licensing/copyright details are in the COPYING file that you should have
       6              : * received with this code.
       7              : */
       8              : 
       9              : #include "KafkaStream.hpp"
      10              : #include <ers/StreamFactory.hpp>
      11              : #include <string>
      12              : #include <iostream>
      13              : #include <chrono>
      14              : #include <boost/crc.hpp>
      15              : #include <vector>
      16              : 
      17            0 : ERS_REGISTER_OUTPUT_STREAM(erskafka::KafkaStream, "erskafka", param)
      18              : 
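Once registered under the name "erskafka", the stream is selected at run time through the ERS stream-configuration environment variables, with the Kafka bootstrap server passed as the stream parameter (it becomes the param argument of the constructor below). A hedged illustration only; the exact variable name and broker address are assumptions, not taken from this file:

    export DUNEDAQ_ERS_ERROR="erstrace,lstdout,erskafka(kafka-broker.example.org:9092)"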
      19              : /** Constructor that creates a new instance of the erskafka stream with the given configuration.
       20              :   * \param param Kafka broker (bootstrap server) connection string.
      21              :   */
      22              : namespace erskafka
      23              : {   
      24            0 :   erskafka::KafkaStream::KafkaStream(const std::string &param)
      25              :   {
      26              : 
      27            0 :     if(const char* env_p = std::getenv("DUNEDAQ_PARTITION")) 
      28            0 :       m_partition = env_p;
      29              :   
      30              :     //Kafka server settings
      31            0 :     std::string brokers = param;
      32            0 :     std::string errstr;
      33              : 
      34            0 :     RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
      35            0 :     conf->set("bootstrap.servers", brokers, errstr);
      36            0 :     if(errstr != ""){
      37            0 :       std::cout << "Bootstrap server error : " << errstr << '\n';
      38              :     }
      39            0 :     if(const char* env_p = std::getenv("DUNEDAQ_APPLICATION_NAME")) 
      40            0 :       conf->set("client.id", env_p, errstr);
      41              :     else
      42            0 :       conf->set("client.id", "erskafkaproducerdefault", errstr);
      43            0 :     if(errstr != ""){
      44            0 :       std::cout << "Producer configuration error : " << errstr << '\n';
      45              :     }
      46              :     //Create producer instance
      47            0 :     m_producer = RdKafka::Producer::create(conf, errstr);
      48              : 
      49            0 :     if(errstr != ""){
      50            0 :       std::cout << "Producer creation error : " << errstr << '\n';
      51              :     }
      52              : 
      53            0 :   }
      54              : 
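An aside on the configuration calls above: RdKafka::Conf::set() reports problems through its return value as well as through errstr (which is written only on failure), and librdkafka copies the configuration into the producer, so the Conf object can be released after RdKafka::Producer::create(). A minimal standalone sketch under those assumptions; the helper name, client id and include path are illustrative, not part of KafkaStream:

    #include <librdkafka/rdkafkacpp.h>   // install path may differ
    #include <iostream>
    #include <memory>
    #include <string>

    // Hypothetical helper mirroring the constructor above.
    std::unique_ptr<RdKafka::Producer> make_producer(const std::string& brokers)
    {
      std::string errstr;
      std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));

      // set() returns CONF_OK on success; errstr is filled only on error.
      if (conf->set("bootstrap.servers", brokers, errstr) != RdKafka::Conf::CONF_OK)
        std::cerr << "bootstrap.servers error: " << errstr << '\n';
      if (conf->set("client.id", "erskafka-example", errstr) != RdKafka::Conf::CONF_OK)
        std::cerr << "client.id error: " << errstr << '\n';

      // The producer takes a copy of the configuration, so conf may be freed here.
      std::unique_ptr<RdKafka::Producer> producer(RdKafka::Producer::create(conf.get(), errstr));
      if (!producer)
        std::cerr << "producer creation error: " << errstr << '\n';
      return producer;
    }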
      55            0 :   void erskafka::KafkaStream::ers_to_json(const ers::Issue &issue, size_t chain, std::vector<nlohmann::json> &j_objs)
      56              :   {
      57            0 :     try
      58              :     {
      59            0 :       nlohmann::json message;
      60            0 :       message["partition"] = m_partition.c_str();
      61            0 :       message["issue_name"] = issue.get_class_name();
      62            0 :       message["message"] = issue.message().c_str();
      63            0 :       message["severity"] = ers::to_string(issue.severity());
      64            0 :       message["usecs_since_epoch"] = std::chrono::duration_cast<std::chrono::microseconds>(issue.ptime().time_since_epoch()).count();
      65            0 :       message["time"] = std::chrono::duration_cast<std::chrono::milliseconds>(issue.ptime().time_since_epoch()).count();
      66              : 
      67            0 :       message["qualifiers"] = issue.qualifiers();
      68            0 :       message["params"] = nlohmann::json::array({});
      69            0 :       for (auto p : issue.parameters())
      70              :       {
      71            0 :         message["params"].push_back(p.first + ": " + p.second);
      72            0 :       }
      73            0 :       message["cwd"] = issue.context().cwd();
      74            0 :       message["file_name"] = issue.context().file_name();
      75            0 :       message["function_name"] = issue.context().function_name();
      76            0 :       message["host_name"] = issue.context().host_name();
      77            0 :       message["package_name"] = issue.context().package_name();
      78            0 :       message["user_name"] = issue.context().user_name();
      79            0 :       message["application_name"] = issue.context().application_name();
      80            0 :       message["user_id"] = issue.context().user_id();
      81            0 :       message["process_id"] = issue.context().process_id();
      82            0 :       message["thread_id"] = issue.context().thread_id();
      83            0 :       message["line_number"] = issue.context().line_number();
      84            0 :       message["chain"] = chain;
      85              : 
      86            0 :       if (issue.cause())
      87              :       {
      88            0 :         ers_to_json(*issue.cause(), 2, j_objs); 
      89              :       }
      90            0 :       j_objs.push_back(message);
      91            0 :     }
      92            0 :     catch(const std::exception& e)
      93              :     {
       94            0 :       std::cout << "Conversion to json error : " << e.what() << '\n';
      95            0 :     }
      96            0 :   }
      97              : 
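For orientation, ers_to_json() emits one JSON object per issue in the cause chain, built from the fields assigned above (write() later adds a "group_hash" field before producing). A payload therefore looks roughly like the sketch below; every value is an invented placeholder:

    {
      "partition": "example-partition",
      "issue_name": "some::IssueClass",
      "message": "something went wrong",
      "severity": "ERROR",
      "usecs_since_epoch": 1734786428000123,
      "time": 1734786428000,
      "qualifiers": [],
      "params": ["name: value"],
      "cwd": "/some/dir",
      "file_name": "SomeFile.cpp",
      "function_name": "some_function",
      "host_name": "somehost",
      "package_name": "somepkg",
      "user_name": "someuser",
      "application_name": "someapp",
      "user_id": 1000,
      "process_id": 12345,
      "thread_id": 12346,
      "line_number": 42,
      "chain": 0
    }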
      98            0 :   void erskafka::KafkaStream::kafka_exporter(std::string input, std::string topic)
      99              :   {
     100            0 :     try
     101              :     {
     102              :       // RdKafka::Producer::RK_MSG_COPY to be investigated
     103            0 :       RdKafka::ErrorCode err = m_producer->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, const_cast<char *>(input.c_str()), input.size(), nullptr, 0, 0, nullptr, nullptr);
     104            0 :       if (err != RdKafka::ERR_NO_ERROR) { std::cout << "% Failed to produce to topic " << topic << ": " << RdKafka::err2str(err) << std::endl;}
     105              :     }
     106            0 :     catch(const std::exception& e)
     107              :     {
     108            0 :       std::cout << "Producer error : " << e.what() << '\n';
     109            0 :     }
     110            0 :   }
     111              : 
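One librdkafka detail worth noting: produce() only enqueues the message; delivery happens asynchronously, and the client is expected to call poll() regularly so delivery reports and errors are serviced, and to flush() before the producer is destroyed. A hedged sketch of how that could be added (the timeout value is arbitrary and the destructor is not part of this file):

    // e.g. inside kafka_exporter(), right after the produce() call:
    m_producer->poll(0);          // serve delivery callbacks without blocking

    // e.g. in a ~KafkaStream() destructor:
    m_producer->flush(5000);      // wait up to 5 s for queued messages
    delete m_producer;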
     112              :   /** Write method 
     113              :     * \param issue issue to be sent.
     114              :     */
     115            0 :   void erskafka::KafkaStream::write(const ers::Issue &issue)
     116              :   {
     117            0 :     try
     118              :     {
     119            0 :       std::vector<nlohmann::json> j_objs;
     120            0 :       if (issue.cause() == nullptr)
     121              :       {
     122            0 :         ers_to_json(issue, 0, j_objs);
     123              :       }
     124              :       else
     125              :       {
     126            0 :         ers_to_json(issue, 1, j_objs);
     127              :       }
     128              : 
     129              :       // build a unique hash for a group of nested issues
     130            0 :       std::ostringstream tmpstream(issue.message());
     131            0 :       tmpstream << issue.context().process_id() << issue.time_t() << issue.context().application_name() << issue.context().host_name() << rand();
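                        :       // note: an ostringstream seeded via its constructor starts writing at
                        :       // position 0, so the '<<' above overwrites the start of issue.message()
                        :       // rather than appending to it; this only affects the bytes fed into the
                        :       // CRC32 below (std::ios_base::ate would make it append instead).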
     132            0 :       std::string tmp = tmpstream.str();
     133            0 :       boost::crc_32_type crc32;
     134            0 :       crc32.process_bytes(tmp.c_str(), tmp.length());
     135              : 
     136            0 :       for (auto j : j_objs)
     137              :       {
     138            0 :         j["group_hash"] = crc32.checksum();
     139              : 
     140            0 :         erskafka::KafkaStream::kafka_exporter(j.dump(), "erskafka-reporting");
     141            0 :       }
     142            0 :       chained().write(issue);
     143            0 :     }
     144            0 :     catch(const std::exception& e)
     145              :     {
     146            0 :       std::cout << "Producer error : " << e.what() << '\n';
     147            0 :     }
     148            0 :   }
     149              : } // namespace erskafka
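Finally, as a usage illustration: the stream is exercised whenever an ERS issue is reported on a severity level whose configured stream list includes "erskafka". A minimal hedged example, assuming the standard ERS issue macros; the issue name, attribute and message are invented:

    #include <ers/ers.h>

    // Hypothetical issue type declared with the usual ERS macro.
    ERS_DECLARE_ISSUE(example,
                      DemoProblem,
                      "Demo problem in " << component,
                      ((std::string)component))

    void report_something()
    {
      // If the configured error streams include erskafka, KafkaStream::write()
      // serialises this issue (and any cause chain) to JSON and produces it to
      // the "erskafka-reporting" topic.
      ers::error(example::DemoProblem(ERS_HERE, "trigger"));
    }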
        

Generated by: LCOV version 2.0-1