// Collect up to batch_size messages from the Kafka consumer into an owning
// container; each message is held by a unique_ptr so it is released with
// the vector.
55 std::vector<std::unique_ptr<RdKafka::Message>> msgs;
56 msgs.reserve(batch_size);
// Absolute deadline for the whole batch: current time plus the batch timeout.
// NOTE(review): batch_tmout is presumably in milliseconds, since the value
// derived from it is passed to consume() below - confirm that now() uses
// the same unit.
58 int64_t end =
now() + batch_tmout;
// The first poll may wait for the full batch timeout.
59 int remaining_timeout = batch_tmout;
// Keep polling until the batch is full.
61 while (msgs.size() < batch_size) {
// Take ownership of the raw message pointer returned by consume().
62 auto msg = std::unique_ptr<RdKafka::Message>(
consumer.consume(remaining_timeout));
// Report a null message through ERS (the guarding condition is not part of
// the lines shown here).
65 ers::error(dunedaq::kafkaopmon::CannotConsumeMessage(
ERS_HERE,
"%% Consumer error: Message is null"));
// Poll timed out (the handling for this case is not part of the lines
// shown here).
68 case RdKafka::ERR__TIMED_OUT:
// A valid message: move it into the batch.
71 case RdKafka::ERR_NO_ERROR:
72 msgs.push_back(std::move(msg));
// Any other error code: build an issue carrying librdkafka's error string.
77 dunedaq::kafkaopmon::CannotConsumeMessage(
ERS_HERE,
"%% Unhandled consumer error: " + msg->errstr()));
// Recompute how much of the batch time budget is left before the next poll.
// NOTE(review): the int64_t difference is narrowed to int here.
82 remaining_timeout = end -
now();
// Deadline already passed.
83 if (remaining_timeout < 0)
// Send cmd as the body of an HTTP POST to the URL held in adress.
94 cpr::Response response = cpr::Post(cpr::Url{ adress }, cpr::Body{ cmd });
// Status codes of 400 and above are reported through ERS as a failed
// database post, including the numeric status in the message.
96 if (response.status_code >= 400) {
97 ers::error(dunedaq::kafkaopmon::CannotPostToDb(
98 ERS_HERE,
"Error [" + std::to_string(response.status_code) +
"] making request"));
99 }
// NOTE(review): a status code of 0 presumably means that no HTTP response
// was obtained at all - confirm against the cpr documentation.
else if (response.status_code == 0) {
// Settings filled in from the command line further down.
// NOTE(review): topic_str is not referenced in the lines shown here -
// confirm that it is still needed.
147 std::string db_dbname;
148 std::string topic_str;
149 std::vector<std::string> topics;
// Batch parameters handed to consumerLoop below: up to 100 messages per
// batch, with a batch timeout of 1000 (presumably milliseconds - confirm).
151 int batch_size = 100;
152 int batch_tmout = 1000;
// Command-line option set; the caption string is an example invocation.
// NOTE(review): the example spells long options with a single dash
// ("-broker"); Boost.Program_options' default style expects "--broker" -
// confirm and adjust the caption if needed.
158 bpo::options_description desc{
"example: -broker 188.185.122.48:9092 -topic kafkaopmon-reporting -dbhost "
159 "188.185.88.195 -dbport 80 -dbpath insert -dbname db1" };
// Every option is a string with a default value, so the vm[...] lookups
// below always find a value.
// NOTE(review): "dbhost,ho", "dbport,po" and "dbpath,pa" put two characters
// after the comma, where a single-character short name is the documented
// form - verify how the Boost version in use interprets these.
160 desc.add_options()(
"help,h",
"Help screen")(
161 "broker,b", bpo::value<std::string>()->default_value(
"monkafka.cern.ch:30092"),
"Broker")(
162 "topic,t", bpo::value<std::string>()->default_value(
"opmon"),
"Topic")(
163 "dbhost,ho", bpo::value<std::string>()->default_value(
"opmondb.cern.ch"),
"Database host")(
164 "dbport,po", bpo::value<std::string>()->default_value(
"31002"),
"Database port")(
165 "dbpath,pa", bpo::value<std::string>()->default_value(
"write"),
"Database path")(
166 "dbname,n", bpo::value<std::string>()->default_value(
"influxdb"),
"Database name");
// Parse argc/argv against desc and store the result in vm; parsing errors
// are caught as bpo::error (the handler body is not part of the lines
// shown here).
168 bpo::variables_map vm;
171 auto parsed = bpo::command_line_parser(argc, argv).options(desc).run();
172 bpo::store(parsed, vm);
173 }
catch (bpo::error
const& e) {
// When "help" was given, log the option description.
177 if (vm.count(
"help")) {
178 TLOG() << desc << std::endl;
// Copy the parsed option values into local variables.
182 broker = vm[
"broker"].as<std::string>();
183 topic = vm[
"topic"].as<std::string>();
184 db_host = vm[
"dbhost"].as<std::string>();
185 db_port = vm[
"dbport"].as<std::string>();
186 db_path = vm[
"dbpath"].as<std::string>();
187 db_dbname = vm[
"dbname"].as<std::string>();
// Global librdkafka configuration object, owned by a unique_ptr.
192 auto conf = std::unique_ptr<RdKafka::Conf>(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
// Seed the C random number generator with the current time.
// NOTE(review): no rand() call is visible in the lines shown here and the
// group id below is a fixed string - confirm that the seed is still needed.
194 srand((
unsigned)time(0));
195 std::string group_id =
"opmon_microservices";
// Configure the broker list, client id and consumer group; errstr is passed
// to each set() call to receive an error description.
197 conf->set(
"bootstrap.servers", broker, errstr);
201 conf->set(
"client.id",
"opmon_microservice-0", errstr);
205 conf->set(
"group.id", group_id, errstr);
// The topic list contains only the topic chosen on the command line.
209 topics.push_back(topic);
// Create the Kafka consumer from the configuration; conf keeps ownership of
// the configuration object (only the raw pointer is passed).
211 auto consumer = std::unique_ptr<RdKafka::KafkaConsumer>(RdKafka::KafkaConsumer::create(conf.get(), errstr));
// Run the consume loop; the last argument is the database address assembled
// as "<host>:<port>/<path>?db=<name>".
222 consumerLoop(*
consumer, batch_size, batch_tmout, db_host +
":" + db_port +
"/" + db_path +
"?db=" + db_dbname);
225 }
catch (ers::IssueCatcherAlreadySet& ex) {