LCOV - code coverage report
Current view: top level - kafkaopmon/test/apps - opmon_publish_to_kafka_test.cxx (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 41 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 3 0

            Line data    Source code
       1              : /**
       2              :  * @brief test application for the the whole chain from MonitorableObject to kafka
       3              :  */
       4              : 
       5              : #include <chrono>
       6              : #include <thread>
       7              : #include <future>        
       8              : 
       9              : #include "opmonlib/opmon/test.pb.h"
      10              : 
      11              : #include <boost/program_options.hpp>
      12              : 
      13              : #include "opmonlib/OpMonManager.hpp"
      14              : 
      15              : namespace po = boost::program_options;
      16              : 
      17              : using namespace dunedaq::opmonlib;
      18              : 
      19              : 
      20              : class TestObject : public MonitorableObject {
      21              : 
      22              :   public:
      23              :     using MonitorableObject::register_node;
      24              :     using MonitorableObject::publish;
      25            0 :     TestObject() : MonitorableObject() {;}
      26              : };
      27              : 
      28              : 
      29              : 
      30              : int
      31            0 : main(int argc, char const* argv[])
      32              : {
      33            0 :   po::options_description desc("Allowed options");
      34            0 :   desc.add_options()
      35            0 :     ("help,h", "produce help message")
      36            0 :     ("bootstrap", po::value<std::string>()->default_value("monkafka.cern.ch:30092"), "kafka bootstrap server")
      37            0 :     ("topic,t", po::value<std::string>()->default_value("opmon_stream"), "Optional specification of a topic" )
      38            0 :     ("n_threads,n", po::value<unsigned int>()->default_value(10), "Number of threads used for test") 
      39              :   ;
      40              : 
      41            0 :   po::variables_map input_map;
      42            0 :   po::store(po::parse_command_line(argc, argv, desc), input_map);
      43            0 :   po::notify(input_map);
      44              :   
      45            0 :   if ( input_map.count("help") ) {
      46            0 :     std::cout << desc << std::endl;
      47              :     return 0;
      48              :   }
      49              : 
      50            0 :   std::string uri = "stream://";
      51            0 :   uri += input_map["bootstrap"].as<std::string>();
      52            0 :   uri += '/';
      53            0 :   uri += input_map["topic"].as<std::string>();
      54              :   
      55            0 :   OpMonManager man( "test",
      56              :                     "application",
      57            0 :                     uri );
      58              : 
      59            0 :   const auto n = input_map["n_threads"].as<unsigned int>() ;
      60            0 :   std::vector<std::shared_ptr<TestObject>> objs(n);
      61            0 :   for ( size_t i = 0; i < n; ++i ) {
      62            0 :     auto p = objs[i] = std::make_shared<TestObject>();
      63            0 :     man.register_node( "element_" + std::to_string(i), p );
      64            0 :   }
      65              : 
      66            0 :   auto pub_func = [&](int i, std::shared_ptr<TestObject> p){
      67              :     
      68            0 :     for (auto j = 0; j < 20; ++j ) {
      69            0 :       dunedaq::opmon::TestInfo ti;
      70            0 :       ti.set_int_example( j*1000 + i );
      71            0 :       ti.set_string_example( "test" );
      72            0 :       p->publish( std::move(ti) );
      73            0 :     }
      74            0 :   };
      75              : 
      76            0 :   std::vector<std::future<void>> threads(n);
      77              :   
      78            0 :   for( size_t i = 0 ; i < n; ++i ) {
      79            0 :     threads[i] = async(std::launch::async, pub_func, i, objs[i]);
      80              :   }
      81              : 
      82            0 :   for ( auto & t : threads ) {
      83            0 :     t.get();
      84              :   }
      85              :   
      86            0 :   std::this_thread::sleep_for(std::chrono::seconds(2));
      87              :   
      88            0 :   return 0;
      89            0 : }
        

Generated by: LCOV version 2.0-1