Line data Source code
1 : /**
2 : * @brief test application for the the whole chain from MonitorableObject to kafka
3 : */
4 :
5 : #include <chrono>
6 : #include <thread>
7 : #include <future>
8 :
9 : #include "opmonlib/opmon/test.pb.h"
10 :
11 : #include <boost/program_options.hpp>
12 :
13 : #include "opmonlib/OpMonManager.hpp"
14 :
15 : namespace po = boost::program_options;
16 :
17 : using namespace dunedaq::opmonlib;
18 :
19 :
20 : class TestObject : public MonitorableObject {
21 :
22 : public:
23 : using MonitorableObject::register_node;
24 : using MonitorableObject::publish;
25 0 : TestObject() : MonitorableObject() {;}
26 : };
27 :
28 :
29 :
30 : int
31 0 : main(int argc, char const* argv[])
32 : {
33 0 : po::options_description desc("Allowed options");
34 0 : desc.add_options()
35 0 : ("help,h", "produce help message")
36 0 : ("bootstrap", po::value<std::string>()->default_value("monkafka.cern.ch:30092"), "kafka bootstrap server")
37 0 : ("topic,t", po::value<std::string>()->default_value("opmon_stream"), "Optional specification of a topic" )
38 0 : ("n_threads,n", po::value<unsigned int>()->default_value(10), "Number of threads used for test")
39 : ;
40 :
41 0 : po::variables_map input_map;
42 0 : po::store(po::parse_command_line(argc, argv, desc), input_map);
43 0 : po::notify(input_map);
44 :
45 0 : if ( input_map.count("help") ) {
46 0 : std::cout << desc << std::endl;
47 : return 0;
48 : }
49 :
50 0 : std::string uri = "stream://";
51 0 : uri += input_map["bootstrap"].as<std::string>();
52 0 : uri += '/';
53 0 : uri += input_map["topic"].as<std::string>();
54 :
55 0 : OpMonManager man( "test",
56 : "application",
57 0 : uri );
58 :
59 0 : const auto n = input_map["n_threads"].as<unsigned int>() ;
60 0 : std::vector<std::shared_ptr<TestObject>> objs(n);
61 0 : for ( size_t i = 0; i < n; ++i ) {
62 0 : auto p = objs[i] = std::make_shared<TestObject>();
63 0 : man.register_node( "element_" + std::to_string(i), p );
64 0 : }
65 :
66 0 : auto pub_func = [&](int i, std::shared_ptr<TestObject> p){
67 :
68 0 : for (auto j = 0; j < 20; ++j ) {
69 0 : dunedaq::opmon::TestInfo ti;
70 0 : ti.set_int_example( j*1000 + i );
71 0 : ti.set_string_example( "test" );
72 0 : p->publish( std::move(ti) );
73 0 : }
74 0 : };
75 :
76 0 : std::vector<std::future<void>> threads(n);
77 :
78 0 : for( size_t i = 0 ; i < n; ++i ) {
79 0 : threads[i] = async(std::launch::async, pub_func, i, objs[i]);
80 : }
81 :
82 0 : for ( auto & t : threads ) {
83 0 : t.get();
84 : }
85 :
86 0 : std::this_thread::sleep_for(std::chrono::seconds(2));
87 :
88 0 : return 0;
89 0 : }
|