Line data Source code
1 : ////////////////////////////////////////////////////////////////////////////
2 : //
3 : // copy from IPC
4 : //
5 : ////////////////////////////////////////////////////////////////////////////
6 :
7 : #include "oks/pipeline.hpp"
8 : #include <boost/bind/bind.hpp>
9 :
10 : namespace dunedaq {
11 : namespace oks {
12 :
13 : bool
14 157 : OksPipeline::Worker::setJob( OksJob * job )
15 : {
16 157 : boost::mutex::scoped_lock lock( m_mutex );
17 157 : if ( !m_running && m_idle )
18 : {
19 117 : m_job = job;
20 117 : m_idle = false;
21 117 : m_condition.notify_one();
22 117 : return true;
23 : }
24 : return false;
25 157 : }
26 :
27 : void
28 9856 : OksPipeline::Worker::run( )
29 : {
30 9856 : {
31 9856 : boost::mutex::scoped_lock lock( m_mutex );
32 9856 : m_pipeline.m_barrier.wait();
33 9848 : }
34 19669 : while ( true )
35 : {
36 29517 : {
37 29517 : boost::mutex::scoped_lock lock( m_mutex );
38 29486 : m_running = true;
39 29486 : }
40 29647 : while ( !m_idle ) {
41 121 : m_job->run();
42 117 : delete m_job;
43 117 : m_idle = !m_pipeline.getJob( m_job );
44 : }
45 29526 : {
46 29526 : boost::mutex::scoped_lock lock( m_mutex );
47 29503 : m_running = false;
48 29503 : if ( m_shutdown )
49 : break;
50 19670 : if ( m_stop ) {
51 9818 : m_pipeline.m_barrier.wait();
52 9846 : m_stop = false;
53 : }
54 19698 : if ( m_idle )
55 19672 : m_condition.wait( lock );
56 29531 : }
57 : }
58 9834 : }
59 :
60 : void
61 9856 : OksPipeline::Worker::shutdown( )
62 : {
63 9856 : boost::mutex::scoped_lock lock( m_mutex );
64 9856 : m_shutdown = true;
65 9856 : m_condition.notify_one( );
66 9856 : }
67 :
68 : void
69 9856 : OksPipeline::Worker::stop( )
70 : {
71 9856 : boost::mutex::scoped_lock lock( m_mutex );
72 9856 : m_stop = true;
73 9856 : m_condition.notify_one( );
74 9856 : }
75 :
76 77 : OksPipeline::OksPipeline( size_t size )
77 77 : : m_barrier( size + 1 )
78 : {
79 9933 : for ( size_t i = 0; i < size; ++i )
80 : {
81 9856 : m_workers.push_back( WorkerPtr( new Worker( *this ) ) );
82 9856 : m_pool.create_thread( boost::bind( &Worker::run, m_workers.back().get() ) );
83 : }
84 77 : m_barrier.wait();
85 77 : }
86 :
87 77 : OksPipeline::~OksPipeline()
88 : {
89 77 : waitForCompletion();
90 9933 : for ( size_t i = 0; i < m_workers.size(); ++i )
91 : {
92 9856 : m_workers[i] -> shutdown();
93 : }
94 77 : m_pool.join_all();
95 77 : }
96 :
97 : void
98 117 : OksPipeline::addJob( OksJob * job )
99 : {
100 157 : for ( size_t i = 0; i < m_workers.size(); ++i )
101 : {
102 157 : if ( m_workers[i] -> setJob( job ) )
103 : {
104 117 : return;
105 : }
106 : }
107 0 : boost::mutex::scoped_lock lock( m_mutex );
108 0 : m_jobs.push( job );
109 0 : }
110 :
111 : void
112 77 : OksPipeline::waitForCompletion()
113 : {
114 77 : {
115 77 : boost::mutex::scoped_lock lock( m_mutex );
116 77 : while ( !m_jobs.empty() )
117 : {
118 0 : m_condition.wait( lock );
119 : }
120 77 : }
121 9933 : for ( size_t i = 0; i < m_workers.size(); ++i )
122 : {
123 9856 : m_workers[i] -> stop();
124 : }
125 77 : m_barrier.wait();
126 77 : }
127 :
128 : bool
129 117 : OksPipeline::getJob( OksJob* & job )
130 : {
131 117 : boost::mutex::scoped_lock lock( m_mutex );
132 117 : if ( !m_jobs.empty() )
133 : {
134 0 : job = m_jobs.front();
135 0 : m_jobs.pop();
136 0 : m_condition.notify_one( );
137 0 : return true;
138 : }
139 : return false;
140 117 : }
141 :
142 : } // namespace oks
143 : } // namespace dunedaq
|