Line data Source code
1 : /**
2 : * @brief test application for the opmon publisher
3 : */
4 :
5 : #include <chrono>
6 : #include <thread>
7 : #include <future>
8 :
9 : #include <kafkaopmon/OpMonPublisher.hpp>
10 : #include "opmonlib/Utils.hpp"
11 : #include "opmonlib/opmon/test.pb.h"
12 :
13 : #include <boost/program_options.hpp>
14 :
15 : using nlohmann::json;
16 : namespace po = boost::program_options;
17 :
18 : using namespace dunedaq::kafkaopmon;
19 : using namespace dunedaq::opmonlib;
20 :
21 :
22 :
23 :
24 :
25 : int
26 0 : main(int argc, char const* argv[])
27 : {
28 0 : po::options_description desc("Allowed options");
29 0 : desc.add_options()
30 0 : ("help,h", "produce help message")
31 0 : ("bootstrap", po::value<std::string>()->default_value("monkafka.cern.ch:30092"), "kafka bootstrap server")
32 0 : ("topic,t", po::value<std::string>(), "Optional specification of a topic" )
33 0 : ("n_threads,n", po::value<unsigned int>()->default_value(10), "Number of threads used for test")
34 : ;
35 :
36 0 : po::variables_map input_map;
37 0 : po::store(po::parse_command_line(argc, argv, desc), input_map);
38 0 : po::notify(input_map);
39 :
40 0 : if ( input_map.count("help") ) {
41 0 : std::cout << desc << std::endl;
42 : return 0;
43 : }
44 :
45 0 : json conf;
46 0 : conf["bootstrap"] = input_map["bootstrap"].as<std::string>();
47 0 : conf["client_id"] = "opmon_publisher_test";
48 0 : if ( input_map.count("topic") ) {
49 0 : conf["default_topic"] = input_map["topic"].as<std::string>() ;
50 : }
51 :
52 0 : OpMonPublisher p(conf);
53 :
54 0 : auto pub_func = [&](int i){
55 0 : dunedaq::opmon::OpMonId id;
56 0 : id += "test";
57 0 : id += "app";
58 0 : id += "thread_" + std::to_string(i);
59 0 : for (auto j = 0; j < 20; ++j ) {
60 0 : dunedaq::opmon::TestInfo ti;
61 0 : ti.set_int_example( j*1000 + i );
62 0 : ti.set_string_example( "test" );
63 0 : auto e = to_entry( ti, {} );
64 0 : *e.mutable_origin() = id;
65 0 : p.publish( std::move(e) );
66 0 : }
67 0 : };
68 :
69 0 : auto n = input_map["n_threads"].as<unsigned int>() ;
70 0 : std::vector<std::future<void>> threads(n);
71 :
72 0 : for( size_t i = 0 ; i < n; ++i ) {
73 0 : threads[i] = async(std::launch::async, pub_func, i);
74 : }
75 :
76 0 : for ( auto & t : threads ) {
77 0 : t.get();
78 : }
79 :
80 0 : std::this_thread::sleep_for(std::chrono::seconds(2));
81 :
82 0 : return 0;
83 0 : }
|