DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
KafkaStream.cpp
Go to the documentation of this file.
1
#include "KafkaStream.hpp"

#include <chrono>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

#include <boost/crc.hpp>
#include <ers/StreamFactory.hpp>
16
18
19
22namespace erskafka
23{
24 erskafka::KafkaStream::KafkaStream(const std::string &param)
25 {
26
27 if(const char* env_p = std::getenv("DUNEDAQ_PARTITION"))
28 m_partition = env_p;
29
30 //Kafka server settings
31 std::string brokers = param;
32 std::string errstr;
33
34 RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
35 conf->set("bootstrap.servers", brokers, errstr);
36 if(errstr != ""){
37 std::cout << "Bootstrap server error : " << errstr << '\n';
38 }
39 if(const char* env_p = std::getenv("DUNEDAQ_APPLICATION_NAME"))
40 conf->set("client.id", env_p, errstr);
41 else
42 conf->set("client.id", "erskafkaproducerdefault", errstr);
43 if(errstr != ""){
44 std::cout << "Producer configuration error : " << errstr << '\n';
45 }
46 //Create producer instance
47 m_producer = RdKafka::Producer::create(conf, errstr);
48
49 if(errstr != ""){
50 std::cout << "Producer creation error : " << errstr << '\n';
51 }
52
53 }
54
55 void erskafka::KafkaStream::ers_to_json(const ers::Issue &issue, size_t chain, std::vector<nlohmann::json> &j_objs)
56 {
57 try
58 {
59 nlohmann::json message;
60 message["partition"] = m_partition.c_str();
61 message["issue_name"] = issue.get_class_name();
62 message["message"] = issue.message().c_str();
63 message["severity"] = ers::to_string(issue.severity());
64 message["usecs_since_epoch"] = std::chrono::duration_cast<std::chrono::microseconds>(issue.ptime().time_since_epoch()).count();
65 message["time"] = std::chrono::duration_cast<std::chrono::milliseconds>(issue.ptime().time_since_epoch()).count();
66
67 message["qualifiers"] = issue.qualifiers();
68 message["params"] = nlohmann::json::array({});
69 for (auto p : issue.parameters())
70 {
71 message["params"].push_back(p.first + ": " + p.second);
72 }
73 message["cwd"] = issue.context().cwd();
74 message["file_name"] = issue.context().file_name();
75 message["function_name"] = issue.context().function_name();
76 message["host_name"] = issue.context().host_name();
77 message["package_name"] = issue.context().package_name();
78 message["user_name"] = issue.context().user_name();
79 message["application_name"] = issue.context().application_name();
80 message["user_id"] = issue.context().user_id();
81 message["process_id"] = issue.context().process_id();
82 message["thread_id"] = issue.context().thread_id();
83 message["line_number"] = issue.context().line_number();
84 message["chain"] = chain;
85
86 if (issue.cause())
87 {
88 ers_to_json(*issue.cause(), 2, j_objs);
89 }
90 j_objs.push_back(message);
91 }
92 catch(const std::exception& e)
93 {
94 std::cout << "Conversion from json error : " << e.what() << '\n';
95 }
96 }
97
98 void erskafka::KafkaStream::kafka_exporter(std::string input, std::string topic)
99 {
100 try
101 {
102 // RdKafka::Producer::RK_MSG_COPY to be investigated
103 RdKafka::ErrorCode err = m_producer->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, const_cast<char *>(input.c_str()), input.size(), nullptr, 0, 0, nullptr, nullptr);
104 if (err != RdKafka::ERR_NO_ERROR) { std::cout << "% Failed to produce to topic " << topic << ": " << RdKafka::err2str(err) << std::endl;}
105 }
106 catch(const std::exception& e)
107 {
108 std::cout << "Producer error : " << e.what() << '\n';
109 }
110 }
111
116 {
117 try
118 {
119 std::vector<nlohmann::json> j_objs;
120 if (issue.cause() == nullptr)
121 {
122 ers_to_json(issue, 0, j_objs);
123 }
124 else
125 {
126 ers_to_json(issue, 1, j_objs);
127 }
128
129 // build a unique hash for a group of nested issues
130 std::ostringstream tmpstream(issue.message());
131 tmpstream << issue.context().process_id() << issue.time_t() << issue.context().application_name() << issue.context().host_name() << rand();
132 std::string tmp = tmpstream.str();
133 boost::crc_32_type crc32;
134 crc32.process_bytes(tmp.c_str(), tmp.length());
135
136 for (auto j : j_objs)
137 {
138 j["group_hash"] = crc32.checksum();
139
140 erskafka::KafkaStream::kafka_exporter(j.dump(), "erskafka-reporting");
141 }
142 chained().write(issue);
143 }
144 catch(const std::exception& e)
145 {
146 std::cout << "Producer error : " << e.what() << '\n';
147 }
148 }
149} // namespace erskafka
virtual int line_number() const =0
virtual const char * user_name() const =0
virtual int user_id() const =0
virtual pid_t thread_id() const =0
virtual const char * host_name() const =0
virtual pid_t process_id() const =0
virtual const char * package_name() const =0
virtual const char * application_name() const =0
virtual const char * file_name() const =0
virtual const char * cwd() const =0
virtual const char * function_name() const =0
Base class for any user-defined issue.
Definition Issue.hpp:69
const Context & context() const
Context of the issue.
Definition Issue.hpp:100
ers::Severity severity() const
severity of the issue
Definition Issue.hpp:112
virtual const char * get_class_name() const =0
Get key for class (used for serialisation)
const system_clock::time_point & ptime() const
original time point of the issue
Definition Issue.hpp:130
const std::vector< std::string > & qualifiers() const
return array of qualifiers
Definition Issue.hpp:106
const std::string & message() const
General cause of the issue.
Definition Issue.hpp:103
const string_map & parameters() const
return array of parameters
Definition Issue.hpp:109
std::time_t time_t() const
seconds since 1 Jan 1970
Definition Issue.cpp:135
const Issue * cause() const
return the cause Issue of this Issue
Definition Issue.hpp:97
ES stream implementation.
RdKafka::Producer * m_producer
void write(const ers::Issue &issue) override
void kafka_exporter(std::string input, std::string topic)
KafkaStream(const std::string &param)
void ers_to_json(const ers::Issue &issue, size_t chain, std::vector< nlohmann::json > &j_objs)
#define ERS_REGISTER_OUTPUT_STREAM(class, name, param)
Definition macro.hpp:16
std::string to_string(severity s)