20 RdKafka::Conf * k_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
23 auto it = conf.find(
"bootstrap");
24 if ( it == conf.end() ) {
25 std::cerr <<
"Missing bootstrap from json file";
26 throw std::runtime_error(
"Missing bootstrap from json file" );
29 k_conf->set(
"bootstrap.servers", *it, errstr);
31 throw std::runtime_error( errstr );
34 std::string client_id;
35 it = conf.find(
"cliend_id" );
36 if ( it != conf.end() )
38 else if(
const char* env_p = std::getenv(
"DUNEDAQ_APPLICATION_NAME"))
41 client_id =
"erskafkaproducerdefault";
43 k_conf->set(
"client.id", client_id, errstr);
45 throw std::runtime_error( errstr );
49 m_producer.reset(RdKafka::Producer::create(k_conf, errstr));
52 throw std::runtime_error( errstr );
55 it = conf.find(
"default_topic");
63 issue.SerializeToString( & binary );
72 RdKafka::Topic::PARTITION_UA,
73 RdKafka::Producer::RK_MSG_COPY,
74 const_cast<char *
>(binary.c_str()), binary.size(),
79 if (err != RdKafka::ERR_NO_ERROR) {
bool publish(dunedaq::ersschema::IssueChain &&) const
std::unique_ptr< RdKafka::Producer > m_producer
std::string m_default_topic
std::string key(const dunedaq::ersschema::IssueChain &i) const
std::string topic(const dunedaq::ersschema::IssueChain &) const