Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 #include <moveit/pick_place/manipulation_pipeline.h>
00038 #include <ros/console.h>
00039
00040 namespace pick_place
00041 {
00042
00043 ManipulationPipeline::ManipulationPipeline(const std::string &name, unsigned int nthreads) :
00044 name_(name),
00045 nthreads_(nthreads),
00046 verbose_(false),
00047 stop_processing_(true)
00048 {
00049 processing_threads_.resize(nthreads, NULL);
00050 }
00051
00052 ManipulationPipeline::~ManipulationPipeline()
00053 {
00054 reset();
00055 }
00056
00057 ManipulationPipeline& ManipulationPipeline::addStage(const ManipulationStagePtr &next)
00058 {
00059 next->setVerbose(verbose_);
00060 stages_.push_back(next);
00061 return *this;
00062 }
00063
00064 const ManipulationStagePtr& ManipulationPipeline::getFirstStage() const
00065 {
00066 if (stages_.empty())
00067 {
00068 static const ManipulationStagePtr empty;
00069 return empty;
00070 }
00071 else
00072 return stages_.front();
00073 }
00074
00075 const ManipulationStagePtr& ManipulationPipeline::getLastStage() const
00076 {
00077 if (stages_.empty())
00078 {
00079 static const ManipulationStagePtr empty;
00080 return empty;
00081 }
00082 else
00083 return stages_.back();
00084 }
00085
00086 void ManipulationPipeline::reset()
00087 {
00088 clear();
00089 stages_.clear();
00090 }
00091
00092 void ManipulationPipeline::setVerbose(bool flag)
00093 {
00094 verbose_ = flag;
00095 for (std::size_t i = 0 ; i < stages_.size() ; ++i)
00096 stages_[i]->setVerbose(flag);
00097 }
00098
00099 void ManipulationPipeline::clear()
00100 {
00101 stop();
00102 {
00103 boost::mutex::scoped_lock slock(queue_access_lock_);
00104 queue_.clear();
00105 }
00106 {
00107 boost::mutex::scoped_lock slock(result_lock_);
00108 success_.clear();
00109 failed_.clear();
00110 }
00111 }
00112
00113 void ManipulationPipeline::start()
00114 {
00115 stop_processing_ = false;
00116 empty_queue_threads_ = 0;
00117 for (std::size_t i = 0 ; i < stages_.size() ; ++i)
00118 stages_[i]->resetStopSignal();
00119 for (std::size_t i = 0; i < processing_threads_.size() ; ++i)
00120 if (!processing_threads_[i])
00121 processing_threads_[i] = new boost::thread(boost::bind(&ManipulationPipeline::processingThread, this, i));
00122 }
00123
00124 void ManipulationPipeline::signalStop()
00125 {
00126 for (std::size_t i = 0 ; i < stages_.size() ; ++i)
00127 stages_[i]->signalStop();
00128 stop_processing_ = true;
00129 queue_access_cond_.notify_all();
00130 }
00131
00132 void ManipulationPipeline::stop()
00133 {
00134 signalStop();
00135 for (std::size_t i = 0; i < processing_threads_.size() ; ++i)
00136 if (processing_threads_[i])
00137 {
00138 processing_threads_[i]->join();
00139 delete processing_threads_[i];
00140 processing_threads_[i] = NULL;
00141 }
00142 }
00143
00144 void ManipulationPipeline::processingThread(unsigned int index)
00145 {
00146 ROS_DEBUG_STREAM("Start thread " << index << " for '" << name_ << "'");
00147
00148 while (!stop_processing_)
00149 {
00150 bool inc_queue = false;
00151 boost::unique_lock<boost::mutex> ulock(queue_access_lock_);
00152
00153 if (queue_.empty() && !stop_processing_ && empty_queue_callback_)
00154 {
00155 empty_queue_threads_++;
00156 inc_queue = true;
00157 if (empty_queue_threads_ == processing_threads_.size())
00158 empty_queue_callback_();
00159 }
00160 while (queue_.empty() && !stop_processing_)
00161 queue_access_cond_.wait(ulock);
00162 while (!stop_processing_ && !queue_.empty())
00163 {
00164 ManipulationPlanPtr g = queue_.front();
00165 queue_.pop_front();
00166 if (inc_queue)
00167 {
00168 empty_queue_threads_--;
00169 inc_queue = false;
00170 }
00171
00172 queue_access_lock_.unlock();
00173 try
00174 {
00175 g->error_code_.val = moveit_msgs::MoveItErrorCodes::FAILURE;
00176 for (std::size_t i = 0 ; !stop_processing_ && i < stages_.size() ; ++i)
00177 {
00178 bool res = stages_[i]->evaluate(g);
00179 g->processing_stage_ = i + 1;
00180 if (res == false)
00181 {
00182 boost::mutex::scoped_lock slock(result_lock_);
00183 failed_.push_back(g);
00184 ROS_INFO_STREAM("Manipulation plan " << g->id_ << " failed at stage '" << stages_[i]->getName() << "' on thread " << index);
00185 break;
00186 }
00187 }
00188 if (g->error_code_.val == moveit_msgs::MoveItErrorCodes::SUCCESS)
00189 {
00190 g->processing_stage_++;
00191 {
00192 boost::mutex::scoped_lock slock(result_lock_);
00193 success_.push_back(g);
00194 }
00195 signalStop();
00196 ROS_INFO_STREAM("Found successful manipulation plan!");
00197 if (solution_callback_)
00198 solution_callback_();
00199 }
00200 }
00201 catch (std::runtime_error &ex)
00202 {
00203 ROS_ERROR("[%s:%u] %s", name_.c_str(), index, ex.what());
00204 }
00205 catch (...)
00206 {
00207 ROS_ERROR("[%s:%u] Caught unknown exception while processing manipulation stage", name_.c_str(), index);
00208 }
00209 queue_access_lock_.lock();
00210 }
00211 }
00212 }
00213
00214 void ManipulationPipeline::push(const ManipulationPlanPtr &plan)
00215 {
00216 boost::mutex::scoped_lock slock(queue_access_lock_);
00217 queue_.push_back(plan);
00218 ROS_INFO_STREAM("Added plan for pipeline '" << name_ << "'. Queue is now of size " << queue_.size());
00219 queue_access_cond_.notify_all();
00220 }
00221
00222 void ManipulationPipeline::reprocessLastFailure()
00223 {
00224 boost::mutex::scoped_lock slock(queue_access_lock_);
00225 if (failed_.empty())
00226 return;
00227 ManipulationPlanPtr plan = failed_.back();
00228 failed_.pop_back();
00229 plan->clear();
00230 queue_.push_back(plan);
00231 ROS_INFO_STREAM("Re-added last failed plan for pipeline '" << name_ << "'. Queue is now of size " << queue_.size());
00232 queue_access_cond_.notify_all();
00233 }
00234
00235 }