DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
OpMonPublisher.hpp
Go to the documentation of this file.
1
12#ifndef KAFKAOPMON_INCLUDE_KAFKAOPMON_OPMONPUBLISHER_HPP_
13#define KAFKAOPMON_INCLUDE_KAFKAOPMON_OPMONPUBLISHER_HPP_
14
15#include <librdkafka/rdkafkacpp.h>
16#include <nlohmann/json.hpp>
17#include <ers/ers.hpp>
18
19#include <string>
20#include <memory>
21
23#include "opmonlib/Utils.hpp"
24
25namespace dunedaq {
26
27 ERS_DECLARE_ISSUE( kafkaopmon,
28 MissingParameter,
29 "No " << parameter << " in " << conf,
30 ((std::string)parameter)((std::string)conf)
31 )
32
33 ERS_DECLARE_ISSUE( kafkaopmon,
35 "Invalid " << parameter << ", cause: " << reason,
36 ((std::string)parameter)((std::string)reason)
37 )
38
39 ERS_DECLARE_ISSUE( kafkaopmon,
40 FailedProducerCreation,
41 "Failed creation of a Kafka producer, cause: " << reason,
42 ((std::string)reason)
43 )
44
45 ERS_DECLARE_ISSUE( kafkaopmon,
46 FailedProduce,
47 "Failed produce of message with key " << key << ", cause: " << reason,
48 ((std::string)key)((std::string)reason)
49 )
50
51 ERS_DECLARE_ISSUE( kafkaopmon,
52 TimeoutReachedWhileFlushing,
53 "Publisher destroyed before all messages were completed, timeout: " << timeout << " ms",
54 ((int)timeout)
55 )
56
57
58} // dunedaq namespace
59
60
61
62
64
66
67 public:
68 OpMonPublisher( const nlohmann::json& conf );
69
70 OpMonPublisher() = delete;
71 OpMonPublisher( const OpMonPublisher & ) = delete;
75
77
79
80 protected:
81 std::string extract_topic( const dunedaq::opmon::OpMonEntry & ) const noexcept {
82 return m_default_topic;
83 }
84 std::string extract_key( const dunedaq::opmon::OpMonEntry & e) const noexcept {
85 return dunedaq::opmonlib::to_string(e.origin()) + '/' + e.measurement() ;
86 }
87
88 private:
89 std::unique_ptr<RdKafka::Producer> m_producer;
90 std::string m_default_topic = "monitoring.opmon_stream";
91
92 };
93
94} // namespace dunedaq::kafkaopmon
95
96#endif //KAFKAOPMON_INCLUDE_KAFKAOPMON_OPMONPUBLISHER_HPP_
#define ERS_DECLARE_ISSUE(namespace_name, class_name, message, attributes)
std::unique_ptr< RdKafka::Producer > m_producer
OpMonPublisher & operator=(const OpMonPublisher &)=delete
std::string extract_key(const dunedaq::opmon::OpMonEntry &e) const noexcept
OpMonPublisher(const OpMonPublisher &)=delete
std::string extract_topic(const dunedaq::opmon::OpMonEntry &) const noexcept
OpMonPublisher(const nlohmann::json &conf)
OpMonPublisher(OpMonPublisher &&)=delete
void publish(dunedaq::opmon::OpMonEntry &&) const
std::string to_string(const dunedaq::opmon::OpMonId &)
Definition Utils.cpp:167
Including Qt Headers.