LinearPipeline.hpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  *
8  * * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * * Redistributions in binary form must reproduce the above
11  * copyright notice, this list of conditions and the following
12  * disclaimer in the documentation and/or other materials provided
13  * with the distribution.
14  * * Neither the name of Willow Garage, Inc. nor the names of its
15  * contributors may be used to endorse or promote products derived
16  * from this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
21  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
22  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
23  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
24  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  *
31  */
32 /*
33  * LinearPipeline.hpp
34  *
35  * @date 13.11.2015
36  * @author Tristan Igelbrink (Tristan@Igelbrink.com)
37  */
38 
39 #ifndef LINEAR_PIPELINE_HPP__
40 #define LINEAR_PIPELINE_HPP__
41 
42 #include "BlockingQueue.hpp"
43 #include "AbstractStage.hpp"
44 #include <boost/bind.hpp>
45 #include <boost/thread.hpp>
46 #include <boost/shared_ptr.hpp>
47 
48 // A class that represents a pipeline that holds numerous stages, and
49 // each stage can be executed concurrently.
50 template<typename WorkTypeA, typename WorkTypeB>
52 {
53 public:
54 
55  // Add a stage to the pipeline. The stages are inserted in order, such
56  // that the first added stage will be the first stage, and so on. The
57  // work queues for each stage will be updated automatically.
58  void AddStage(boost::shared_ptr<AbstractStage > stage)
59  {
60  // add a stage
61  m_stages.push_back(stage);
62  size_t numStage = m_stages.size();
63 
64  // resize the queue accordingly
65  m_queues.resize(numStage+1);
66 
67  // special case for the first stage, where queue[0] needs to
68  // be allocated.
69  if(m_queues[numStage-1] == 0)
70  {
71  m_queues[numStage-1] =
72  boost::shared_ptr<BlockingQueue >(
73  new BlockingQueue()
74  );
75  }
76  // allocate a queue for the new stage
77  m_queues[numStage] =
78  boost::shared_ptr<BlockingQueue >(
79  new BlockingQueue()
80  );
81 
82  // initialize the stage with the in and out queue
83  m_stages[numStage-1]->InitQueues(
84  m_queues[numStage-1], m_queues[numStage]
85  );
86  }
87 
88 
89  // Add work to the first queue, which is the in-queue for the first
90  // stage.
91  void AddWork(WorkTypeA work)
92  {
93  m_queues[0]->Add(work);
94  }
95 
96  // Extract the result from the out-queue of the last stage
97  WorkTypeB GetResult()
98  {
99  return boost::any_cast<WorkTypeB>(m_queues[m_queues.size()-1]->Take());
100  }
101 
102  // Start all stages by spinning up one thread per stage.
103  void Start()
104  {
105  for(size_t i=0; i<m_stages.size(); ++i)
106  {
107  m_threads.push_back(
108  boost::shared_ptr<boost::thread>(new boost::thread(
110  )));
111  }
112  }
113 
114  // join all stages
115  void join()
116  {
117  for(size_t i=0; i<m_stages.size(); ++i)
118  {
119  m_threads[i]->join();
120  }
121  }
122 
123 private:
124 
125  void StartStage(size_t index)
126  {
127  m_stages[index]->Run();
128  }
129 
130  std::vector<
131  boost::shared_ptr<AbstractStage >
133 
134  std::vector<
135  boost::shared_ptr<BlockingQueue >
137 
138  std::vector<
139  boost::shared_ptr<boost::thread>
141 };
142 #endif // LinearPipeline_h__
LinearPipeline::join
void join()
Definition: LinearPipeline.hpp:115
LinearPipeline::GetResult
WorkTypeB GetResult()
Definition: LinearPipeline.hpp:97
LinearPipeline::StartStage
void StartStage(size_t index)
Definition: LinearPipeline.hpp:125
LinearPipeline::Start
void Start()
Definition: LinearPipeline.hpp:103
LinearPipeline::AddStage
void AddStage(boost::shared_ptr< AbstractStage > stage)
Definition: LinearPipeline.hpp:58
LinearPipeline::AddWork
void AddWork(WorkTypeA work)
Definition: LinearPipeline.hpp:91
BlockingQueue.hpp
BlockingQueue
Definition: BlockingQueue.hpp:51
LinearPipeline
Definition: LinearPipeline.hpp:51
LinearPipeline::m_stages
std::vector< boost::shared_ptr< AbstractStage > > m_stages
Definition: LinearPipeline.hpp:132
LinearPipeline::m_queues
std::vector< boost::shared_ptr< BlockingQueue > > m_queues
Definition: LinearPipeline.hpp:136
LinearPipeline::m_threads
std::vector< boost::shared_ptr< boost::thread > > m_threads
Definition: LinearPipeline.hpp:140
AbstractStage.hpp


lvr2
Author(s): Thomas Wiemann , Sebastian Pütz , Alexander Mock , Lars Kiesow , Lukas Kalbertodt , Tristan Igelbrink , Johan M. von Behren , Dominik Feldschnieders , Alexander Löhr
autogenerated on Wed Mar 2 2022 00:37:24