DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
ERSPublisher.cpp
Go to the documentation of this file.
1
10
11#include <iostream>
12
13#include <stdexcept>
14
15using namespace dunedaq::erskafka;
16
17
18ERSPublisher::ERSPublisher(const nlohmann::json& conf) {
19
20 RdKafka::Conf * k_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
21 std::string errstr;
22
23 auto it = conf.find("bootstrap");
24 if ( it == conf.end() ) {
25 std::cerr << "Missing bootstrap from json file";
26 throw std::runtime_error( "Missing bootstrap from json file" );
27 }
28
29 k_conf->set("bootstrap.servers", *it, errstr);
30 if(errstr != ""){
31 throw std::runtime_error( errstr );
32 }
33
34 std::string client_id;
35 it = conf.find( "cliend_id" );
36 if ( it != conf.end() )
37 client_id = *it;
38 else if(const char* env_p = std::getenv("DUNEDAQ_APPLICATION_NAME"))
39 client_id = env_p;
40 else
41 client_id = "erskafkaproducerdefault";
42
43 k_conf->set("client.id", client_id, errstr);
44 if(errstr != ""){
45 throw std::runtime_error( errstr );
46 }
47
48 //Create producer instance
49 m_producer.reset(RdKafka::Producer::create(k_conf, errstr));
50
51 if(errstr != ""){
52 throw std::runtime_error( errstr );
53 }
54
55 it = conf.find("default_topic");
56 if (it != conf.end()) m_default_topic = *it;
57
58}
59
60bool ERSPublisher::publish( ersschema::IssueChain && issue ) const {
61
62 std::string binary;
63 issue.SerializeToString( & binary );
64
65 // get the topic
66 auto topic = ERSPublisher::topic(issue);
67
68 auto key = ERSPublisher::key(issue);
69
70 // RdKafka::Producer::RK_MSG_COPY to be investigated
71 RdKafka::ErrorCode err = m_producer->produce(topic,
72 RdKafka::Topic::PARTITION_UA,
73 RdKafka::Producer::RK_MSG_COPY,
74 const_cast<char *>(binary.c_str()), binary.size(),
75 key.c_str(),
76 key.size(),
77 0,
78 nullptr);
79 if (err != RdKafka::ERR_NO_ERROR) {
80 return false;
81 }
82
83 return true;
84}
85
bool publish(dunedaq::ersschema::IssueChain &&) const
std::unique_ptr< RdKafka::Producer > m_producer
std::string key(const dunedaq::ersschema::IssueChain &i) const
std::string topic(const dunedaq::ersschema::IssueChain &) const