Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 #include <moveit/pick_place/manipulation_pipeline.h>
00038 #include <ros/console.h>
00039
00040 namespace pick_place
00041 {
00042 ManipulationPipeline::ManipulationPipeline(const std::string& name, unsigned int nthreads)
00043 : name_(name), nthreads_(nthreads), verbose_(false), stop_processing_(true)
00044 {
00045 processing_threads_.resize(nthreads, NULL);
00046 }
00047
00048 ManipulationPipeline::~ManipulationPipeline()
00049 {
00050 reset();
00051 }
00052
00053 ManipulationPipeline& ManipulationPipeline::addStage(const ManipulationStagePtr& next)
00054 {
00055 next->setVerbose(verbose_);
00056 stages_.push_back(next);
00057 return *this;
00058 }
00059
00060 const ManipulationStagePtr& ManipulationPipeline::getFirstStage() const
00061 {
00062 if (stages_.empty())
00063 {
00064 static const ManipulationStagePtr empty;
00065 return empty;
00066 }
00067 else
00068 return stages_.front();
00069 }
00070
00071 const ManipulationStagePtr& ManipulationPipeline::getLastStage() const
00072 {
00073 if (stages_.empty())
00074 {
00075 static const ManipulationStagePtr empty;
00076 return empty;
00077 }
00078 else
00079 return stages_.back();
00080 }
00081
00082 void ManipulationPipeline::reset()
00083 {
00084 clear();
00085 stages_.clear();
00086 }
00087
00088 void ManipulationPipeline::setVerbose(bool flag)
00089 {
00090 verbose_ = flag;
00091 for (std::size_t i = 0; i < stages_.size(); ++i)
00092 stages_[i]->setVerbose(flag);
00093 }
00094
00095 void ManipulationPipeline::clear()
00096 {
00097 stop();
00098 {
00099 boost::mutex::scoped_lock slock(queue_access_lock_);
00100 queue_.clear();
00101 }
00102 {
00103 boost::mutex::scoped_lock slock(result_lock_);
00104 success_.clear();
00105 failed_.clear();
00106 }
00107 }
00108
00109 void ManipulationPipeline::start()
00110 {
00111 stop_processing_ = false;
00112 empty_queue_threads_ = 0;
00113 for (std::size_t i = 0; i < stages_.size(); ++i)
00114 stages_[i]->resetStopSignal();
00115 for (std::size_t i = 0; i < processing_threads_.size(); ++i)
00116 if (!processing_threads_[i])
00117 processing_threads_[i] = new boost::thread(boost::bind(&ManipulationPipeline::processingThread, this, i));
00118 }
00119
00120 void ManipulationPipeline::signalStop()
00121 {
00122 for (std::size_t i = 0; i < stages_.size(); ++i)
00123 stages_[i]->signalStop();
00124 stop_processing_ = true;
00125 queue_access_cond_.notify_all();
00126 }
00127
00128 void ManipulationPipeline::stop()
00129 {
00130 signalStop();
00131 for (std::size_t i = 0; i < processing_threads_.size(); ++i)
00132 if (processing_threads_[i])
00133 {
00134 processing_threads_[i]->join();
00135 delete processing_threads_[i];
00136 processing_threads_[i] = NULL;
00137 }
00138 }
00139
00140 void ManipulationPipeline::processingThread(unsigned int index)
00141 {
00142 ROS_DEBUG_STREAM_NAMED("manipulation", "Start thread " << index << " for '" << name_ << "'");
00143
00144 while (!stop_processing_)
00145 {
00146 bool inc_queue = false;
00147 boost::unique_lock<boost::mutex> ulock(queue_access_lock_);
00148
00149 if (queue_.empty() && !stop_processing_ && empty_queue_callback_)
00150 {
00151 empty_queue_threads_++;
00152 inc_queue = true;
00153 if (empty_queue_threads_ == processing_threads_.size())
00154 empty_queue_callback_();
00155 }
00156 while (queue_.empty() && !stop_processing_)
00157 queue_access_cond_.wait(ulock);
00158 while (!stop_processing_ && !queue_.empty())
00159 {
00160 ManipulationPlanPtr g = queue_.front();
00161 queue_.pop_front();
00162 if (inc_queue)
00163 {
00164 empty_queue_threads_--;
00165 inc_queue = false;
00166 }
00167
00168 queue_access_lock_.unlock();
00169 try
00170 {
00171 g->error_code_.val = moveit_msgs::MoveItErrorCodes::FAILURE;
00172 for (std::size_t i = 0; !stop_processing_ && i < stages_.size(); ++i)
00173 {
00174 bool res = stages_[i]->evaluate(g);
00175 g->processing_stage_ = i + 1;
00176 if (res == false)
00177 {
00178 boost::mutex::scoped_lock slock(result_lock_);
00179 failed_.push_back(g);
00180 ROS_INFO_STREAM_NAMED("manipulation", "Manipulation plan " << g->id_ << " failed at stage '"
00181 << stages_[i]->getName() << "' on thread "
00182 << index);
00183 break;
00184 }
00185 }
00186 if (g->error_code_.val == moveit_msgs::MoveItErrorCodes::SUCCESS)
00187 {
00188 g->processing_stage_++;
00189 {
00190 boost::mutex::scoped_lock slock(result_lock_);
00191 success_.push_back(g);
00192 }
00193 signalStop();
00194 ROS_INFO_STREAM_NAMED("manipulation", "Found successful manipulation plan!");
00195 if (solution_callback_)
00196 solution_callback_();
00197 }
00198 }
00199 catch (std::runtime_error& ex)
00200 {
00201 ROS_ERROR_NAMED("manipulation", "[%s:%u] %s", name_.c_str(), index, ex.what());
00202 }
00203 catch (...)
00204 {
00205 ROS_ERROR_NAMED("manipulation", "[%s:%u] Caught unknown exception while processing manipulation stage",
00206 name_.c_str(), index);
00207 }
00208 queue_access_lock_.lock();
00209 }
00210 }
00211 }
00212
00213 void ManipulationPipeline::push(const ManipulationPlanPtr& plan)
00214 {
00215 boost::mutex::scoped_lock slock(queue_access_lock_);
00216 queue_.push_back(plan);
00217 ROS_INFO_STREAM_NAMED("manipulation", "Added plan for pipeline '" << name_ << "'. Queue is now of size "
00218 << queue_.size());
00219 queue_access_cond_.notify_all();
00220 }
00221
00222 void ManipulationPipeline::reprocessLastFailure()
00223 {
00224 boost::mutex::scoped_lock slock(queue_access_lock_);
00225 if (failed_.empty())
00226 return;
00227 ManipulationPlanPtr plan = failed_.back();
00228 failed_.pop_back();
00229 plan->clear();
00230 queue_.push_back(plan);
00231 ROS_INFO_STREAM_NAMED("manipulation", "Re-added last failed plan for pipeline '"
00232 << name_ << "'. Queue is now of size " << queue_.size());
00233 queue_access_cond_.notify_all();
00234 }
00235 }