manipulation_pipeline.cpp
Go to the documentation of this file.
1 /*********************************************************************
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2012, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  *********************************************************************/
34 
35 /* Author: Ioan Sucan */
36 
38 #include <ros/console.h>
39 
40 namespace pick_place
41 {
42 ManipulationPipeline::ManipulationPipeline(const std::string& name, unsigned int nthreads)
43  : name_(name), nthreads_(nthreads), verbose_(false), stop_processing_(true)
44 {
45  processing_threads_.resize(nthreads, nullptr);
46 }
47 
48 ManipulationPipeline::~ManipulationPipeline()
49 {
50  reset();
51 }
52 
53 ManipulationPipeline& ManipulationPipeline::addStage(const ManipulationStagePtr& next)
54 {
55  next->setVerbose(verbose_);
56  stages_.push_back(next);
57  return *this;
58 }
59 
60 const ManipulationStagePtr& ManipulationPipeline::getFirstStage() const
61 {
62  if (stages_.empty())
63  {
64  static const ManipulationStagePtr EMPTY;
65  return EMPTY;
66  }
67  else
68  return stages_.front();
69 }
70 
71 const ManipulationStagePtr& ManipulationPipeline::getLastStage() const
72 {
73  if (stages_.empty())
74  {
75  static const ManipulationStagePtr EMPTY;
76  return EMPTY;
77  }
78  else
79  return stages_.back();
80 }
81 
82 void ManipulationPipeline::reset()
83 {
84  clear();
85  stages_.clear();
86 }
87 
88 void ManipulationPipeline::setVerbose(bool flag)
89 {
90  verbose_ = flag;
91  for (pick_place::ManipulationStagePtr& stage : stages_)
92  stage->setVerbose(flag);
93 }
94 
95 void ManipulationPipeline::clear()
96 {
97  stop();
98  {
99  boost::mutex::scoped_lock slock(queue_access_lock_);
100  queue_.clear();
101  }
102  {
103  boost::mutex::scoped_lock slock(result_lock_);
104  success_.clear();
105  failed_.clear();
106  }
107 }
108 
109 void ManipulationPipeline::start()
110 {
111  stop_processing_ = false;
112  empty_queue_threads_ = 0;
113  for (pick_place::ManipulationStagePtr& stage : stages_)
114  stage->resetStopSignal();
115  for (std::size_t i = 0; i < processing_threads_.size(); ++i)
116  if (!processing_threads_[i])
117  processing_threads_[i] = new boost::thread([this, i] { processingThread(i); });
118 }
119 
120 void ManipulationPipeline::signalStop()
121 {
122  for (pick_place::ManipulationStagePtr& stage : stages_)
123  stage->signalStop();
124  stop_processing_ = true;
125  queue_access_cond_.notify_all();
126 }
127 
128 void ManipulationPipeline::stop()
129 {
130  signalStop();
131  for (boost::thread*& processing_thread : processing_threads_)
132  if (processing_thread)
133  {
134  processing_thread->join();
135  delete processing_thread;
136  processing_thread = nullptr;
137  }
138 }
139 
140 void ManipulationPipeline::processingThread(unsigned int index)
141 {
142  ROS_DEBUG_STREAM_NAMED("manipulation", "Start thread " << index << " for '" << name_ << "'");
143 
144  while (!stop_processing_)
145  {
146  bool inc_queue = false;
147  boost::unique_lock<boost::mutex> ulock(queue_access_lock_);
148  // if the queue is empty, we trigger the corresponding event
149  if (queue_.empty() && !stop_processing_ && empty_queue_callback_)
150  {
151  empty_queue_threads_++;
152  inc_queue = true;
153  if (empty_queue_threads_ == processing_threads_.size())
154  empty_queue_callback_();
155  }
156  while (queue_.empty() && !stop_processing_)
157  queue_access_cond_.wait(ulock);
158  while (!stop_processing_ && !queue_.empty())
159  {
160  ManipulationPlanPtr g = queue_.front();
161  queue_.pop_front();
162  if (inc_queue)
163  {
164  empty_queue_threads_--;
165  inc_queue = false;
166  }
167 
168  queue_access_lock_.unlock();
169  try
170  {
171  g->error_code_.val = moveit_msgs::MoveItErrorCodes::FAILURE;
172  for (std::size_t i = 0; !stop_processing_ && i < stages_.size(); ++i)
173  {
174  bool res = stages_[i]->evaluate(g);
175  g->processing_stage_ = i + 1;
176  if (!res)
177  {
178  boost::mutex::scoped_lock slock(result_lock_);
179  failed_.push_back(g);
180  ROS_INFO_STREAM_NAMED("manipulation", "Manipulation plan " << g->id_ << " failed at stage '"
181  << stages_[i]->getName() << "' on thread "
182  << index);
183  break;
184  }
185  }
186  if (g->error_code_.val == moveit_msgs::MoveItErrorCodes::SUCCESS)
187  {
188  g->processing_stage_++;
189  {
190  boost::mutex::scoped_lock slock(result_lock_);
191  success_.push_back(g);
192  }
193  signalStop();
194  ROS_INFO_STREAM_NAMED("manipulation", "Found successful manipulation plan!");
195  if (solution_callback_)
196  solution_callback_();
197  }
198  }
199  catch (std::exception& ex)
200  {
201  ROS_ERROR_NAMED("manipulation", "[%s:%u] %s", name_.c_str(), index, ex.what());
202  }
203  queue_access_lock_.lock();
204  }
205  }
206 }
207 
208 void ManipulationPipeline::push(const ManipulationPlanPtr& plan)
209 {
210  boost::mutex::scoped_lock slock(queue_access_lock_);
211  queue_.push_back(plan);
212  ROS_INFO_STREAM_NAMED("manipulation",
213  "Added plan for pipeline '" << name_ << "'. Queue is now of size " << queue_.size());
214  queue_access_cond_.notify_all();
215 }
216 
217 void ManipulationPipeline::reprocessLastFailure()
218 {
219  boost::mutex::scoped_lock slock(queue_access_lock_);
220  if (failed_.empty())
221  return;
222  ManipulationPlanPtr plan = failed_.back();
223  failed_.pop_back();
224  plan->clear();
225  queue_.push_back(plan);
226  ROS_INFO_STREAM_NAMED("manipulation", "Re-added last failed plan for pipeline '"
227  << name_ << "'. Queue is now of size " << queue_.size());
228  queue_access_cond_.notify_all();
229 }
230 } // namespace pick_place
pick_place
Definition: approach_and_translate_stage.h:43
ROS_DEBUG_STREAM_NAMED
#define ROS_DEBUG_STREAM_NAMED(name, args)
ROS_ERROR_NAMED
#define ROS_ERROR_NAMED(name,...)
console.h
name
std::string name
next
EndPoint * next[3]
pick_place::ManipulationPipeline::ManipulationPipeline
ManipulationPipeline(const std::string &name, unsigned int nthreads)
Definition: manipulation_pipeline.cpp:74
index
unsigned int index
ROS_INFO_STREAM_NAMED
#define ROS_INFO_STREAM_NAMED(name, args)
manipulation_pipeline.h


manipulation
Author(s): Ioan Sucan , Sachin Chitta
autogenerated on Thu Jan 9 2025 03:25:03