DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
OpMonPublisher.cpp
Go to the documentation of this file.
1
11
12using namespace dunedaq::kafkaopmon;
13
14OpMonPublisher::OpMonPublisher( const nlohmann::json& conf) {
15
16 RdKafka::Conf * k_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
17 std::string errstr;
18
19 // Good observations on threadsafety here
20 // https://www.confluent.io/blog/modern-cpp-kafka-api-for-safe-easy-messaging/
21
22 auto it = conf.find("bootstrap");
23 if ( it == conf.end() ) {
24 throw MissingParameter(ERS_HERE,
25 "bootstrap",
26 nlohmann::to_string(conf) );
27 }
28
29 k_conf->set("bootstrap.servers", *it, errstr);
30 if( ! errstr.empty() ) {
32 "bootstrap.servers",
33 errstr);
34 }
35
36 std::string client_id;
37 it = conf.find( "cliend_id" );
38 if ( it != conf.end() )
39 client_id = *it;
40 else
41 client_id = "kafkaopmon_default_producer";
42
43 k_conf->set("client.id", client_id, errstr);
44 if ( ! errstr.empty() ) {
45 ers::error( FailedConfiguration(ERS_HERE, "client.id", errstr ) );
46 }
47
48 // Create producer instance
49 m_producer.reset(RdKafka::Producer::create(k_conf, errstr));
50
51 if( ! m_producer ){
52 throw FailedProducerCreation(ERS_HERE, errstr);
53 }
54
55 it = conf.find("default_topic");
56 if (it != conf.end()) m_default_topic = "monitoring." + it->get<std::string>();
57
58}
59
60
62
63 int timeout_ms = 500;
64 RdKafka::ErrorCode err = m_producer -> flush( timeout_ms );
65
66 if ( err == RdKafka::ERR__TIMED_OUT ) {
67 ers::warning( TimeoutReachedWhileFlushing( ERS_HERE, timeout_ms ) );
68 }
69
70}
71
72
74
75 std::string binary;
76 entry.SerializeToString( & binary );
77
78 auto topic = extract_topic( entry );
79 auto key = extract_key( entry );
80
81 RdKafka::ErrorCode err = m_producer -> produce( topic,
82 RdKafka::Topic::PARTITION_UA,
83 RdKafka::Producer::RK_MSG_COPY,
84 const_cast<char *>(binary.c_str()), binary.size(),
85 key.c_str(), key.size(),
86 0,
87 nullptr
88 );
89
90 if ( err == RdKafka::ERR_NO_ERROR ) return ;
91
92 std::string err_cause;
93
94 switch( err ) {
95 case RdKafka::ERR__QUEUE_FULL :
96 err_cause = "maximum number of outstanding messages reached";
97 break;
98 case RdKafka::ERR_MSG_SIZE_TOO_LARGE :
99 err_cause = "message too large";
100 break;
101 case RdKafka::ERR__UNKNOWN_PARTITION :
102 err_cause = "Unknown partition";
103 break;
104 case RdKafka::ERR__UNKNOWN_TOPIC :
105 err_cause = "Unknown topic (" ;
106 err_cause += topic;
107 err_cause += ')';
108 break;
109 default:
110 err_cause = "unknown";
111 break;
112 }
113
114 throw FailedProduce(ERS_HERE, key, err_cause) ;
115
116}
#define ERS_HERE
std::unique_ptr< RdKafka::Producer > m_producer
std::string extract_key(const dunedaq::opmon::OpMonEntry &e) const noexcept
std::string extract_topic(const dunedaq::opmon::OpMonEntry &) const noexcept
void publish(dunedaq::opmon::OpMonEntry &&) const
void warning(const Issue &issue)
Definition ers.hpp:115
void error(const Issue &issue)
Definition ers.hpp:81