Line data Source code
1 : ////////////////////////////////////////////////////////////////////////////
2 : //
3 : // copy from IPC
4 : //
5 : ////////////////////////////////////////////////////////////////////////////
6 :
7 : #include "oks/pipeline.hpp"
8 : #include <boost/bind/bind.hpp>
9 :
10 : namespace dunedaq {
11 : namespace oks {
12 :
13 : bool
14 156 : OksPipeline::Worker::setJob( OksJob * job )
15 : {
16 156 : boost::mutex::scoped_lock lock( m_mutex );
17 156 : if ( !m_running && m_idle )
18 : {
19 116 : m_job = job;
20 116 : m_idle = false;
21 116 : m_condition.notify_one();
22 116 : return true;
23 : }
24 : return false;
25 156 : }
26 :
27 : void
28 9728 : OksPipeline::Worker::run( )
29 : {
30 9728 : {
31 9728 : boost::mutex::scoped_lock lock( m_mutex );
32 9728 : m_pipeline.m_barrier.wait();
33 9693 : }
34 19057 : while ( true )
35 : {
36 28757 : {
37 28757 : boost::mutex::scoped_lock lock( m_mutex );
38 28594 : m_running = true;
39 28594 : }
40 29018 : while ( !m_idle ) {
41 172 : m_job->run();
42 116 : delete m_job;
43 116 : m_idle = !m_pipeline.getJob( m_job );
44 : }
45 28846 : {
46 28846 : boost::mutex::scoped_lock lock( m_mutex );
47 28642 : m_running = false;
48 28642 : if ( m_shutdown )
49 : break;
50 18936 : if ( m_stop ) {
51 9265 : m_pipeline.m_barrier.wait();
52 9699 : m_stop = false;
53 : }
54 19370 : if ( m_idle )
55 19373 : m_condition.wait( lock );
56 28999 : }
57 : }
58 9707 : }
59 :
60 : void
61 9728 : OksPipeline::Worker::shutdown( )
62 : {
63 9728 : boost::mutex::scoped_lock lock( m_mutex );
64 9728 : m_shutdown = true;
65 9728 : m_condition.notify_one( );
66 9728 : }
67 :
68 : void
69 9728 : OksPipeline::Worker::stop( )
70 : {
71 9728 : boost::mutex::scoped_lock lock( m_mutex );
72 9728 : m_stop = true;
73 9728 : m_condition.notify_one( );
74 9728 : }
75 :
76 76 : OksPipeline::OksPipeline( size_t size )
77 76 : : m_barrier( size + 1 )
78 : {
79 9804 : for ( size_t i = 0; i < size; ++i )
80 : {
81 9728 : m_workers.push_back( WorkerPtr( new Worker( *this ) ) );
82 9728 : m_pool.create_thread( boost::bind( &Worker::run, m_workers.back().get() ) );
83 : }
84 76 : m_barrier.wait();
85 76 : }
86 :
87 76 : OksPipeline::~OksPipeline()
88 : {
89 76 : waitForCompletion();
90 9804 : for ( size_t i = 0; i < m_workers.size(); ++i )
91 : {
92 9728 : m_workers[i] -> shutdown();
93 : }
94 76 : m_pool.join_all();
95 76 : }
96 :
97 : void
98 116 : OksPipeline::addJob( OksJob * job )
99 : {
100 156 : for ( size_t i = 0; i < m_workers.size(); ++i )
101 : {
102 156 : if ( m_workers[i] -> setJob( job ) )
103 : {
104 116 : return;
105 : }
106 : }
107 0 : boost::mutex::scoped_lock lock( m_mutex );
108 0 : m_jobs.push( job );
109 0 : }
110 :
111 : void
112 76 : OksPipeline::waitForCompletion()
113 : {
114 76 : {
115 76 : boost::mutex::scoped_lock lock( m_mutex );
116 76 : while ( !m_jobs.empty() )
117 : {
118 0 : m_condition.wait( lock );
119 : }
120 76 : }
121 9804 : for ( size_t i = 0; i < m_workers.size(); ++i )
122 : {
123 9728 : m_workers[i] -> stop();
124 : }
125 76 : m_barrier.wait();
126 76 : }
127 :
128 : bool
129 116 : OksPipeline::getJob( OksJob* & job )
130 : {
131 116 : boost::mutex::scoped_lock lock( m_mutex );
132 116 : if ( !m_jobs.empty() )
133 : {
134 0 : job = m_jobs.front();
135 0 : m_jobs.pop();
136 0 : m_condition.notify_one( );
137 0 : return true;
138 : }
139 : return false;
140 116 : }
141 :
142 : } // namespace oks
143 : } // namespace dunedaq
|