Line data Source code
1 : /**
2 : * @file KafkaStream.cpp KafkaStream class implementation
3 : *
4 : * This is part of the DUNE DAQ software, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "KafkaStream.hpp"
10 : #include <ers/StreamFactory.hpp>
11 : #include <string>
12 : #include <iostream>
13 : #include <chrono>
14 : #include <boost/crc.hpp>
15 : #include <vector>
16 :
17 0 : ERS_REGISTER_OUTPUT_STREAM(erskafka::KafkaStream, "erskafka", param)
18 :
19 : /** Constructor that creates a new instance of the erskafka stream with the given configuration.
20 : * \param format elastic search connection string.
21 : */
22 : namespace erskafka
23 : {
24 0 : erskafka::KafkaStream::KafkaStream(const std::string ¶m)
25 : {
26 :
27 0 : if(const char* env_p = std::getenv("DUNEDAQ_PARTITION"))
28 0 : m_partition = env_p;
29 :
30 : //Kafka server settings
31 0 : std::string brokers = param;
32 0 : std::string errstr;
33 :
34 0 : RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
35 0 : conf->set("bootstrap.servers", brokers, errstr);
36 0 : if(errstr != ""){
37 0 : std::cout << "Bootstrap server error : " << errstr << '\n';
38 : }
39 0 : if(const char* env_p = std::getenv("DUNEDAQ_APPLICATION_NAME"))
40 0 : conf->set("client.id", env_p, errstr);
41 : else
42 0 : conf->set("client.id", "erskafkaproducerdefault", errstr);
43 0 : if(errstr != ""){
44 0 : std::cout << "Producer configuration error : " << errstr << '\n';
45 : }
46 : //Create producer instance
47 0 : m_producer = RdKafka::Producer::create(conf, errstr);
48 :
49 0 : if(errstr != ""){
50 0 : std::cout << "Producer creation error : " << errstr << '\n';
51 : }
52 :
53 0 : }
54 :
55 0 : void erskafka::KafkaStream::ers_to_json(const ers::Issue &issue, size_t chain, std::vector<nlohmann::json> &j_objs)
56 : {
57 0 : try
58 : {
59 0 : nlohmann::json message;
60 0 : message["partition"] = m_partition.c_str();
61 0 : message["issue_name"] = issue.get_class_name();
62 0 : message["message"] = issue.message().c_str();
63 0 : message["severity"] = ers::to_string(issue.severity());
64 0 : message["usecs_since_epoch"] = std::chrono::duration_cast<std::chrono::microseconds>(issue.ptime().time_since_epoch()).count();
65 0 : message["time"] = std::chrono::duration_cast<std::chrono::milliseconds>(issue.ptime().time_since_epoch()).count();
66 :
67 0 : message["qualifiers"] = issue.qualifiers();
68 0 : message["params"] = nlohmann::json::array({});
69 0 : for (auto p : issue.parameters())
70 : {
71 0 : message["params"].push_back(p.first + ": " + p.second);
72 0 : }
73 0 : message["cwd"] = issue.context().cwd();
74 0 : message["file_name"] = issue.context().file_name();
75 0 : message["function_name"] = issue.context().function_name();
76 0 : message["host_name"] = issue.context().host_name();
77 0 : message["package_name"] = issue.context().package_name();
78 0 : message["user_name"] = issue.context().user_name();
79 0 : message["application_name"] = issue.context().application_name();
80 0 : message["user_id"] = issue.context().user_id();
81 0 : message["process_id"] = issue.context().process_id();
82 0 : message["thread_id"] = issue.context().thread_id();
83 0 : message["line_number"] = issue.context().line_number();
84 0 : message["chain"] = chain;
85 :
86 0 : if (issue.cause())
87 : {
88 0 : ers_to_json(*issue.cause(), 2, j_objs);
89 : }
90 0 : j_objs.push_back(message);
91 0 : }
92 0 : catch(const std::exception& e)
93 : {
94 0 : std::cout << "Conversion from json error : " << e.what() << '\n';
95 0 : }
96 0 : }
97 :
98 0 : void erskafka::KafkaStream::kafka_exporter(std::string input, std::string topic)
99 : {
100 0 : try
101 : {
102 : // RdKafka::Producer::RK_MSG_COPY to be investigated
103 0 : RdKafka::ErrorCode err = m_producer->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, const_cast<char *>(input.c_str()), input.size(), nullptr, 0, 0, nullptr, nullptr);
104 0 : if (err != RdKafka::ERR_NO_ERROR) { std::cout << "% Failed to produce to topic " << topic << ": " << RdKafka::err2str(err) << std::endl;}
105 : }
106 0 : catch(const std::exception& e)
107 : {
108 0 : std::cout << "Producer error : " << e.what() << '\n';
109 0 : }
110 0 : }
111 :
112 : /** Write method
113 : * \param issue issue to be sent.
114 : */
115 0 : void erskafka::KafkaStream::write(const ers::Issue &issue)
116 : {
117 0 : try
118 : {
119 0 : std::vector<nlohmann::json> j_objs;
120 0 : if (issue.cause() == nullptr)
121 : {
122 0 : ers_to_json(issue, 0, j_objs);
123 : }
124 : else
125 : {
126 0 : ers_to_json(issue, 1, j_objs);
127 : }
128 :
129 : // build a unique hash for a group of nested issues
130 0 : std::ostringstream tmpstream(issue.message());
131 0 : tmpstream << issue.context().process_id() << issue.time_t() << issue.context().application_name() << issue.context().host_name() << rand();
132 0 : std::string tmp = tmpstream.str();
133 0 : boost::crc_32_type crc32;
134 0 : crc32.process_bytes(tmp.c_str(), tmp.length());
135 :
136 0 : for (auto j : j_objs)
137 : {
138 0 : j["group_hash"] = crc32.checksum();
139 :
140 0 : erskafka::KafkaStream::kafka_exporter(j.dump(), "erskafka-reporting");
141 0 : }
142 0 : chained().write(issue);
143 0 : }
144 0 : catch(const std::exception& e)
145 : {
146 0 : std::cout << "Producer error : " << e.what() << '\n';
147 0 : }
148 0 : }
149 : } // namespace erskafka
|