DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
erskafka::KafkaStream Class Reference

ES stream implementation. More...

#include <KafkaStream.hpp>

Inheritance diagram for erskafka::KafkaStream:
[legend]
Collaboration diagram for erskafka::KafkaStream:
[legend]

Public Member Functions

 KafkaStream (const std::string &param)
void write (const ers::Issue &issue) override
Public Member Functions inherited from ers::OutputStream
virtual ~OutputStream ()
 Sends the issue into this stream.

Private Member Functions

void ers_to_json (const ers::Issue &issue, size_t chain, std::vector< nlohmann::json > &j_objs)
void kafka_exporter (std::string input, std::string topic)

Private Attributes

std::string m_partition
RdKafka::Producer * m_producer

Additional Inherited Members

Protected Member Functions inherited from ers::OutputStream
 OutputStream ()
OutputStream & chained ()
virtual bool isNull () const

Detailed Description

ES stream implementation.

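This stream offers the capability of publishing Issues to Elasticsearch. A stream configuration is composed of the stream name, that is "erskafka". The constructor parameter is used as the Kafka "bootstrap.servers" broker list, and issues (together with their chained causes) are serialized to JSON and produced to the "erskafka-reporting" Kafka topic.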

Definition at line 28 of file KafkaStream.hpp.

Constructor & Destructor Documentation

◆ KafkaStream()

erskafka::KafkaStream::KafkaStream ( const std::string & param)
explicit

Definition at line 24 of file KafkaStream.cpp.

25 {
26
27 if(const char* env_p = std::getenv("DUNEDAQ_PARTITION"))
28 m_partition = env_p;
29
30 //Kafka server settings
31 std::string brokers = param;
32 std::string errstr;
33
34 RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
35 conf->set("bootstrap.servers", brokers, errstr);
36 if(errstr != ""){
37 std::cout << "Bootstrap server error : " << errstr << '\n';
38 }
39 if(const char* env_p = std::getenv("DUNEDAQ_APPLICATION_NAME"))
40 conf->set("client.id", env_p, errstr);
41 else
42 conf->set("client.id", "erskafkaproducerdefault", errstr);
43 if(errstr != ""){
44 std::cout << "Producer configuration error : " << errstr << '\n';
45 }
46 //Create producer instance
47 m_producer = RdKafka::Producer::create(conf, errstr);
48
49 if(errstr != ""){
50 std::cout << "Producer creation error : " << errstr << '\n';
51 }
52
53 }
RdKafka::Producer * m_producer

Member Function Documentation

◆ ers_to_json()

void erskafka::KafkaStream::ers_to_json ( const ers::Issue & issue,
size_t chain,
std::vector< nlohmann::json > & j_objs )
private

Definition at line 55 of file KafkaStream.cpp.

56 {
57 try
58 {
59 nlohmann::json message;
60 message["partition"] = m_partition.c_str();
61 message["issue_name"] = issue.get_class_name();
62 message["message"] = issue.message().c_str();
63 message["severity"] = ers::to_string(issue.severity());
64 message["usecs_since_epoch"] = std::chrono::duration_cast<std::chrono::microseconds>(issue.ptime().time_since_epoch()).count();
65 message["time"] = std::chrono::duration_cast<std::chrono::milliseconds>(issue.ptime().time_since_epoch()).count();
66
67 message["qualifiers"] = issue.qualifiers();
68 message["params"] = nlohmann::json::array({});
69 for (auto p : issue.parameters())
70 {
71 message["params"].push_back(p.first + ": " + p.second);
72 }
73 message["cwd"] = issue.context().cwd();
74 message["file_name"] = issue.context().file_name();
75 message["function_name"] = issue.context().function_name();
76 message["host_name"] = issue.context().host_name();
77 message["package_name"] = issue.context().package_name();
78 message["user_name"] = issue.context().user_name();
79 message["application_name"] = issue.context().application_name();
80 message["user_id"] = issue.context().user_id();
81 message["process_id"] = issue.context().process_id();
82 message["thread_id"] = issue.context().thread_id();
83 message["line_number"] = issue.context().line_number();
84 message["chain"] = chain;
85
86 if (issue.cause())
87 {
88 ers_to_json(*issue.cause(), 2, j_objs);
89 }
90 j_objs.push_back(message);
91 }
92 catch(const std::exception& e)
93 {
94 std::cout << "Conversion from json error : " << e.what() << '\n';
95 }
96 }
virtual int line_number() const =0
virtual const char * user_name() const =0
virtual int user_id() const =0
virtual pid_t thread_id() const =0
virtual const char * host_name() const =0
virtual pid_t process_id() const =0
virtual const char * package_name() const =0
virtual const char * application_name() const =0
virtual const char * file_name() const =0
virtual const char * cwd() const =0
virtual const char * function_name() const =0
const Context & context() const
Context of the issue.
Definition Issue.hpp:100
ers::Severity severity() const
severity of the issue
Definition Issue.hpp:112
virtual const char * get_class_name() const =0
Get key for class (used for serialisation).
const system_clock::time_point & ptime() const
original time point of the issue
Definition Issue.hpp:130
const std::vector< std::string > & qualifiers() const
return array of qualifiers
Definition Issue.hpp:106
const std::string & message() const
General cause of the issue.
Definition Issue.hpp:103
const string_map & parameters() const
return array of parameters
Definition Issue.hpp:109
const Issue * cause() const
return the cause Issue of this Issue
Definition Issue.hpp:97
void ers_to_json(const ers::Issue &issue, size_t chain, std::vector< nlohmann::json > &j_objs)
Only Configuration DB opened by rdbconfig or oksconflibs plug in can be message
std::string to_string(severity s)

◆ kafka_exporter()

void erskafka::KafkaStream::kafka_exporter ( std::string input,
std::string topic )
private

Definition at line 98 of file KafkaStream.cpp.

99 {
100 try
101 {
102 // RdKafka::Producer::RK_MSG_COPY to be investigated
103 RdKafka::ErrorCode err = m_producer->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, const_cast<char *>(input.c_str()), input.size(), nullptr, 0, 0, nullptr, nullptr);
104 if (err != RdKafka::ERR_NO_ERROR) { std::cout << "% Failed to produce to topic " << topic << ": " << RdKafka::err2str(err) << std::endl;}
105 }
106 catch(const std::exception& e)
107 {
108 std::cout << "Producer error : " << e.what() << '\n';
109 }
110 }

◆ write()

void erskafka::KafkaStream::write ( const ers::Issue & issue)
overridevirtual

Write method

Parameters
issue: issue to be sent.

Implements ers::OutputStream.

Definition at line 115 of file KafkaStream.cpp.

116 {
117 try
118 {
119 std::vector<nlohmann::json> j_objs;
120 if (issue.cause() == nullptr)
121 {
122 ers_to_json(issue, 0, j_objs);
123 }
124 else
125 {
126 ers_to_json(issue, 1, j_objs);
127 }
128
129 // build a unique hash for a group of nested issues
130 std::ostringstream tmpstream(issue.message());
131 tmpstream << issue.context().process_id() << issue.time_t() << issue.context().application_name() << issue.context().host_name() << rand();
132 std::string tmp = tmpstream.str();
133 boost::crc_32_type crc32;
134 crc32.process_bytes(tmp.c_str(), tmp.length());
135
136 for (auto j : j_objs)
137 {
138 j["group_hash"] = crc32.checksum();
139
140 erskafka::KafkaStream::kafka_exporter(j.dump(), "erskafka-reporting");
141 }
142 chained().write(issue);
143 }
144 catch(const std::exception& e)
145 {
146 std::cout << "Producer error : " << e.what() << '\n';
147 }
148 }
std::time_t time_t() const
seconds since 1 Jan 1970
Definition Issue.cpp:135
virtual void write(const Issue &issue)=0
OutputStream & chained()
void kafka_exporter(std::string input, std::string topic)

Member Data Documentation

◆ m_partition

std::string erskafka::KafkaStream::m_partition
private

Definition at line 35 of file KafkaStream.hpp.

◆ m_producer

RdKafka::Producer* erskafka::KafkaStream::m_producer
private

Definition at line 36 of file KafkaStream.hpp.


The documentation for this class was generated from the following files: