141{
142 std::string broker;
143 std::string topic;
144 std::string db_host;
145 std::string db_port;
146 std::string db_path;
147 std::string db_dbname;
148 std::string topic_str;
149 std::vector<std::string> topics;
150
151 int batch_size = 100;
152 int batch_tmout = 1000;
153
154 std::string errstr;
155
156
157
158 bpo::options_description desc{ "example: -broker 188.185.122.48:9092 -topic kafkaopmon-reporting -dbhost "
159 "188.185.88.195 -dbport 80 -dbpath insert -dbname db1" };
160 desc.add_options()("help,h", "Help screen")(
161 "broker,b", bpo::value<std::string>()->default_value("monkafka.cern.ch:30092"), "Broker")(
162 "topic,t", bpo::value<std::string>()->default_value("opmon"), "Topic")(
163 "dbhost,ho", bpo::value<std::string>()->default_value("opmondb.cern.ch"), "Database host")(
164 "dbport,po", bpo::value<std::string>()->default_value("31002"), "Database port")(
165 "dbpath,pa", bpo::value<std::string>()->default_value("write"), "Database path")(
166 "dbname,n", bpo::value<std::string>()->default_value("influxdb"), "Database name");
167
168 bpo::variables_map vm;
169
170 try {
171 auto parsed = bpo::command_line_parser(argc, argv).options(desc).run();
172 bpo::store(parsed, vm);
173 } catch (bpo::error const& e) {
175 }
176
177 if (vm.count("help")) {
178 TLOG() << desc << std::endl;
179 return 0;
180 }
181
182 broker = vm["broker"].as<std::string>();
183 topic = vm["topic"].as<std::string>();
184 db_host = vm["dbhost"].as<std::string>();
185 db_port = vm["dbport"].as<std::string>();
186 db_path = vm["dbpath"].as<std::string>();
187 db_dbname = vm["dbname"].as<std::string>();
188
189
190
191 try {
192 auto conf = std::unique_ptr<RdKafka::Conf>(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
193
194 srand((unsigned)time(0));
195 std::string group_id = "opmon_microservices";
196
197 conf->set("bootstrap.servers", broker, errstr);
198 if (errstr != "") {
200 }
201 conf->set("client.id", "opmon_microservice-0", errstr);
202 if (errstr != "") {
204 }
205 conf->set("group.id", group_id, errstr);
206 if (errstr != "") {
208 }
209 topics.push_back(topic);
210
211 auto consumer = std::unique_ptr<RdKafka::KafkaConsumer>(RdKafka::KafkaConsumer::create(conf.get(), errstr));
212
213 if (errstr != "") {
215 }
216
217 if (consumer)
219 else
221
222 consumerLoop(*consumer, batch_size, batch_tmout, db_host +
":" + db_port +
"/" + db_path +
"?db=" + db_dbname);
223
225 } catch (ers::IssueCatcherAlreadySet& ex) {
227 }
228
229 return 0;
230}
void consumerLoop(RdKafka::KafkaConsumer &consumer, int batch_size, int batch_tmout, std::string adress)
void fatal(const Issue &issue)