LCOV - code coverage report
Current view: top level - oks/src - pipeline.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 90.1 % 81 73
Test Date: 2026-03-29 15:29:34 Functions: 100.0 % 9 9

            Line data    Source code
       1              : ////////////////////////////////////////////////////////////////////////////
       2              : //
       3              : //    copy from IPC
       4              : //
       5              : ////////////////////////////////////////////////////////////////////////////
       6              : 
       7              : #include "oks/pipeline.hpp"
       8              : #include <boost/bind/bind.hpp>
       9              : 
      10              : namespace dunedaq {
      11              : namespace oks {
      12              : 
      13              : bool
      14          157 : OksPipeline::Worker::setJob( OksJob * job )
      15              : {
      16          157 :     boost::mutex::scoped_lock lock( m_mutex );
      17          157 :     if ( !m_running && m_idle )
      18              :     {
      19          117 :         m_job = job;
      20          117 :         m_idle = false;
      21          117 :         m_condition.notify_one();
      22          117 :         return true;
      23              :     }
      24              :     return false;
      25          157 : }
      26              : 
      27              : void
      28         9856 : OksPipeline::Worker::run( )
      29              : {
      30         9856 :     {
      31         9856 :         boost::mutex::scoped_lock lock( m_mutex );
      32         9856 :         m_pipeline.m_barrier.wait();
      33         9848 :     }
      34        19669 :     while ( true )
      35              :     {
      36        29517 :         {
      37        29517 :             boost::mutex::scoped_lock lock( m_mutex );
      38        29486 :             m_running = true;
      39        29486 :         }
      40        29647 :         while ( !m_idle ) {
      41          121 :             m_job->run();
      42          117 :             delete m_job;
      43          117 :             m_idle = !m_pipeline.getJob( m_job );
      44              :         }
      45        29526 :         {
      46        29526 :             boost::mutex::scoped_lock lock( m_mutex );
      47        29503 :             m_running = false;
      48        29503 :             if ( m_shutdown ) 
      49              :                 break;
      50        19670 :             if ( m_stop ) {
      51         9818 :                 m_pipeline.m_barrier.wait();
      52         9846 :                 m_stop = false;
      53              :             }
      54        19698 :             if ( m_idle )
      55        19672 :                 m_condition.wait( lock ); 
      56        29531 :         }
      57              :     }
      58         9834 : }
      59              : 
      60              : void
      61         9856 : OksPipeline::Worker::shutdown( )
      62              : {
      63         9856 :     boost::mutex::scoped_lock lock( m_mutex );
      64         9856 :     m_shutdown = true;
      65         9856 :     m_condition.notify_one( );
      66         9856 : }
      67              : 
      68              : void
      69         9856 : OksPipeline::Worker::stop( )
      70              : {
      71         9856 :     boost::mutex::scoped_lock lock( m_mutex );
      72         9856 :     m_stop = true;
      73         9856 :     m_condition.notify_one( );
      74         9856 : }
      75              : 
      76           77 : OksPipeline::OksPipeline( size_t size )
      77           77 :   : m_barrier( size + 1 )
      78              : {
      79         9933 :     for ( size_t i = 0; i < size; ++i )
      80              :     {
      81         9856 :         m_workers.push_back( WorkerPtr( new Worker( *this ) ) );
      82         9856 :         m_pool.create_thread( boost::bind( &Worker::run, m_workers.back().get() ) );
      83              :     }
      84           77 :     m_barrier.wait();
      85           77 : }
      86              :  
      87           77 : OksPipeline::~OksPipeline()
      88              : {
      89           77 :     waitForCompletion();
      90         9933 :     for ( size_t i = 0; i < m_workers.size(); ++i )
      91              :     {
      92         9856 :         m_workers[i] -> shutdown();
      93              :     }
      94           77 :     m_pool.join_all();
      95           77 : }
      96              : 
      97              : void
      98          117 : OksPipeline::addJob( OksJob * job )
      99              : {
     100          157 :     for ( size_t i = 0; i < m_workers.size(); ++i )
     101              :     {
     102          157 :         if ( m_workers[i] -> setJob( job ) )
     103              :         {
     104          117 :             return;
     105              :         }
     106              :     }
     107            0 :     boost::mutex::scoped_lock lock( m_mutex );
     108            0 :     m_jobs.push( job );
     109            0 : }
     110              : 
     111              : void
     112           77 : OksPipeline::waitForCompletion()
     113              : {
     114           77 :     {
     115           77 :         boost::mutex::scoped_lock lock( m_mutex );
     116           77 :         while ( !m_jobs.empty() )
     117              :         {
     118            0 :             m_condition.wait( lock );
     119              :         }
     120           77 :     }
     121         9933 :     for ( size_t i = 0; i < m_workers.size(); ++i )
     122              :     {
     123         9856 :         m_workers[i] -> stop();
     124              :     }
     125           77 :     m_barrier.wait();
     126           77 : }
     127              : 
     128              : bool
     129          117 : OksPipeline::getJob( OksJob* & job )
     130              : {
     131          117 :     boost::mutex::scoped_lock lock( m_mutex );
     132          117 :     if ( !m_jobs.empty() )
     133              :     {
     134            0 :         job = m_jobs.front();
     135            0 :         m_jobs.pop();
     136            0 :         m_condition.notify_one( );
     137            0 :         return true;
     138              :     }
     139              :     return false;
     140          117 : }
     141              : 
     142              : } // namespace oks
     143              : } // namespace dunedaq
        

Generated by: LCOV version 2.0-1