Line data Source code
1 : /**
2 : * @file streamOpMonFacility.cpp kafkaopmon class implementation
3 : *
4 : * This is part of the DUNE DAQ software, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "streamOpMonFacility.hpp"
10 :
11 : using json = nlohmann::json;
12 :
13 : namespace dunedaq::kafkaopmon { // namespace dunedaq
14 :
15 2 : streamOpMonFacility::streamOpMonFacility(std::string uri, dunedaq::opmonlib::OptionalOrigin o)
16 2 : : dunedaq::opmonlib::OpMonFacility(uri) {
17 :
18 2 : std::regex uri_re(R"(([a-zA-Z]+):\/\/([^:\/?#\s]+):(\d+)\/([^:\/?#\s]+))");
19 : //* 1st Capturing Group `([a-zA-Z])`: Matches protocol
20 : //* 2nd Capturing Group `([^:\/?#\s])+`: Matches hostname
21 : //* 3rd Capturing Group `(\d)`: Matches port
22 : //* 4th Capturing Group `([^\/?#]+)?`: Matches kafka topic
23 :
24 2 : std::smatch uri_match;
25 2 : if (!std::regex_match(uri, uri_match, uri_re)) {
26 1 : throw WrongURI(ERS_HERE, uri);
27 : }
28 :
29 1 : json config;
30 1 : std::string bootstrap = uri_match[2];
31 1 : bootstrap += ':' ;
32 1 : bootstrap += uri_match[3];
33 1 : config["bootstrap"] = bootstrap;
34 :
35 : // optionally set the ID of the application
36 : // But really this is temporary, and in the future we should avoid env variables
37 1 : if ( o ) {
38 0 : config["cliend_id"] = dunedaq::opmonlib::to_string( o.value() ) ;
39 : }
40 :
41 1 : config["default_topic"] = uri_match[4];
42 :
43 1 : m_publisher.reset( new OpMonPublisher(config) );
44 5 : }
45 :
46 : } // namespace dunedaq::kafkaopmon
47 :
48 :
49 4 : DEFINE_DUNE_OPMON_FACILITY(dunedaq::kafkaopmon::streamOpMonFacility)
50 :
|