Line data Source code
1 : /**
2 : * @file ERSPublisher.cpp ERSPublusher Class Implementation
3 : *
4 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "erskafka/ERSPublisher.hpp"
10 :
11 : #include <iostream>
12 :
13 : #include <stdexcept>
14 :
15 : using namespace dunedaq::erskafka;
16 :
17 :
18 0 : ERSPublisher::ERSPublisher(const nlohmann::json& conf) {
19 :
20 0 : RdKafka::Conf * k_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
21 0 : std::string errstr;
22 :
23 0 : auto it = conf.find("bootstrap");
24 0 : if ( it == conf.end() ) {
25 0 : std::cerr << "Missing bootstrap from json file";
26 0 : throw std::runtime_error( "Missing bootstrap from json file" );
27 : }
28 :
29 0 : k_conf->set("bootstrap.servers", *it, errstr);
30 0 : if(errstr != ""){
31 0 : throw std::runtime_error( errstr );
32 : }
33 :
34 0 : std::string client_id;
35 0 : it = conf.find( "cliend_id" );
36 0 : if ( it != conf.end() )
37 0 : client_id = *it;
38 0 : else if(const char* env_p = std::getenv("DUNEDAQ_APPLICATION_NAME"))
39 0 : client_id = env_p;
40 : else
41 0 : client_id = "erskafkaproducerdefault";
42 :
43 0 : k_conf->set("client.id", client_id, errstr);
44 0 : if(errstr != ""){
45 0 : throw std::runtime_error( errstr );
46 : }
47 :
48 : //Create producer instance
49 0 : m_producer.reset(RdKafka::Producer::create(k_conf, errstr));
50 :
51 0 : if(errstr != ""){
52 0 : throw std::runtime_error( errstr );
53 : }
54 :
55 0 : it = conf.find("default_topic");
56 0 : if (it != conf.end()) m_default_topic = *it;
57 :
58 0 : }
59 :
60 0 : bool ERSPublisher::publish( ersschema::IssueChain && issue ) const {
61 :
62 0 : std::string binary;
63 0 : issue.SerializeToString( & binary );
64 :
65 : // get the topic
66 0 : auto topic = ERSPublisher::topic(issue);
67 :
68 0 : auto key = ERSPublisher::key(issue);
69 :
70 : // RdKafka::Producer::RK_MSG_COPY to be investigated
71 0 : RdKafka::ErrorCode err = m_producer->produce(topic,
72 : RdKafka::Topic::PARTITION_UA,
73 : RdKafka::Producer::RK_MSG_COPY,
74 0 : const_cast<char *>(binary.c_str()), binary.size(),
75 0 : key.c_str(),
76 : key.size(),
77 : 0,
78 : nullptr);
79 0 : if (err != RdKafka::ERR_NO_ERROR) {
80 0 : return false;
81 : }
82 :
83 : return true;
84 0 : }
85 :
|