DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
erskafka::KafkaStream Class Reference

ES stream implementation. More...

#include <KafkaStream.hpp>

Inheritance diagram for erskafka::KafkaStream:
[legend]
Collaboration diagram for erskafka::KafkaStream:
[legend]

Public Member Functions

 KafkaStream (const std::string &param)
 
void write (const ers::Issue &issue) override
 
- Public Member Functions inherited from ers::OutputStream
virtual ~OutputStream ()
 Sends the issue into this stream.
 

Private Member Functions

void ers_to_json (const ers::Issue &issue, size_t chain, std::vector< nlohmann::json > &j_objs)
 
void kafka_exporter (std::string input, std::string topic)
 

Private Attributes

std::string m_partition
 
RdKafka::Producer * m_producer
 

Additional Inherited Members

- Protected Member Functions inherited from ers::OutputStream
 OutputStream ()
 
OutputStream & chained ()
 
virtual bool isNull () const
 

Detailed Description

ES stream implementation.

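This stream offers the capability of publishing Issues to Elasticsearch; the implementation sends each Issue through a Kafka producer. A stream configuration is composed of the stream name, which is "erskafka", and the constructor parameter, which is used as the Kafka bootstrap server list.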

Definition at line 28 of file KafkaStream.hpp.

Constructor & Destructor Documentation

◆ KafkaStream()

erskafka::KafkaStream::KafkaStream ( const std::string & param)
explicit

Definition at line 24 of file KafkaStream.cpp.

25 {
26
27 if(const char* env_p = std::getenv("DUNEDAQ_PARTITION"))
28 m_partition = env_p;
29
30 //Kafka server settings
31 std::string brokers = param;
32 std::string errstr;
33
34 RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
35 conf->set("bootstrap.servers", brokers, errstr);
36 if(errstr != ""){
37 std::cout << "Bootstrap server error : " << errstr << '\n';
38 }
39 if(const char* env_p = std::getenv("DUNEDAQ_APPLICATION_NAME"))
40 conf->set("client.id", env_p, errstr);
41 else
42 conf->set("client.id", "erskafkaproducerdefault", errstr);
43 if(errstr != ""){
44 std::cout << "Producer configuration error : " << errstr << '\n';
45 }
46 //Create producer instance
47 m_producer = RdKafka::Producer::create(conf, errstr);
48
49 if(errstr != ""){
50 std::cout << "Producer creation error : " << errstr << '\n';
51 }
52
53 }
RdKafka::Producer * m_producer

Member Function Documentation

◆ ers_to_json()

void erskafka::KafkaStream::ers_to_json ( const ers::Issue & issue,
size_t chain,
std::vector< nlohmann::json > & j_objs )
private

Definition at line 55 of file KafkaStream.cpp.

56 {
57 try
58 {
59 nlohmann::json message;
60 message["partition"] = m_partition.c_str();
61 message["issue_name"] = issue.get_class_name();
62 message["message"] = issue.message().c_str();
63 message["severity"] = ers::to_string(issue.severity());
64 message["usecs_since_epoch"] = std::chrono::duration_cast<std::chrono::microseconds>(issue.ptime().time_since_epoch()).count();
65 message["time"] = std::chrono::duration_cast<std::chrono::milliseconds>(issue.ptime().time_since_epoch()).count();
66
67 message["qualifiers"] = issue.qualifiers();
68 message["params"] = nlohmann::json::array({});
69 for (auto p : issue.parameters())
70 {
71 message["params"].push_back(p.first + ": " + p.second);
72 }
73 message["cwd"] = issue.context().cwd();
74 message["file_name"] = issue.context().file_name();
75 message["function_name"] = issue.context().function_name();
76 message["host_name"] = issue.context().host_name();
77 message["package_name"] = issue.context().package_name();
78 message["user_name"] = issue.context().user_name();
79 message["application_name"] = issue.context().application_name();
80 message["user_id"] = issue.context().user_id();
81 message["process_id"] = issue.context().process_id();
82 message["thread_id"] = issue.context().thread_id();
83 message["line_number"] = issue.context().line_number();
84 message["chain"] = chain;
85
86 if (issue.cause())
87 {
88 ers_to_json(*issue.cause(), 2, j_objs);
89 }
90 j_objs.push_back(message);
91 }
92 catch(const std::exception& e)
93 {
94 std::cout << "Conversion from json error : " << e.what() << '\n';
95 }
96 }
virtual int line_number() const =0
virtual const char * user_name() const =0
virtual int user_id() const =0
virtual pid_t thread_id() const =0
virtual const char * host_name() const =0
virtual pid_t process_id() const =0
virtual const char * package_name() const =0
virtual const char * application_name() const =0
virtual const char * file_name() const =0
virtual const char * cwd() const =0
virtual const char * function_name() const =0
const Context & context() const
Context of the issue.
Definition Issue.hpp:100
ers::Severity severity() const
severity of the issue
Definition Issue.hpp:112
virtual const char * get_class_name() const =0
Get key for class (used for serialisation)
const system_clock::time_point & ptime() const
original time point of the issue
Definition Issue.hpp:130
const std::vector< std::string > & qualifiers() const
return array of qualifiers
Definition Issue.hpp:106
const std::string & message() const
General cause of the issue.
Definition Issue.hpp:103
const string_map & parameters() const
return array of parameters
Definition Issue.hpp:109
const Issue * cause() const
return the cause Issue of this Issue
Definition Issue.hpp:97
void ers_to_json(const ers::Issue &issue, size_t chain, std::vector< nlohmann::json > &j_objs)
std::string to_string(severity s)

◆ kafka_exporter()

void erskafka::KafkaStream::kafka_exporter ( std::string input,
std::string topic )
private

Definition at line 98 of file KafkaStream.cpp.

99 {
100 try
101 {
102 // RdKafka::Producer::RK_MSG_COPY to be investigated
103 RdKafka::ErrorCode err = m_producer->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, const_cast<char *>(input.c_str()), input.size(), nullptr, 0, 0, nullptr, nullptr);
104 if (err != RdKafka::ERR_NO_ERROR) { std::cout << "% Failed to produce to topic " << topic << ": " << RdKafka::err2str(err) << std::endl;}
105 }
106 catch(const std::exception& e)
107 {
108 std::cout << "Producer error : " << e.what() << '\n';
109 }
110 }

◆ write()

void erskafka::KafkaStream::write ( const ers::Issue & issue)
overridevirtual

Implements ers::OutputStream.

Member Data Documentation

◆ m_partition

std::string erskafka::KafkaStream::m_partition
private

Definition at line 35 of file KafkaStream.hpp.

◆ m_producer

RdKafka::Producer* erskafka::KafkaStream::m_producer
private

Definition at line 36 of file KafkaStream.hpp.


The documentation for this class was generated from the following files: