DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
pipeline.cpp
Go to the documentation of this file.
1
2//
3// copy from IPC
4//
6
7#include "oks/pipeline.hpp"
8#include <boost/bind/bind.hpp>
9
10namespace dunedaq {
11namespace oks {
12
// Tries to hand a job directly to this worker.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is the worker's setJob( job ) that the pipeline calls when a job is
// added -- confirm against pipeline.hpp.
// Returns true when the job was stored in m_job and a waiter on m_condition
// was notified; returns false without modifying any state otherwise.
13bool
15{
16 boost::mutex::scoped_lock lock( m_mutex );
// Accept only when the worker is not flagged as running and is idle.
17 if ( !m_running && m_idle )
18 {
19 m_job = job;
20 m_idle = false;
// Wake one thread blocked on m_condition so the stored job gets picked up.
21 m_condition.notify_one();
22 return true;
23 }
24 return false;
25}
26
// Main loop of a worker.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is Worker::run, the function the pipeline binds to each pool thread --
// confirm against pipeline.hpp.
// Each iteration: mark the worker running, execute and delete jobs until no
// further job is available, then (under m_mutex) clear the running flag and
// either exit on shutdown, rendezvous on the pipeline barrier on stop, or
// block on m_condition while idle.
27void
29{
30 {
// Initial rendezvous on the pipeline barrier; note that m_mutex is held for
// the duration of the barrier wait.
31 boost::mutex::scoped_lock lock( m_mutex );
32 m_pipeline.m_barrier.wait();
33 }
34 while ( true )
35 {
36 {
37 boost::mutex::scoped_lock lock( m_mutex );
38 m_running = true;
39 }
// m_idle and m_job are accessed here without holding m_mutex.
40 while ( !m_idle ) {
41 m_job->run();
// The job object is deleted by the worker once it has run.
42 delete m_job;
// Fetch the next queued job from the pipeline; the worker becomes idle when
// getJob() reports that none was available.
43 m_idle = !m_pipeline.getJob( m_job );
44 }
45 {
46 boost::mutex::scoped_lock lock( m_mutex );
47 m_running = false;
// Shutdown takes priority: leave the outer loop and return.
48 if ( m_shutdown )
49 break;
50 if ( m_stop ) {
// Stop request: wait on the pipeline barrier (still holding m_mutex), then
// clear the request flag.
51 m_pipeline.m_barrier.wait();
52 m_stop = false;
53 }
// Sleep until notified. The wait is not wrapped in a predicate loop; any
// wake-up simply restarts the outer loop, which re-checks m_idle.
54 if ( m_idle )
55 m_condition.wait( lock );
56 }
57 }
58}
59
// Sets the m_shutdown flag under m_mutex and notifies one waiter on
// m_condition.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is the worker's shutdown() invoked from the pipeline's teardown --
// confirm against pipeline.hpp.
60void
62{
63 boost::mutex::scoped_lock lock( m_mutex );
64 m_shutdown = true;
65 m_condition.notify_one( );
66}
67
// Sets the m_stop flag under m_mutex and notifies one waiter on m_condition.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is the worker's stop() that the pipeline calls before waiting on its
// barrier -- confirm against pipeline.hpp.
68void
70{
71 boost::mutex::scoped_lock lock( m_mutex );
72 m_stop = true;
73 m_condition.notify_one( );
74}
75
// Constructor body (the declarator line is missing from this listing;
// presumably it takes the number of workers as `size` -- confirm against
// pipeline.hpp).
// The barrier is initialised with a count of size + 1.
77 : m_barrier( size + 1 )
78{
79 for ( size_t i = 0; i < size; ++i )
80 {
// Create a worker bound to this pipeline and start a pool thread that
// executes Worker::run on it.
81 m_workers.push_back( WorkerPtr( new Worker( *this ) ) );
82 m_pool.create_thread( boost::bind( &Worker::run, m_workers.back().get() ) );
83 }
// Wait on the barrier after all worker threads have been created.
84 m_barrier.wait();
85}
86
// Teardown body (the declarator line is missing from this listing;
// presumably the pipeline destructor -- confirm against pipeline.hpp).
// Requests shutdown on every worker, then joins all threads in the pool.
88{
90 for ( size_t i = 0; i < m_workers.size(); ++i )
91 {
92 m_workers[i] -> shutdown();
93 }
94 m_pool.join_all();
95}
96
// Submits a job: offers it to each worker in turn and returns as soon as one
// accepts it; if no worker accepts, the job is appended to m_jobs under
// m_mutex.
// NOTE(review): a job queued here is only dequeued via getJob(), and nothing
// in this function wakes a worker. If every worker rejects the job only
// because it is momentarily still flagged as running, the job may stay queued
// until some other job completes -- confirm whether this window matters.
97void
99{
100 for ( size_t i = 0; i < m_workers.size(); ++i )
101 {
// setJob() returns true when the worker took the job.
102 if ( m_workers[i] -> setJob( job ) )
103 {
104 return;
105 }
106 }
// No worker accepted the job: queue it.
107 boost::mutex::scoped_lock lock( m_mutex );
108 m_jobs.push( job );
109}
110
// Blocks until the job queue has drained, then asks every worker to stop and
// waits on the pipeline barrier.
// NOTE(review): the declarator line is missing from this listing, so the
// function's name and parameters are not visible here -- confirm against
// pipeline.hpp.
111void
113{
114 {
115 boost::mutex::scoped_lock lock( m_mutex );
// Re-check the queue after every wake-up on m_condition.
116 while ( !m_jobs.empty() )
117 {
118 m_condition.wait( lock );
119 }
120 }
// m_mutex is released before the workers are signalled.
121 for ( size_t i = 0; i < m_workers.size(); ++i )
122 {
123 m_workers[i] -> stop();
124 }
125 m_barrier.wait();
126}
127
// Pops the next queued job, if any, under m_mutex.
// On success stores the front of m_jobs in `job`, removes it from the queue,
// notifies one waiter on m_condition and returns true. Returns false and
// leaves `job` unchanged when the queue is empty.
128bool
130{
131 boost::mutex::scoped_lock lock( m_mutex );
132 if ( !m_jobs.empty() )
133 {
134 job = m_jobs.front();
135 m_jobs.pop();
// Let a thread waiting on m_condition re-check the queue.
136 m_condition.notify_one( );
137 return true;
138 }
139 return false;
140}
141
142} // namespace oks
143} // namespace dunedaq
boost::condition m_condition
Definition pipeline.hpp:79
std::queue< OksJob * > m_jobs
Definition pipeline.hpp:83
std::vector< WorkerPtr > m_workers
Definition pipeline.hpp:82
bool getJob(OksJob *&job)
Definition pipeline.cpp:129
boost::thread_group m_pool
Definition pipeline.hpp:81
std::shared_ptr< Worker > WorkerPtr
Definition pipeline.hpp:70
void addJob(OksJob *job)
Definition pipeline.cpp:98
boost::barrier m_barrier
Definition pipeline.hpp:80
Including Qt Headers.
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size
Definition __init__.py:1