Line data Source code
1 : /**
2 : * @file OpMonPublisher.cpp OpMonPublisher Class Implementation
3 : *
4 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 :
10 : #include "kafkaopmon/OpMonPublisher.hpp"
11 :
12 : using namespace dunedaq::kafkaopmon;
13 :
14 3 : OpMonPublisher::OpMonPublisher( const nlohmann::json& conf) {
15 :
16 3 : RdKafka::Conf * k_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
17 3 : std::string errstr;
18 :
19 : // Good observations on threadsafety here
20 : // https://www.confluent.io/blog/modern-cpp-kafka-api-for-safe-easy-messaging/
21 :
22 3 : auto it = conf.find("bootstrap");
23 3 : if ( it == conf.end() ) {
24 1 : throw MissingParameter(ERS_HERE,
25 : "bootstrap",
26 2 : nlohmann::to_string(conf) );
27 : }
28 :
29 2 : k_conf->set("bootstrap.servers", *it, errstr);
30 2 : if( ! errstr.empty() ) {
31 0 : throw FailedConfiguration(ERS_HERE,
32 : "bootstrap.servers",
33 0 : errstr);
34 : }
35 :
36 2 : std::string client_id;
37 2 : it = conf.find( "cliend_id" );
38 2 : if ( it != conf.end() )
39 0 : client_id = *it;
40 : else
41 2 : client_id = "kafkaopmon_default_producer";
42 :
43 2 : k_conf->set("client.id", client_id, errstr);
44 2 : if ( ! errstr.empty() ) {
45 0 : ers::error( FailedConfiguration(ERS_HERE, "client.id", errstr ) );
46 : }
47 :
48 : // Create producer instance
49 2 : m_producer.reset(RdKafka::Producer::create(k_conf, errstr));
50 :
51 2 : if( ! m_producer ){
52 0 : throw FailedProducerCreation(ERS_HERE, errstr);
53 : }
54 :
55 2 : it = conf.find("default_topic");
56 2 : if (it != conf.end()) m_default_topic = "monitoring." + it->get<std::string>();
57 :
58 5 : }
59 :
60 :
61 2 : OpMonPublisher::~OpMonPublisher() {
62 :
63 2 : int timeout_ms = 500;
64 2 : RdKafka::ErrorCode err = m_producer -> flush( timeout_ms );
65 :
66 2 : if ( err == RdKafka::ERR__TIMED_OUT ) {
67 0 : ers::warning( TimeoutReachedWhileFlushing( ERS_HERE, timeout_ms ) );
68 : }
69 :
70 2 : }
71 :
72 :
73 0 : void OpMonPublisher::publish( dunedaq::opmon::OpMonEntry && entry ) const {
74 :
75 0 : std::string binary;
76 0 : entry.SerializeToString( & binary );
77 :
78 0 : auto topic = extract_topic( entry );
79 0 : auto key = extract_key( entry );
80 :
81 0 : RdKafka::ErrorCode err = m_producer -> produce( topic,
82 : RdKafka::Topic::PARTITION_UA,
83 : RdKafka::Producer::RK_MSG_COPY,
84 0 : const_cast<char *>(binary.c_str()), binary.size(),
85 0 : key.c_str(), key.size(),
86 : 0,
87 : nullptr
88 : );
89 :
90 0 : if ( err == RdKafka::ERR_NO_ERROR ) return ;
91 :
92 0 : std::string err_cause;
93 :
94 0 : switch( err ) {
95 0 : case RdKafka::ERR__QUEUE_FULL :
96 0 : err_cause = "maximum number of outstanding messages reached";
97 : break;
98 0 : case RdKafka::ERR_MSG_SIZE_TOO_LARGE :
99 0 : err_cause = "message too large";
100 : break;
101 0 : case RdKafka::ERR__UNKNOWN_PARTITION :
102 0 : err_cause = "Unknown partition";
103 : break;
104 0 : case RdKafka::ERR__UNKNOWN_TOPIC :
105 0 : err_cause = "Unknown topic (" ;
106 0 : err_cause += topic;
107 0 : err_cause += ')';
108 : break;
109 0 : default:
110 0 : err_cause = "unknown";
111 : break;
112 : }
113 :
114 0 : throw FailedProduce(ERS_HERE, key, err_cause) ;
115 :
116 0 : }
|