DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq
sourcecode
oks
src
pipeline.cpp
Go to the documentation of this file.
1
2
//
3
// copy from IPC
4
//
6
7
#include "
oks/pipeline.hpp
"
8
#include <boost/bind/bind.hpp>
9
10
namespace
dunedaq
{
11
namespace
oks
{
12
13
bool
14
OksPipeline::Worker::setJob
(
OksJob
* job )
15
{
16
boost::mutex::scoped_lock lock(
m_mutex
);
17
if
( !
m_running
&&
m_idle
)
18
{
19
m_job
= job;
20
m_idle
=
false
;
21
m_condition
.notify_one();
22
return
true
;
23
}
24
return
false
;
25
}
26
27
void
28
OksPipeline::Worker::run
( )
29
{
30
{
31
boost::mutex::scoped_lock lock(
m_mutex
);
32
m_pipeline.m_barrier.wait();
33
}
34
while
(
true
)
35
{
36
{
37
boost::mutex::scoped_lock lock(
m_mutex
);
38
m_running =
true
;
39
}
40
while
( !m_idle ) {
41
m_job->run();
42
delete
m_job;
43
m_idle = !m_pipeline.getJob( m_job );
44
}
45
{
46
boost::mutex::scoped_lock lock(
m_mutex
);
47
m_running =
false
;
48
if
( m_shutdown )
49
break
;
50
if
( m_stop ) {
51
m_pipeline.m_barrier.wait();
52
m_stop =
false
;
53
}
54
if
( m_idle )
55
m_condition
.wait( lock );
56
}
57
}
58
}
59
60
void
61
OksPipeline::Worker::shutdown
( )
62
{
63
boost::mutex::scoped_lock lock(
m_mutex
);
64
m_shutdown =
true
;
65
m_condition
.notify_one( );
66
}
67
68
void
69
OksPipeline::Worker::stop
( )
70
{
71
boost::mutex::scoped_lock lock(
m_mutex
);
72
m_stop =
true
;
73
m_condition
.notify_one( );
74
}
75
76
OksPipeline::OksPipeline
(
size_t
size
)
77
:
m_barrier
(
size
+ 1 )
78
{
79
for
(
size_t
i = 0; i <
size
; ++i )
80
{
81
m_workers
.push_back(
WorkerPtr
(
new
Worker
( *
this
) ) );
82
m_pool
.create_thread( boost::bind( &
Worker::run
,
m_workers
.back().get() ) );
83
}
84
m_barrier
.wait();
85
}
86
87
OksPipeline::~OksPipeline
()
88
{
89
waitForCompletion
();
90
for
(
size_t
i = 0; i <
m_workers
.size(); ++i )
91
{
92
m_workers
[i] -> shutdown();
93
}
94
m_pool
.join_all();
95
}
96
97
void
98
OksPipeline::addJob
(
OksJob
* job )
99
{
100
for
(
size_t
i = 0; i <
m_workers
.size(); ++i )
101
{
102
if
(
m_workers
[i] -> setJob( job ) )
103
{
104
return
;
105
}
106
}
107
boost::mutex::scoped_lock lock(
m_mutex
);
108
m_jobs
.push( job );
109
}
110
111
void
112
OksPipeline::waitForCompletion
()
113
{
114
{
115
boost::mutex::scoped_lock lock(
m_mutex
);
116
while
( !
m_jobs
.empty() )
117
{
118
m_condition
.wait( lock );
119
}
120
}
121
for
(
size_t
i = 0; i <
m_workers
.size(); ++i )
122
{
123
m_workers
[i] -> stop();
124
}
125
m_barrier
.wait();
126
}
127
128
bool
129
OksPipeline::getJob
(
OksJob
* & job )
130
{
131
boost::mutex::scoped_lock lock(
m_mutex
);
132
if
( !
m_jobs
.empty() )
133
{
134
job =
m_jobs
.front();
135
m_jobs
.pop();
136
m_condition
.notify_one( );
137
return
true
;
138
}
139
return
false
;
140
}
141
142
}
// namespace oks
143
}
// namespace dunedaq
dunedaq::oks::OksJob
Definition
pipeline.hpp:20
dunedaq::oks::OksPipeline::m_condition
boost::condition m_condition
Definition
pipeline.hpp:79
dunedaq::oks::OksPipeline::waitForCompletion
void waitForCompletion()
Definition
pipeline.cpp:112
dunedaq::oks::OksPipeline::Worker
friend struct Worker
Definition
pipeline.hpp:73
dunedaq::oks::OksPipeline::m_jobs
std::queue< OksJob * > m_jobs
Definition
pipeline.hpp:83
dunedaq::oks::OksPipeline::m_workers
std::vector< WorkerPtr > m_workers
Definition
pipeline.hpp:82
dunedaq::oks::OksPipeline::m_mutex
boost::mutex m_mutex
Definition
pipeline.hpp:78
dunedaq::oks::OksPipeline::getJob
bool getJob(OksJob *&job)
Definition
pipeline.cpp:129
dunedaq::oks::OksPipeline::m_pool
boost::thread_group m_pool
Definition
pipeline.hpp:81
dunedaq::oks::OksPipeline::~OksPipeline
~OksPipeline()
Definition
pipeline.cpp:87
dunedaq::oks::OksPipeline::WorkerPtr
std::shared_ptr< Worker > WorkerPtr
Definition
pipeline.hpp:70
dunedaq::oks::OksPipeline::addJob
void addJob(OksJob *job)
Definition
pipeline.cpp:98
dunedaq::oks::OksPipeline::OksPipeline
OksPipeline(size_t size)
Definition
pipeline.cpp:76
dunedaq::oks::OksPipeline::m_barrier
boost::barrier m_barrier
Definition
pipeline.hpp:80
dunedaq
Including Qt Headers.
Definition
TimingController.hxx:1
dunedaq::size
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size
Definition
FelixIssues.hpp:28
oks
Definition
__init__.py:1
pipeline.hpp
dunedaq::oks::OksPipeline::Worker::shutdown
void shutdown()
Definition
pipeline.cpp:61
dunedaq::oks::OksPipeline::Worker::stop
void stop()
Definition
pipeline.cpp:69
dunedaq::oks::OksPipeline::Worker::m_running
bool m_running
Definition
pipeline.hpp:66
dunedaq::oks::OksPipeline::Worker::m_mutex
boost::mutex m_mutex
Definition
pipeline.hpp:61
dunedaq::oks::OksPipeline::Worker::setJob
bool setJob(OksJob *job)
Definition
pipeline.cpp:14
dunedaq::oks::OksPipeline::Worker::m_condition
boost::condition m_condition
Definition
pipeline.hpp:62
dunedaq::oks::OksPipeline::Worker::m_idle
bool m_idle
Definition
pipeline.hpp:63
dunedaq::oks::OksPipeline::Worker::m_job
OksJob * m_job
Definition
pipeline.hpp:67
dunedaq::oks::OksPipeline::Worker::run
void run()
Definition
pipeline.cpp:28
Generated on Sat Jun 28 2025 for DUNE-DAQ by
1.12.0