73 {
74
75 std::string binary;
76 entry.SerializeToString( & binary );
77
80
82 RdKafka::Topic::PARTITION_UA,
83 RdKafka::Producer::RK_MSG_COPY,
84 const_cast<char *>(binary.c_str()), binary.size(),
85 key.c_str(), key.size(),
86 0,
87 nullptr
88 );
89
90 if ( err == RdKafka::ERR_NO_ERROR ) return ;
91
92 std::string err_cause;
93
94 switch( err ) {
95 case RdKafka::ERR__QUEUE_FULL :
96 err_cause = "maximum number of outstanding messages reached";
97 break;
98 case RdKafka::ERR_MSG_SIZE_TOO_LARGE :
99 err_cause = "message too large";
100 break;
101 case RdKafka::ERR__UNKNOWN_PARTITION :
102 err_cause = "Unknown partition";
103 break;
104 case RdKafka::ERR__UNKNOWN_TOPIC :
105 err_cause = "Unknown topic (" ;
106 err_cause += topic;
107 err_cause += ')';
108 break;
109 default:
110 err_cause = "unknown";
111 break;
112 }
113
114 throw FailedProduce(
ERS_HERE, key, err_cause) ;
115
116}
std::string extract_key(const dunedaq::opmon::OpMonEntry &e) const noexcept
std::string extract_topic(const dunedaq::opmon::OpMonEntry &) const noexcept