manipulation_pipeline.cpp
Go to the documentation of this file.
00001 /*********************************************************************
00002 * Software License Agreement (BSD License)
00003 *
00004 *  Copyright (c) 2012, Willow Garage, Inc.
00005 *  All rights reserved.
00006 *
00007 *  Redistribution and use in source and binary forms, with or without
00008 *  modification, are permitted provided that the following conditions
00009 *  are met:
00010 *
00011 *   * Redistributions of source code must retain the above copyright
00012 *     notice, this list of conditions and the following disclaimer.
00013 *   * Redistributions in binary form must reproduce the above
00014 *     copyright notice, this list of conditions and the following
00015 *     disclaimer in the documentation and/or other materials provided
00016 *     with the distribution.
00017 *   * Neither the name of Willow Garage nor the names of its
00018 *     contributors may be used to endorse or promote products derived
00019 *     from this software without specific prior written permission.
00020 *
00021 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 *  POSSIBILITY OF SUCH DAMAGE.
00033 *********************************************************************/
00034 
00035 /* Author: Ioan Sucan */
00036 
00037 #include <moveit/pick_place/manipulation_pipeline.h>
00038 #include <ros/console.h>
00039 
00040 namespace pick_place
00041 {
00042 ManipulationPipeline::ManipulationPipeline(const std::string& name, unsigned int nthreads)
00043   : name_(name), nthreads_(nthreads), verbose_(false), stop_processing_(true)
00044 {
00045   processing_threads_.resize(nthreads, NULL);
00046 }
00047 
00048 ManipulationPipeline::~ManipulationPipeline()
00049 {
00050   reset();
00051 }
00052 
00053 ManipulationPipeline& ManipulationPipeline::addStage(const ManipulationStagePtr& next)
00054 {
00055   next->setVerbose(verbose_);
00056   stages_.push_back(next);
00057   return *this;
00058 }
00059 
00060 const ManipulationStagePtr& ManipulationPipeline::getFirstStage() const
00061 {
00062   if (stages_.empty())
00063   {
00064     static const ManipulationStagePtr empty;
00065     return empty;
00066   }
00067   else
00068     return stages_.front();
00069 }
00070 
00071 const ManipulationStagePtr& ManipulationPipeline::getLastStage() const
00072 {
00073   if (stages_.empty())
00074   {
00075     static const ManipulationStagePtr empty;
00076     return empty;
00077   }
00078   else
00079     return stages_.back();
00080 }
00081 
00082 void ManipulationPipeline::reset()
00083 {
00084   clear();
00085   stages_.clear();
00086 }
00087 
00088 void ManipulationPipeline::setVerbose(bool flag)
00089 {
00090   verbose_ = flag;
00091   for (std::size_t i = 0; i < stages_.size(); ++i)
00092     stages_[i]->setVerbose(flag);
00093 }
00094 
00095 void ManipulationPipeline::clear()
00096 {
00097   stop();
00098   {
00099     boost::mutex::scoped_lock slock(queue_access_lock_);
00100     queue_.clear();
00101   }
00102   {
00103     boost::mutex::scoped_lock slock(result_lock_);
00104     success_.clear();
00105     failed_.clear();
00106   }
00107 }
00108 
00109 void ManipulationPipeline::start()
00110 {
00111   stop_processing_ = false;
00112   empty_queue_threads_ = 0;
00113   for (std::size_t i = 0; i < stages_.size(); ++i)
00114     stages_[i]->resetStopSignal();
00115   for (std::size_t i = 0; i < processing_threads_.size(); ++i)
00116     if (!processing_threads_[i])
00117       processing_threads_[i] = new boost::thread(boost::bind(&ManipulationPipeline::processingThread, this, i));
00118 }
00119 
00120 void ManipulationPipeline::signalStop()
00121 {
00122   for (std::size_t i = 0; i < stages_.size(); ++i)
00123     stages_[i]->signalStop();
00124   stop_processing_ = true;
00125   queue_access_cond_.notify_all();
00126 }
00127 
00128 void ManipulationPipeline::stop()
00129 {
00130   signalStop();
00131   for (std::size_t i = 0; i < processing_threads_.size(); ++i)
00132     if (processing_threads_[i])
00133     {
00134       processing_threads_[i]->join();
00135       delete processing_threads_[i];
00136       processing_threads_[i] = NULL;
00137     }
00138 }
00139 
00140 void ManipulationPipeline::processingThread(unsigned int index)
00141 {
00142   ROS_DEBUG_STREAM_NAMED("manipulation", "Start thread " << index << " for '" << name_ << "'");
00143 
00144   while (!stop_processing_)
00145   {
00146     bool inc_queue = false;
00147     boost::unique_lock<boost::mutex> ulock(queue_access_lock_);
00148     // if the queue is empty, we trigger the corresponding event
00149     if (queue_.empty() && !stop_processing_ && empty_queue_callback_)
00150     {
00151       empty_queue_threads_++;
00152       inc_queue = true;
00153       if (empty_queue_threads_ == processing_threads_.size())
00154         empty_queue_callback_();
00155     }
00156     while (queue_.empty() && !stop_processing_)
00157       queue_access_cond_.wait(ulock);
00158     while (!stop_processing_ && !queue_.empty())
00159     {
00160       ManipulationPlanPtr g = queue_.front();
00161       queue_.pop_front();
00162       if (inc_queue)
00163       {
00164         empty_queue_threads_--;
00165         inc_queue = false;
00166       }
00167 
00168       queue_access_lock_.unlock();
00169       try
00170       {
00171         g->error_code_.val = moveit_msgs::MoveItErrorCodes::FAILURE;
00172         for (std::size_t i = 0; !stop_processing_ && i < stages_.size(); ++i)
00173         {
00174           bool res = stages_[i]->evaluate(g);
00175           g->processing_stage_ = i + 1;
00176           if (res == false)
00177           {
00178             boost::mutex::scoped_lock slock(result_lock_);
00179             failed_.push_back(g);
00180             ROS_INFO_STREAM_NAMED("manipulation", "Manipulation plan " << g->id_ << " failed at stage '"
00181                                                                        << stages_[i]->getName() << "' on thread "
00182                                                                        << index);
00183             break;
00184           }
00185         }
00186         if (g->error_code_.val == moveit_msgs::MoveItErrorCodes::SUCCESS)
00187         {
00188           g->processing_stage_++;
00189           {
00190             boost::mutex::scoped_lock slock(result_lock_);
00191             success_.push_back(g);
00192           }
00193           signalStop();
00194           ROS_INFO_STREAM_NAMED("manipulation", "Found successful manipulation plan!");
00195           if (solution_callback_)
00196             solution_callback_();
00197         }
00198       }
00199       catch (std::runtime_error& ex)
00200       {
00201         ROS_ERROR_NAMED("manipulation", "[%s:%u] %s", name_.c_str(), index, ex.what());
00202       }
00203       catch (...)
00204       {
00205         ROS_ERROR_NAMED("manipulation", "[%s:%u] Caught unknown exception while processing manipulation stage",
00206                         name_.c_str(), index);
00207       }
00208       queue_access_lock_.lock();
00209     }
00210   }
00211 }
00212 
00213 void ManipulationPipeline::push(const ManipulationPlanPtr& plan)
00214 {
00215   boost::mutex::scoped_lock slock(queue_access_lock_);
00216   queue_.push_back(plan);
00217   ROS_INFO_STREAM_NAMED("manipulation", "Added plan for pipeline '" << name_ << "'. Queue is now of size "
00218                                                                     << queue_.size());
00219   queue_access_cond_.notify_all();
00220 }
00221 
00222 void ManipulationPipeline::reprocessLastFailure()
00223 {
00224   boost::mutex::scoped_lock slock(queue_access_lock_);
00225   if (failed_.empty())
00226     return;
00227   ManipulationPlanPtr plan = failed_.back();
00228   failed_.pop_back();
00229   plan->clear();
00230   queue_.push_back(plan);
00231   ROS_INFO_STREAM_NAMED("manipulation", "Re-added last failed plan for pipeline '"
00232                                             << name_ << "'. Queue is now of size " << queue_.size());
00233   queue_access_cond_.notify_all();
00234 }
00235 }


manipulation
Author(s): Ioan Sucan , Sachin Chitta
autogenerated on Mon Jul 24 2017 02:21:50