14#include <boost/crc.hpp>
27 if(
const char* env_p = std::getenv(
"DUNEDAQ_PARTITION"))
31 std::string brokers = param;
34 RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
35 conf->set(
"bootstrap.servers", brokers, errstr);
37 std::cout <<
"Bootstrap server error : " << errstr <<
'\n';
39 if(
const char* env_p = std::getenv(
"DUNEDAQ_APPLICATION_NAME"))
40 conf->set(
"client.id", env_p, errstr);
42 conf->set(
"client.id",
"erskafkaproducerdefault", errstr);
44 std::cout <<
"Producer configuration error : " << errstr <<
'\n';
47 m_producer = RdKafka::Producer::create(conf, errstr);
50 std::cout <<
"Producer creation error : " << errstr <<
'\n';
59 nlohmann::json message;
60 message[
"partition"] = m_partition.c_str();
62 message[
"message"] = issue.
message().c_str();
64 message[
"usecs_since_epoch"] = std::chrono::duration_cast<std::chrono::microseconds>(issue.
ptime().time_since_epoch()).count();
65 message[
"time"] = std::chrono::duration_cast<std::chrono::milliseconds>(issue.
ptime().time_since_epoch()).count();
68 message[
"params"] = nlohmann::json::array({});
71 message[
"params"].push_back(p.first +
": " + p.second);
84 message[
"chain"] = chain;
88 ers_to_json(*issue.
cause(), 2, j_objs);
90 j_objs.push_back(message);
92 catch(
const std::exception& e)
94 std::cout <<
"Conversion from json error : " << e.what() <<
'\n';
103 RdKafka::ErrorCode err = m_producer->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
const_cast<char *
>(input.c_str()), input.size(),
nullptr, 0, 0,
nullptr,
nullptr);
104 if (err != RdKafka::ERR_NO_ERROR) { std::cout <<
"% Failed to produce to topic " << topic <<
": " << RdKafka::err2str(err) << std::endl;}
106 catch(
const std::exception& e)
108 std::cout <<
"Producer error : " << e.what() <<
'\n';
119 std::vector<nlohmann::json> j_objs;
120 if (issue.
cause() ==
nullptr)
122 ers_to_json(issue, 0, j_objs);
126 ers_to_json(issue, 1, j_objs);
130 std::ostringstream tmpstream(issue.
message());
132 std::string tmp = tmpstream.str();
133 boost::crc_32_type crc32;
134 crc32.process_bytes(tmp.c_str(), tmp.length());
136 for (
auto j : j_objs)
138 j[
"group_hash"] = crc32.checksum();
142 chained().write(issue);
144 catch(
const std::exception& e)
146 std::cout <<
"Producer error : " << e.what() <<
'\n';
virtual int line_number() const =0
virtual const char * user_name() const =0
virtual int user_id() const =0
virtual pid_t thread_id() const =0
virtual const char * host_name() const =0
virtual pid_t process_id() const =0
virtual const char * package_name() const =0
virtual const char * application_name() const =0
virtual const char * file_name() const =0
virtual const char * cwd() const =0
virtual const char * function_name() const =0
Base class for any user-defined issue.
const Context & context() const
Context of the issue.
ers::Severity severity() const
severity of the issue
virtual const char * get_class_name() const =0
Get key for class (used for serialisation)
const system_clock::time_point & ptime() const
original time point of the issue
const std::vector< std::string > & qualifiers() const
return array of qualifiers
const std::string & message() const
General cause of the issue.
const string_map & parameters() const
return array of parameters
std::time_t time_t() const
seconds since 1 Jan 1970
const Issue * cause() const
return the cause Issue of this Issue
ES stream implementation.
RdKafka::Producer * m_producer
void write(const ers::Issue &issue) override
void kafka_exporter(std::string input, std::string topic)
KafkaStream(const std::string &param)
void ers_to_json(const ers::Issue &issue, size_t chain, std::vector< nlohmann::json > &j_objs)
#define ERS_REGISTER_OUTPUT_STREAM(class, name, param)
std::string to_string(severity s)