LCOV - code coverage report
Current view: top level - oks/src - pipeline.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 90.1 % 81 73
Test Date: 2025-12-21 13:07:08 Functions: 100.0 % 9 9

            Line data    Source code
       1              : ////////////////////////////////////////////////////////////////////////////
       2              : //
       3              : //    copy from IPC
       4              : //
       5              : ////////////////////////////////////////////////////////////////////////////
       6              : 
       7              : #include "oks/pipeline.hpp"
       8              : #include <boost/bind/bind.hpp>
       9              : 
      10              : namespace dunedaq {
      11              : namespace oks {
      12              : 
      13              : bool
      14          156 : OksPipeline::Worker::setJob( OksJob * job )
      15              : {
      16          156 :     boost::mutex::scoped_lock lock( m_mutex );
      17          156 :     if ( !m_running && m_idle )
      18              :     {
      19          116 :         m_job = job;
      20          116 :         m_idle = false;
      21          116 :         m_condition.notify_one();
      22          116 :         return true;
      23              :     }
      24              :     return false;
      25          156 : }
      26              : 
      27              : void
      28         9728 : OksPipeline::Worker::run( )
      29              : {
      30         9728 :     {
      31         9728 :         boost::mutex::scoped_lock lock( m_mutex );
      32         9728 :         m_pipeline.m_barrier.wait();
      33         9693 :     }
      34        19057 :     while ( true )
      35              :     {
      36        28757 :         {
      37        28757 :             boost::mutex::scoped_lock lock( m_mutex );
      38        28594 :             m_running = true;
      39        28594 :         }
      40        29018 :         while ( !m_idle ) {
      41          172 :             m_job->run();
      42          116 :             delete m_job;
      43          116 :             m_idle = !m_pipeline.getJob( m_job );
      44              :         }
      45        28846 :         {
      46        28846 :             boost::mutex::scoped_lock lock( m_mutex );
      47        28642 :             m_running = false;
      48        28642 :             if ( m_shutdown ) 
      49              :                 break;
      50        18936 :             if ( m_stop ) {
      51         9265 :                 m_pipeline.m_barrier.wait();
      52         9699 :                 m_stop = false;
      53              :             }
      54        19370 :             if ( m_idle )
      55        19373 :                 m_condition.wait( lock ); 
      56        28999 :         }
      57              :     }
      58         9707 : }
      59              : 
      60              : void
      61         9728 : OksPipeline::Worker::shutdown( )
      62              : {
      63         9728 :     boost::mutex::scoped_lock lock( m_mutex );
      64         9728 :     m_shutdown = true;
      65         9728 :     m_condition.notify_one( );
      66         9728 : }
      67              : 
      68              : void
      69         9728 : OksPipeline::Worker::stop( )
      70              : {
      71         9728 :     boost::mutex::scoped_lock lock( m_mutex );
      72         9728 :     m_stop = true;
      73         9728 :     m_condition.notify_one( );
      74         9728 : }
      75              : 
      76           76 : OksPipeline::OksPipeline( size_t size )
      77           76 :   : m_barrier( size + 1 )
      78              : {
      79         9804 :     for ( size_t i = 0; i < size; ++i )
      80              :     {
      81         9728 :         m_workers.push_back( WorkerPtr( new Worker( *this ) ) );
      82         9728 :         m_pool.create_thread( boost::bind( &Worker::run, m_workers.back().get() ) );
      83              :     }
      84           76 :     m_barrier.wait();
      85           76 : }
      86              :  
      87           76 : OksPipeline::~OksPipeline()
      88              : {
      89           76 :     waitForCompletion();
      90         9804 :     for ( size_t i = 0; i < m_workers.size(); ++i )
      91              :     {
      92         9728 :         m_workers[i] -> shutdown();
      93              :     }
      94           76 :     m_pool.join_all();
      95           76 : }
      96              : 
      97              : void
      98          116 : OksPipeline::addJob( OksJob * job )
      99              : {
     100          156 :     for ( size_t i = 0; i < m_workers.size(); ++i )
     101              :     {
     102          156 :         if ( m_workers[i] -> setJob( job ) )
     103              :         {
     104          116 :             return;
     105              :         }
     106              :     }
     107            0 :     boost::mutex::scoped_lock lock( m_mutex );
     108            0 :     m_jobs.push( job );
     109            0 : }
     110              : 
     111              : void
     112           76 : OksPipeline::waitForCompletion()
     113              : {
     114           76 :     {
     115           76 :         boost::mutex::scoped_lock lock( m_mutex );
     116           76 :         while ( !m_jobs.empty() )
     117              :         {
     118            0 :             m_condition.wait( lock );
     119              :         }
     120           76 :     }
     121         9804 :     for ( size_t i = 0; i < m_workers.size(); ++i )
     122              :     {
     123         9728 :         m_workers[i] -> stop();
     124              :     }
     125           76 :     m_barrier.wait();
     126           76 : }
     127              : 
     128              : bool
     129          116 : OksPipeline::getJob( OksJob* & job )
     130              : {
     131          116 :     boost::mutex::scoped_lock lock( m_mutex );
     132          116 :     if ( !m_jobs.empty() )
     133              :     {
     134            0 :         job = m_jobs.front();
     135            0 :         m_jobs.pop();
     136            0 :         m_condition.notify_one( );
     137            0 :         return true;
     138              :     }
     139              :     return false;
     140          116 : }
     141              : 
     142              : } // namespace oks
     143              : } // namespace dunedaq
        

Generated by: LCOV version 2.0-1