manipulation_pipeline.cpp
Go to the documentation of this file.
1 /*********************************************************************
2 * Software License Agreement (BSD License)
3 *
4 * Copyright (c) 2012, Willow Garage, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above
14 * copyright notice, this list of conditions and the following
15 * disclaimer in the documentation and/or other materials provided
16 * with the distribution.
17 * * Neither the name of Willow Garage nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
33 *********************************************************************/
34 
35 /* Author: Ioan Sucan */
36 
38 #include <ros/console.h>
39 
40 namespace pick_place
41 {
42 ManipulationPipeline::ManipulationPipeline(const std::string& name, unsigned int nthreads)
43  : name_(name), nthreads_(nthreads), verbose_(false), stop_processing_(true)
44 {
45  processing_threads_.resize(nthreads, NULL);
46 }
47 
49 {
50  reset();
51 }
52 
53 ManipulationPipeline& ManipulationPipeline::addStage(const ManipulationStagePtr& next)
54 {
55  next->setVerbose(verbose_);
56  stages_.push_back(next);
57  return *this;
58 }
59 
60 const ManipulationStagePtr& ManipulationPipeline::getFirstStage() const
61 {
62  if (stages_.empty())
63  {
64  static const ManipulationStagePtr empty;
65  return empty;
66  }
67  else
68  return stages_.front();
69 }
70 
71 const ManipulationStagePtr& ManipulationPipeline::getLastStage() const
72 {
73  if (stages_.empty())
74  {
75  static const ManipulationStagePtr empty;
76  return empty;
77  }
78  else
79  return stages_.back();
80 }
81 
83 {
84  clear();
85  stages_.clear();
86 }
87 
89 {
90  verbose_ = flag;
91  for (std::size_t i = 0; i < stages_.size(); ++i)
92  stages_[i]->setVerbose(flag);
93 }
94 
96 {
97  stop();
98  {
99  boost::mutex::scoped_lock slock(queue_access_lock_);
100  queue_.clear();
101  }
102  {
103  boost::mutex::scoped_lock slock(result_lock_);
104  success_.clear();
105  failed_.clear();
106  }
107 }
108 
110 {
111  stop_processing_ = false;
113  for (std::size_t i = 0; i < stages_.size(); ++i)
114  stages_[i]->resetStopSignal();
115  for (std::size_t i = 0; i < processing_threads_.size(); ++i)
116  if (!processing_threads_[i])
117  processing_threads_[i] = new boost::thread(boost::bind(&ManipulationPipeline::processingThread, this, i));
118 }
119 
121 {
122  for (std::size_t i = 0; i < stages_.size(); ++i)
123  stages_[i]->signalStop();
124  stop_processing_ = true;
125  queue_access_cond_.notify_all();
126 }
127 
129 {
130  signalStop();
131  for (std::size_t i = 0; i < processing_threads_.size(); ++i)
132  if (processing_threads_[i])
133  {
134  processing_threads_[i]->join();
135  delete processing_threads_[i];
136  processing_threads_[i] = NULL;
137  }
138 }
139 
141 {
142  ROS_DEBUG_STREAM_NAMED("manipulation", "Start thread " << index << " for '" << name_ << "'");
143 
144  while (!stop_processing_)
145  {
146  bool inc_queue = false;
147  boost::unique_lock<boost::mutex> ulock(queue_access_lock_);
148  // if the queue is empty, we trigger the corresponding event
149  if (queue_.empty() && !stop_processing_ && empty_queue_callback_)
150  {
152  inc_queue = true;
155  }
156  while (queue_.empty() && !stop_processing_)
157  queue_access_cond_.wait(ulock);
158  while (!stop_processing_ && !queue_.empty())
159  {
160  ManipulationPlanPtr g = queue_.front();
161  queue_.pop_front();
162  if (inc_queue)
163  {
165  inc_queue = false;
166  }
167 
168  queue_access_lock_.unlock();
169  try
170  {
171  g->error_code_.val = moveit_msgs::MoveItErrorCodes::FAILURE;
172  for (std::size_t i = 0; !stop_processing_ && i < stages_.size(); ++i)
173  {
174  bool res = stages_[i]->evaluate(g);
175  g->processing_stage_ = i + 1;
176  if (res == false)
177  {
178  boost::mutex::scoped_lock slock(result_lock_);
179  failed_.push_back(g);
180  ROS_INFO_STREAM_NAMED("manipulation", "Manipulation plan " << g->id_ << " failed at stage '"
181  << stages_[i]->getName() << "' on thread "
182  << index);
183  break;
184  }
185  }
186  if (g->error_code_.val == moveit_msgs::MoveItErrorCodes::SUCCESS)
187  {
188  g->processing_stage_++;
189  {
190  boost::mutex::scoped_lock slock(result_lock_);
191  success_.push_back(g);
192  }
193  signalStop();
194  ROS_INFO_STREAM_NAMED("manipulation", "Found successful manipulation plan!");
195  if (solution_callback_)
197  }
198  }
199  catch (std::exception& ex)
200  {
201  ROS_ERROR_NAMED("manipulation", "[%s:%u] %s", name_.c_str(), index, ex.what());
202  }
203  queue_access_lock_.lock();
204  }
205  }
206 }
207 
208 void ManipulationPipeline::push(const ManipulationPlanPtr& plan)
209 {
210  boost::mutex::scoped_lock slock(queue_access_lock_);
211  queue_.push_back(plan);
212  ROS_INFO_STREAM_NAMED("manipulation", "Added plan for pipeline '" << name_ << "'. Queue is now of size "
213  << queue_.size());
214  queue_access_cond_.notify_all();
215 }
216 
218 {
219  boost::mutex::scoped_lock slock(queue_access_lock_);
220  if (failed_.empty())
221  return;
222  ManipulationPlanPtr plan = failed_.back();
223  failed_.pop_back();
224  plan->clear();
225  queue_.push_back(plan);
226  ROS_INFO_STREAM_NAMED("manipulation", "Re-added last failed plan for pipeline '"
227  << name_ << "'. Queue is now of size " << queue_.size());
228  queue_access_cond_.notify_all();
229 }
230 }
boost::function< void()> empty_queue_callback_
#define ROS_DEBUG_STREAM_NAMED(name, args)
std::vector< ManipulationStagePtr > stages_
void processingThread(unsigned int index)
Represent the sequence of steps that are executed for a manipulation plan.
std::vector< ManipulationPlanPtr > failed_
#define ROS_INFO_STREAM_NAMED(name, args)
const ManipulationStagePtr & getLastStage() const
const ManipulationStagePtr & getFirstStage() const
std::vector< boost::thread * > processing_threads_
std::deque< ManipulationPlanPtr > queue_
ManipulationPipeline(const std::string &name, unsigned int nthreads)
void push(const ManipulationPlanPtr &grasp)
std::vector< ManipulationPlanPtr > success_
boost::function< void()> solution_callback_
#define ROS_ERROR_NAMED(name,...)
ManipulationPipeline & addStage(const ManipulationStagePtr &next)
boost::condition_variable queue_access_cond_


manipulation
Author(s): Ioan Sucan , Sachin Chitta
autogenerated on Sun Oct 18 2020 13:18:23