DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::oks::OksPipeline Class Reference

#include <pipeline.hpp>

Classes

struct  Worker
 

Public Member Functions

 OksPipeline (size_t size)
 
 ~OksPipeline ()
 
void waitForCompletion ()
 
void addJob (OksJob *job)
 

Private Types

typedef std::shared_ptr< WorkerWorkerPtr
 

Private Member Functions

bool getJob (OksJob *&job)
 

Private Attributes

boost::mutex m_mutex
 
boost::condition m_condition
 
boost::barrier m_barrier
 
boost::thread_group m_pool
 
std::vector< WorkerPtrm_workers
 
std::queue< OksJob * > m_jobs
 

Friends

struct Worker
 

Detailed Description

Definition at line 28 of file pipeline.hpp.

Member Typedef Documentation

◆ WorkerPtr

std::shared_ptr<Worker> dunedaq::oks::OksPipeline::WorkerPtr
private

Definition at line 70 of file pipeline.hpp.

Constructor & Destructor Documentation

◆ OksPipeline()

dunedaq::oks::OksPipeline::OksPipeline ( size_t size)

Definition at line 76 of file pipeline.cpp.

77 : m_barrier( size + 1 )
78{
79 for ( size_t i = 0; i < size; ++i )
80 {
81 m_workers.push_back( WorkerPtr( new Worker( *this ) ) );
82 m_pool.create_thread( boost::bind( &Worker::run, m_workers.back().get() ) );
83 }
84 m_barrier.wait();
85}
std::vector< WorkerPtr > m_workers
Definition pipeline.hpp:82
boost::thread_group m_pool
Definition pipeline.hpp:81
std::shared_ptr< Worker > WorkerPtr
Definition pipeline.hpp:70
boost::barrier m_barrier
Definition pipeline.hpp:80
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size

◆ ~OksPipeline()

dunedaq::oks::OksPipeline::~OksPipeline ( )

Definition at line 87 of file pipeline.cpp.

88{
90 for ( size_t i = 0; i < m_workers.size(); ++i )
91 {
92 m_workers[i] -> shutdown();
93 }
94 m_pool.join_all();
95}

Member Function Documentation

◆ addJob()

void dunedaq::oks::OksPipeline::addJob ( OksJob * job)

Definition at line 98 of file pipeline.cpp.

99{
100 for ( size_t i = 0; i < m_workers.size(); ++i )
101 {
102 if ( m_workers[i] -> setJob( job ) )
103 {
104 return;
105 }
106 }
107 boost::mutex::scoped_lock lock( m_mutex );
108 m_jobs.push( job );
109}
std::queue< OksJob * > m_jobs
Definition pipeline.hpp:83

◆ getJob()

bool dunedaq::oks::OksPipeline::getJob ( OksJob *& job)
private

Definition at line 129 of file pipeline.cpp.

130{
131 boost::mutex::scoped_lock lock( m_mutex );
132 if ( !m_jobs.empty() )
133 {
134 job = m_jobs.front();
135 m_jobs.pop();
136 m_condition.notify_one( );
137 return true;
138 }
139 return false;
140}
boost::condition m_condition
Definition pipeline.hpp:79

◆ waitForCompletion()

void dunedaq::oks::OksPipeline::waitForCompletion ( )

Definition at line 112 of file pipeline.cpp.

113{
114 {
115 boost::mutex::scoped_lock lock( m_mutex );
116 while ( !m_jobs.empty() )
117 {
118 m_condition.wait( lock );
119 }
120 }
121 for ( size_t i = 0; i < m_workers.size(); ++i )
122 {
123 m_workers[i] -> stop();
124 }
125 m_barrier.wait();
126}

Friends And Related Symbol Documentation

◆ Worker

friend struct Worker
friend

Definition at line 73 of file pipeline.hpp.

Member Data Documentation

◆ m_barrier

boost::barrier dunedaq::oks::OksPipeline::m_barrier
private

Definition at line 80 of file pipeline.hpp.

◆ m_condition

boost::condition dunedaq::oks::OksPipeline::m_condition
private

Definition at line 79 of file pipeline.hpp.

◆ m_jobs

std::queue<OksJob*> dunedaq::oks::OksPipeline::m_jobs
private

Definition at line 83 of file pipeline.hpp.

◆ m_mutex

boost::mutex dunedaq::oks::OksPipeline::m_mutex
private

Definition at line 78 of file pipeline.hpp.

◆ m_pool

boost::thread_group dunedaq::oks::OksPipeline::m_pool
private

Definition at line 81 of file pipeline.hpp.

◆ m_workers

std::vector<WorkerPtr> dunedaq::oks::OksPipeline::m_workers
private

Definition at line 82 of file pipeline.hpp.


The documentation for this class was generated from the following files: