DUNE-DAQ
DUNE Trigger and Data Acquisition software
kafka_opmon_consumer.cxx File Reference
#include "JsonInfluxConverter.hpp"
#include "boost/program_options.hpp"
#include "cpr/cpr.h"
#include "curl/curl.h"
#include "ers/ers.hpp"
#include "librdkafka/rdkafkacpp.h"
#include "nlohmann/json.hpp"
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iomanip>
#include <iostream>
#include <regex.h>
#include <sstream>
#include <string>
#include <sys/time.h>
#include <utility>
#include <vector>
#include <memory>
Include dependency graph for kafka_opmon_consumer.cxx (graph omitted).

Go to the source code of this file.
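This file implements a standalone microservice: it subscribes an RdKafka::KafkaConsumer to an operational-monitoring Kafka topic and forwards the received JSON messages to an InfluxDB write endpoint via HTTP POST (cpr). Note that the JSON-to-line-protocol conversion inside consumerLoop() is currently commented out in the source.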

Namespaces

namespace  dunedaq
 Including Qt Headers.
 

Functions

 dunedaq::ERS_DECLARE_ISSUE (kafkaopmon, CannotPostToDb, "Cannot post to Influx DB : " << error, ((std::string) error))
 
 dunedaq::ERS_DECLARE_ISSUE (kafkaopmon, CannotCreateConsumer, "Cannot create consumer : " << error, ((std::string) error))
 
 dunedaq::ERS_DECLARE_ISSUE (kafkaopmon, CannotConsumeMessage, "Cannot consume message : " << error, ((std::string) error))
 
 dunedaq::ERS_DECLARE_ISSUE (kafkaopmon, IncorrectParameters, "Incorrect parameters : " << error, ((std::string) error))
 
static int64_t now ()
 
static std::vector< std::unique_ptr< RdKafka::Message > > consume_batch (RdKafka::KafkaConsumer &consumer, size_t batch_size, int batch_tmout)
 
void execution_command (const std::string &adress, const std::string &cmd)
 
void consumerLoop (RdKafka::KafkaConsumer &consumer, int batch_size, int batch_tmout, std::string adress)
 
int main (int argc, char *argv[])
 

Variables

 
static volatile sig_atomic_t run = 1
 
static dunedaq::influxopmon::JsonConverter m_json_converter
 
static std::vector< std::string > inserts_vectors
 
static uint seed = 0
 

Function Documentation

◆ consume_batch()

static std::vector< std::unique_ptr< RdKafka::Message > > consume_batch (RdKafka::KafkaConsumer &consumer, size_t batch_size, int batch_tmout)

Definition at line 52 of file kafka_opmon_consumer.cxx.

{
  std::vector<std::unique_ptr<RdKafka::Message>> msgs;
  msgs.reserve(batch_size);

  // Deadline bookkeeping: now() and batch_tmout are both in milliseconds.
  int64_t end = now() + batch_tmout;
  int remaining_timeout = batch_tmout;

  while (msgs.size() < batch_size) {
    auto msg = std::unique_ptr<RdKafka::Message>(consumer.consume(remaining_timeout));

    if (msg == nullptr)
      ers::error(dunedaq::kafkaopmon::CannotConsumeMessage(ERS_HERE, "%% Consumer error: Message is null"));

    switch (msg->err()) {
      case RdKafka::ERR__TIMED_OUT:
        return msgs;

      case RdKafka::ERR_NO_ERROR:
        msgs.push_back(std::move(msg));
        break;

      default:
        ers::error(
          dunedaq::kafkaopmon::CannotConsumeMessage(ERS_HERE, "%% Unhandled consumer error: " + msg->errstr()));
        run = 0;
        return msgs;
    }

    remaining_timeout = end - now();
    if (remaining_timeout < 0)
      break;
  }

  return msgs;
}
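A minimal sketch of how this helper is driven, assuming a consumer that has already been configured and subscribed as in main() below:

  // Sketch only: poll in batches until the run flag is cleared.
  while (run) {
    auto msgs = consume_batch(consumer, 100, 1000); // at most 100 messages or 1 s per batch
    for (auto& msg : msgs) {
      std::string payload(static_cast<char*>(msg->payload()), msg->len());
      // ... process payload ...
    }
  }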

◆ consumerLoop()

void consumerLoop (RdKafka::KafkaConsumer &consumer, int batch_size, int batch_tmout, std::string adress)

Definition at line 105 of file kafka_opmon_consumer.cxx.

{
  std::cout << "Using query: " << adress << std::endl;

  while (run) {
    auto msgs = consume_batch(consumer, batch_size, batch_tmout);
    for (auto& msg : msgs) {

      // execution_command(adress, message_text);
      std::string json_string(static_cast<char*>(msg->payload()), msg->len());

      // std::cout << json_string << std::endl;
      // json message = json::parse( json_string );

      // std::string query;
      // query = message["type"].get<std::string>() + ',';
      // query += ("source_id="+message["source_id"].get<std::string>()+',');
      // query += ("partition_id="+message["partition_id"].get<std::string>()+' ');
      // const auto & data = message["__data"];
      // std::stringstream data_stream;
      // for ( auto it = data.begin() ; it != data.end() ; ++it ) {
      //   if ( it != data.begin() ) data_stream << ',' ;
      //   data_stream << it.key() << '=' << it.value() ;
      // }
      // query += data_stream.str();
      // query += ' ';
      // query += std::to_string(message["__time"].get<uint64_t>() * 1000000000);

      // execution_command(adress, query);
    }
  }
}
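The disabled block above sketches a JSON-to-InfluxDB-line-protocol conversion. Restated as a standalone helper (a sketch only: the name to_line_protocol is not part of this file, and the field names follow the commented-out code):

  #include "nlohmann/json.hpp"
  #include <sstream>
  #include <string>

  // Build an InfluxDB line-protocol record:
  //   <measurement>,<tags> <fields> <timestamp-ns>
  std::string to_line_protocol(const nlohmann::json& message)
  {
    std::string query = message["type"].get<std::string>() + ',';
    query += "source_id=" + message["source_id"].get<std::string>() + ',';
    query += "partition_id=" + message["partition_id"].get<std::string>() + ' ';

    const auto& data = message["__data"];
    std::stringstream data_stream;
    for (auto it = data.begin(); it != data.end(); ++it) {
      if (it != data.begin())
        data_stream << ',';
      data_stream << it.key() << '=' << it.value();
    }
    query += data_stream.str();
    query += ' ';
    // __time is in seconds; InfluxDB expects nanoseconds by default.
    query += std::to_string(message["__time"].get<uint64_t>() * 1000000000);
    return query;
  }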

◆ execution_command()

void execution_command (const std::string &adress, const std::string &cmd)

Definition at line 91 of file kafka_opmon_consumer.cxx.

{
  cpr::Response response = cpr::Post(cpr::Url{ adress }, cpr::Body{ cmd });
  // std::cout << cmd << std::endl;
  if (response.status_code >= 400) {
    ers::error(dunedaq::kafkaopmon::CannotPostToDb(
      ERS_HERE, "Error [" + std::to_string(response.status_code) + "] making request"));
  } else if (response.status_code == 0) {
    ers::error(dunedaq::kafkaopmon::CannotPostToDb(ERS_HERE, "Query returned 0"));
  }
}
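A hypothetical call, combining the option defaults from main() below into the host:port/path?db=name endpoint shape, with an illustrative line-protocol record (measurement and values are made up):

  execution_command("opmondb.cern.ch:31002/write?db=influxdb",
                    "daqmodule,source_id=module_0,partition_id=p0 cpu=0.42 1700000000000000000");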

◆ main()

int main (int argc, char *argv[])

Definition at line 140 of file kafka_opmon_consumer.cxx.

{
  std::string broker;
  std::string topic;
  std::string db_host;
  std::string db_port;
  std::string db_path;
  std::string db_dbname;
  std::string topic_str;
  std::vector<std::string> topics;
  // Bulk consume parameters
  int batch_size = 100;
  int batch_tmout = 1000;
  // Kafka server settings
  std::string errstr;

  // get parameters

  bpo::options_description desc{ "example: -broker 188.185.122.48:9092 -topic kafkaopmon-reporting -dbhost "
                                 "188.185.88.195 -dbport 80 -dbpath insert -dbname db1" };
  desc.add_options()("help,h", "Help screen")(
    "broker,b", bpo::value<std::string>()->default_value("monkafka.cern.ch:30092"), "Broker")(
    "topic,t", bpo::value<std::string>()->default_value("opmon"), "Topic")(
    "dbhost,ho", bpo::value<std::string>()->default_value("opmondb.cern.ch"), "Database host")(
    "dbport,po", bpo::value<std::string>()->default_value("31002"), "Database port")(
    "dbpath,pa", bpo::value<std::string>()->default_value("write"), "Database path")(
    "dbname,n", bpo::value<std::string>()->default_value("influxdb"), "Database name");

  bpo::variables_map vm;

  try {
    auto parsed = bpo::command_line_parser(argc, argv).options(desc).run();
    bpo::store(parsed, vm);
  } catch (bpo::error const& e) {
    ers::error(dunedaq::kafkaopmon::IncorrectParameters(ERS_HERE, e.what()));
  }

  if (vm.count("help")) {
    TLOG() << desc << std::endl;
    return 0;
  }

  broker = vm["broker"].as<std::string>();
  topic = vm["topic"].as<std::string>();
  db_host = vm["dbhost"].as<std::string>();
  db_port = vm["dbport"].as<std::string>();
  db_path = vm["dbpath"].as<std::string>();
  db_dbname = vm["dbname"].as<std::string>();

  // Broker parameters

  try {
    auto conf = std::unique_ptr<RdKafka::Conf>(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));

    srand((unsigned)time(0));
    std::string group_id = "opmon_microservices";

    conf->set("bootstrap.servers", broker, errstr);
    if (errstr != "") {
      ers::fatal(dunedaq::kafkaopmon::CannotCreateConsumer(ERS_HERE, errstr));
    }
    conf->set("client.id", "opmon_microservice-0", errstr);
    if (errstr != "") {
      ers::fatal(dunedaq::kafkaopmon::CannotCreateConsumer(ERS_HERE, errstr));
    }
    conf->set("group.id", group_id, errstr);
    if (errstr != "") {
      ers::fatal(dunedaq::kafkaopmon::CannotCreateConsumer(ERS_HERE, errstr));
    }
    topics.push_back(topic);

    auto consumer = std::unique_ptr<RdKafka::KafkaConsumer>(RdKafka::KafkaConsumer::create(conf.get(), errstr));

    if (errstr != "") {
      ers::fatal(dunedaq::kafkaopmon::CannotCreateConsumer(ERS_HERE, errstr));
    }

    if (consumer)
      consumer->subscribe(topics);
    else
      ers::fatal(dunedaq::kafkaopmon::CannotCreateConsumer(ERS_HERE, errstr));

    consumerLoop(*consumer, batch_size, batch_tmout, db_host + ":" + db_port + "/" + db_path + "?db=" + db_dbname);

    consumer->close();
  } catch (ers::IssueCatcherAlreadySet& ex) {
    ers::error(ex);
  }

  return 0;
}
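For reference, the option defaults above correspond to an invocation such as (binary name assumed):

  kafka_opmon_consumer --broker monkafka.cern.ch:30092 --topic opmon --dbhost opmondb.cern.ch --dbport 31002 --dbpath write --dbname influxdb

The consumer then subscribes to the opmon topic, and consumerLoop() receives opmondb.cern.ch:31002/write?db=influxdb as its write endpoint (the POST itself is currently commented out in the loop).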

◆ now()

static int64_t now ()

Definition at line 44 of file kafka_opmon_consumer.cxx.

{
  // Wall-clock time in milliseconds, used for the batch deadline in consume_batch().
  struct timeval tv;
  gettimeofday(&tv, nullptr);
  return ((int64_t)tv.tv_sec * 1000) + (tv.tv_usec / 1000);
}

Variable Documentation

◆ inserts_vectors

std::vector<std::string> inserts_vectors
static

Definition at line 40 of file kafka_opmon_consumer.cxx.

◆ m_json_converter

dunedaq::influxopmon::JsonConverter m_json_converter
static

Definition at line 39 of file kafka_opmon_consumer.cxx.

◆ run

volatile sig_atomic_t run = 1
static

Definition at line 38 of file kafka_opmon_consumer.cxx.

◆ seed

uint seed = 0
static

Definition at line 41 of file kafka_opmon_consumer.cxx.