DUNE-DAQ
DUNE Trigger and Data Acquisition software
kafka_opmon_consumer.cxx
// * This is part of the DUNE DAQ Application Framework, copyright 2020.
// * Licensing/copyright details are in the COPYING file that you should have received with this code.
#include "JsonInfluxConverter.hpp"
#include "boost/program_options.hpp"
#include "cpr/cpr.h"
#include "curl/curl.h"
#include "ers/ers.hpp"
#include "librdkafka/rdkafkacpp.h"
#include "nlohmann/json.hpp"
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iomanip>
#include <iostream>
#include <memory>
#include <regex.h>
#include <sstream>
#include <string>
#include <sys/time.h>
#include <utility>
#include <vector>

namespace dunedaq {

ERS_DECLARE_ISSUE(kafkaopmon, CannotPostToDb, "Cannot post to Influx DB : " << error, ((std::string)error))

ERS_DECLARE_ISSUE(kafkaopmon, CannotCreateConsumer, "Cannot create consumer : " << fatal, ((std::string)fatal))

ERS_DECLARE_ISSUE(kafkaopmon, CannotConsumeMessage, "Cannot consume message : " << error, ((std::string)error))

ERS_DECLARE_ISSUE(kafkaopmon, IncorrectParameters, "Incorrect parameters : " << fatal, ((std::string)fatal))

} // namespace dunedaq

namespace bpo = boost::program_options;

static volatile sig_atomic_t run = 1;
static dunedaq::influxopmon::JsonConverter m_json_converter;
static std::vector<std::string> inserts_vectors;
static uint seed = 0;

static int64_t
now()
{
  struct timeval tv;
  gettimeofday(&tv, nullptr);
  return ((int64_t)tv.tv_sec * 1000) + (tv.tv_usec / 1000);
}

static std::vector<std::unique_ptr<RdKafka::Message>>
consume_batch(RdKafka::KafkaConsumer& consumer, size_t batch_size, int batch_tmout)
{

  std::vector<std::unique_ptr<RdKafka::Message>> msgs;
  msgs.reserve(batch_size);

  int64_t end = now() + batch_tmout;
  int remaining_timeout = batch_tmout;

  while (msgs.size() < batch_size) {
    auto msg = std::unique_ptr<RdKafka::Message>(consumer.consume(remaining_timeout));

    if (msg == nullptr)
      ers::error(dunedaq::kafkaopmon::CannotConsumeMessage(ERS_HERE, "%% Consumer error: Message is null"));

    switch (msg->err()) {
      case RdKafka::ERR__TIMED_OUT:
        return msgs;

      case RdKafka::ERR_NO_ERROR:
        msgs.push_back(std::move(msg));
        break;

      default:
        ers::error(
          dunedaq::kafkaopmon::CannotConsumeMessage(ERS_HERE, "%% Unhandled consumer error: " + msg->errstr()));
        run = 0;
        return msgs;
    }

    remaining_timeout = end - now();
    if (remaining_timeout < 0)
      break;
  }

  return msgs;
}

void
execution_command(const std::string& adress, const std::string& cmd)
{

  cpr::Response response = cpr::Post(cpr::Url{ adress }, cpr::Body{ cmd });
  // std::cout << cmd << std::endl;
  if (response.status_code >= 400) {
    ers::error(dunedaq::kafkaopmon::CannotPostToDb(
      ERS_HERE, "Error [" + std::to_string(response.status_code) + "] making request"));
  } else if (response.status_code == 0) {
    ers::error(dunedaq::kafkaopmon::CannotPostToDb(ERS_HERE, "Query returned 0"));
  }
}

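// Illustrative sketch, not part of the original file: how execution_command could be
// exercised by hand against an InfluxDB 1.x style HTTP write endpoint. The URL and the
// line-protocol record below are placeholder values, not settings used by this service,
// and the function name is hypothetical.
[[maybe_unused]] static void
example_manual_post()
{
  // InfluxDB line protocol: measurement,tag_set field_set timestamp(ns)
  const std::string url = "http://opmondb.example.org:8086/write?db=db1";
  const std::string record = "opmon,source_id=ru01,partition_id=global value=42 1609459200000000000";
  execution_command(url, record); // logs CannotPostToDb via ers::error on HTTP status >= 400 or 0
}
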
void
consumerLoop(RdKafka::KafkaConsumer& consumer, int batch_size, int batch_tmout, std::string adress)
{

  std::cout << "Using query: " << adress << std::endl;

  while (run) {
    auto msgs = consume_batch(consumer, batch_size, batch_tmout);
    for (auto& msg : msgs) {

      // execution_command(adress, message_text);
      std::string json_string(static_cast<char*>(msg->payload()), msg->len());

      // std::cout << json_string << std::endl;
      // json message = json::parse( json_string );

      // std::string query;
      // query = message["type"].get<std::string>() + ',';
      // query += ("source_id="+message["source_id"].get<std::string>()+',');
      // query += ("partition_id="+message["partition_id"].get<std::string>()+' ');
      // const auto & data = message["__data"];
      // std::stringstream data_stream;
      // for ( auto it = data.begin() ; it != data.end() ; ++it ) {
      //   if ( it != data.begin() ) data_stream << ',' ;
      //   data_stream << it.key() << '=' << it.value() ;
      // }
      // query += data_stream.str();
      // query += ' ';
      // query += std::to_string(message["__time"].get<uint64_t>() * 1000000000);

      // execution_command(adress, query);
    }
  }
}

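// Illustrative sketch, not part of the original file: what the commented-out block in
// consumerLoop above would compute if enabled, i.e. one InfluxDB line-protocol record built
// from an opmon JSON document. The field names ("type", "source_id", "partition_id",
// "__data", "__time") are taken from that commented-out code; in the packaged service the
// conversion is presumably delegated to the JsonConverter instance declared at the top of
// the file. The function name is hypothetical.
[[maybe_unused]] static std::string
example_json_to_line_protocol(const nlohmann::json& message)
{
  // Measurement name and tag set: <type>,source_id=...,partition_id=...
  std::string query = message["type"].get<std::string>() + ',';
  query += "source_id=" + message["source_id"].get<std::string>() + ',';
  query += "partition_id=" + message["partition_id"].get<std::string>() + ' ';

  // Field set: every key/value pair found under "__data".
  const auto& data = message["__data"];
  std::stringstream data_stream;
  for (auto it = data.begin(); it != data.end(); ++it) {
    if (it != data.begin())
      data_stream << ',';
    data_stream << it.key() << '=' << it.value();
  }
  query += data_stream.str();

  // Timestamp, scaled to nanoseconds with the same factor as the commented-out code above.
  query += ' ';
  query += std::to_string(message["__time"].get<uint64_t>() * 1000000000);
  return query;
}
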
int
main(int argc, char* argv[])
{
  std::string broker;
  std::string topic;
  std::string db_host;
  std::string db_port;
  std::string db_path;
  std::string db_dbname;
  std::string topic_str;
  std::vector<std::string> topics;
  // Bulk consume parameters
  int batch_size = 100;
  int batch_tmout = 1000;
  // Kafka server settings
  std::string errstr;

  // get parameters

  bpo::options_description desc{ "example: -broker 188.185.122.48:9092 -topic kafkaopmon-reporting -dbhost "
                                 "188.185.88.195 -dbport 80 -dbpath insert -dbname db1" };
  desc.add_options()("help,h", "Help screen")(
    "broker,b", bpo::value<std::string>()->default_value("monkafka.cern.ch:30092"), "Broker")(
    "topic,t", bpo::value<std::string>()->default_value("opmon"), "Topic")(
    "dbhost,ho", bpo::value<std::string>()->default_value("opmondb.cern.ch"), "Database host")(
    "dbport,po", bpo::value<std::string>()->default_value("31002"), "Database port")(
    "dbpath,pa", bpo::value<std::string>()->default_value("write"), "Database path")(
    "dbname,n", bpo::value<std::string>()->default_value("influxdb"), "Database name");

  bpo::variables_map vm;

  try {
    auto parsed = bpo::command_line_parser(argc, argv).options(desc).run();
    bpo::store(parsed, vm);
  } catch (bpo::error const& e) {
    ers::error(dunedaq::kafkaopmon::IncorrectParameters(ERS_HERE, e.what()));
  }

  if (vm.count("help")) {
    TLOG() << desc << std::endl;
    return 0;
  }

  broker = vm["broker"].as<std::string>();
  topic = vm["topic"].as<std::string>();
  db_host = vm["dbhost"].as<std::string>();
  db_port = vm["dbport"].as<std::string>();
  db_path = vm["dbpath"].as<std::string>();
  db_dbname = vm["dbname"].as<std::string>();

  // Broker parameters

  try {
    auto conf = std::unique_ptr<RdKafka::Conf>(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));

    srand((unsigned)time(0));
    std::string group_id = "opmon_microservices";

    conf->set("bootstrap.servers", broker, errstr);
    if (errstr != "") {
      ers::fatal(dunedaq::kafkaopmon::CannotCreateConsumer(ERS_HERE, errstr));
    }
    conf->set("client.id", "opmon_microservice-0", errstr);
    if (errstr != "") {
      ers::fatal(dunedaq::kafkaopmon::CannotCreateConsumer(ERS_HERE, errstr));
    }
    conf->set("group.id", group_id, errstr);
    if (errstr != "") {
      ers::fatal(dunedaq::kafkaopmon::CannotCreateConsumer(ERS_HERE, errstr));
    }
    topics.push_back(topic);

    auto consumer = std::unique_ptr<RdKafka::KafkaConsumer>(RdKafka::KafkaConsumer::create(conf.get(), errstr));

    if (errstr != "") {
      ers::fatal(dunedaq::kafkaopmon::CannotCreateConsumer(ERS_HERE, errstr));
    }

    if (consumer)
      consumer->subscribe(topics);
    else
      ers::fatal(dunedaq::kafkaopmon::CannotCreateConsumer(ERS_HERE, errstr));

    consumerLoop(*consumer, batch_size, batch_tmout, db_host + ":" + db_port + "/" + db_path + "?db=" + db_dbname);

    consumer->close();
  } catch (ers::IssueCatcherAlreadySet& ex) {
    ers::error(ex);
  }

  return 0;
}
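
// Illustrative usage, not part of the original file. The executable name is assumed from the
// source file name; the endpoints mirror the example string in the options_description above:
//
//   kafka_opmon_consumer --broker 188.185.122.48:9092 --topic kafkaopmon-reporting \
//     --dbhost 188.185.88.195 --dbport 80 --dbpath insert --dbname db1
//
// With no arguments the built-in defaults are used (monkafka.cern.ch:30092, topic "opmon",
// opmondb.cern.ch:31002, path "write", database "influxdb").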