16 RdKafka::Conf * k_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
22 auto it = conf.find(
"bootstrap");
23 if ( it == conf.end() ) {
26 nlohmann::to_string(conf) );
29 k_conf->set(
"bootstrap.servers", *it, errstr);
30 if( ! errstr.empty() ) {
36 std::string client_id;
37 it = conf.find(
"cliend_id" );
38 if ( it != conf.end() )
41 client_id =
"kafkaopmon_default_producer";
43 k_conf->set(
"client.id", client_id, errstr);
44 if ( ! errstr.empty() ) {
49 m_producer.reset(RdKafka::Producer::create(k_conf, errstr));
52 throw FailedProducerCreation(
ERS_HERE, errstr);
55 it = conf.find(
"default_topic");
56 if (it != conf.end())
m_default_topic =
"monitoring." + it->get<std::string>();
66 if ( err == RdKafka::ERR__TIMED_OUT ) {
76 entry.SerializeToString( & binary );
82 RdKafka::Topic::PARTITION_UA,
83 RdKafka::Producer::RK_MSG_COPY,
84 const_cast<char *
>(binary.c_str()), binary.size(),
85 key.c_str(), key.size(),
90 if ( err == RdKafka::ERR_NO_ERROR ) return ;
92 std::string err_cause;
95 case RdKafka::ERR__QUEUE_FULL :
96 err_cause =
"maximum number of outstanding messages reached";
98 case RdKafka::ERR_MSG_SIZE_TOO_LARGE :
99 err_cause =
"message too large";
101 case RdKafka::ERR__UNKNOWN_PARTITION :
102 err_cause =
"Unknown partition";
104 case RdKafka::ERR__UNKNOWN_TOPIC :
105 err_cause =
"Unknown topic (" ;
110 err_cause =
"unknown";
114 throw FailedProduce(
ERS_HERE, key, err_cause) ;
std::unique_ptr< RdKafka::Producer > m_producer
std::string extract_key(const dunedaq::opmon::OpMonEntry &e) const noexcept
std::string extract_topic(const dunedaq::opmon::OpMonEntry &) const noexcept
void publish(dunedaq::opmon::OpMonEntry &&) const
std::string m_default_topic
void warning(const Issue &issue)
void error(const Issue &issue)