manipulation_pipeline.cpp
Go to the documentation of this file.
00001 /*********************************************************************
00002 * Software License Agreement (BSD License)
00003 *
00004 *  Copyright (c) 2012, Willow Garage, Inc.
00005 *  All rights reserved.
00006 *
00007 *  Redistribution and use in source and binary forms, with or without
00008 *  modification, are permitted provided that the following conditions
00009 *  are met:
00010 *
00011 *   * Redistributions of source code must retain the above copyright
00012 *     notice, this list of conditions and the following disclaimer.
00013 *   * Redistributions in binary form must reproduce the above
00014 *     copyright notice, this list of conditions and the following
00015 *     disclaimer in the documentation and/or other materials provided
00016 *     with the distribution.
00017 *   * Neither the name of Willow Garage nor the names of its
00018 *     contributors may be used to endorse or promote products derived
00019 *     from this software without specific prior written permission.
00020 *
00021 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 *  POSSIBILITY OF SUCH DAMAGE.
00033 *********************************************************************/
00034 
00035 /* Author: Ioan Sucan */
00036 
00037 #include <moveit/pick_place/manipulation_pipeline.h>
00038 #include <ros/console.h>
00039 
00040 namespace pick_place
00041 {
00042 
00043 ManipulationPipeline::ManipulationPipeline(const std::string &name, unsigned int nthreads) :
00044   name_(name),
00045   nthreads_(nthreads),
00046   verbose_(false),
00047   stop_processing_(true)
00048 {
00049   processing_threads_.resize(nthreads, NULL);
00050 }
00051 
00052 ManipulationPipeline::~ManipulationPipeline()
00053 {
00054   reset();
00055 }
00056 
00057 ManipulationPipeline& ManipulationPipeline::addStage(const ManipulationStagePtr &next)
00058 {
00059   next->setVerbose(verbose_);
00060   stages_.push_back(next);
00061   return *this;
00062 }
00063 
00064 const ManipulationStagePtr& ManipulationPipeline::getFirstStage() const
00065 {
00066   if (stages_.empty())
00067   {
00068     static const ManipulationStagePtr empty;
00069     return empty;
00070   }
00071   else
00072     return stages_.front();
00073 }
00074 
00075 const ManipulationStagePtr& ManipulationPipeline::getLastStage() const
00076 {
00077   if (stages_.empty())
00078   {
00079     static const ManipulationStagePtr empty;
00080     return empty;
00081   }
00082   else
00083     return stages_.back();
00084 }
00085 
00086 void ManipulationPipeline::reset()
00087 {
00088   clear();
00089   stages_.clear();
00090 }
00091 
00092 void ManipulationPipeline::setVerbose(bool flag)
00093 {
00094   verbose_ = flag;
00095   for (std::size_t i = 0 ; i < stages_.size() ; ++i)
00096     stages_[i]->setVerbose(flag);
00097 }
00098 
00099 void ManipulationPipeline::clear()
00100 {
00101   stop();
00102   {
00103     boost::mutex::scoped_lock slock(queue_access_lock_);
00104     queue_.clear();
00105   }
00106   {
00107     boost::mutex::scoped_lock slock(result_lock_);
00108     success_.clear();
00109     failed_.clear();
00110   }
00111 }
00112 
00113 void ManipulationPipeline::start()
00114 {
00115   stop_processing_ = false;
00116   empty_queue_threads_ = 0;
00117   for (std::size_t i = 0 ; i < stages_.size() ; ++i)
00118     stages_[i]->resetStopSignal();
00119   for (std::size_t i = 0; i < processing_threads_.size() ; ++i)
00120     if (!processing_threads_[i])
00121       processing_threads_[i] = new boost::thread(boost::bind(&ManipulationPipeline::processingThread, this, i));
00122 }
00123 
00124 void ManipulationPipeline::signalStop()
00125 {
00126   for (std::size_t i = 0 ; i < stages_.size() ; ++i)
00127     stages_[i]->signalStop();
00128   stop_processing_ = true;
00129   queue_access_cond_.notify_all();
00130 }
00131 
00132 void ManipulationPipeline::stop()
00133 {
00134   signalStop();
00135   for (std::size_t i = 0; i < processing_threads_.size() ; ++i)
00136     if (processing_threads_[i])
00137     {
00138       processing_threads_[i]->join();
00139       delete processing_threads_[i];
00140       processing_threads_[i] = NULL;
00141     }
00142 }
00143 
00144 void ManipulationPipeline::processingThread(unsigned int index)
00145 {
00146   ROS_DEBUG_STREAM("Start thread " << index << " for '" << name_ << "'");
00147   
00148   while (!stop_processing_)
00149   {
00150     bool inc_queue = false;
00151     boost::unique_lock<boost::mutex> ulock(queue_access_lock_);
00152     // if the queue is empty, we trigger the corresponding event
00153     if (queue_.empty() && !stop_processing_ && empty_queue_callback_)
00154     {
00155       empty_queue_threads_++;
00156       inc_queue = true;
00157       if (empty_queue_threads_ == processing_threads_.size())
00158         empty_queue_callback_();
00159     }
00160     while (queue_.empty() && !stop_processing_)
00161       queue_access_cond_.wait(ulock);
00162     while (!stop_processing_ && !queue_.empty())
00163     {
00164       ManipulationPlanPtr g = queue_.front();
00165       queue_.pop_front();
00166       if (inc_queue)
00167       {
00168         empty_queue_threads_--;
00169         inc_queue = false;
00170       }
00171       
00172       queue_access_lock_.unlock();
00173       try
00174       {
00175         g->error_code_.val = moveit_msgs::MoveItErrorCodes::FAILURE;
00176         for (std::size_t i = 0 ; !stop_processing_ && i < stages_.size() ; ++i)
00177         {
00178           bool res = stages_[i]->evaluate(g);
00179           g->processing_stage_ = i + 1;
00180           if (res == false)
00181           {
00182             boost::mutex::scoped_lock slock(result_lock_);
00183             failed_.push_back(g);
00184             ROS_INFO_STREAM("Manipulation plan " << g->id_ << " failed at stage '" << stages_[i]->getName() << "' on thread " << index);
00185             break;
00186           }
00187         }
00188         if (g->error_code_.val == moveit_msgs::MoveItErrorCodes::SUCCESS)
00189         {
00190           g->processing_stage_++;
00191           {
00192             boost::mutex::scoped_lock slock(result_lock_);
00193             success_.push_back(g);
00194           }
00195           signalStop();
00196           ROS_INFO_STREAM("Found successful manipulation plan!");
00197           if (solution_callback_)
00198             solution_callback_();
00199         }
00200       }
00201       catch (std::runtime_error &ex)
00202       {
00203         ROS_ERROR("[%s:%u] %s", name_.c_str(), index, ex.what());
00204       }
00205       catch (...)
00206       {
00207         ROS_ERROR("[%s:%u] Caught unknown exception while processing manipulation stage", name_.c_str(), index);
00208       }
00209       queue_access_lock_.lock();
00210     }
00211   }
00212 }
00213 
00214 void ManipulationPipeline::push(const ManipulationPlanPtr &plan)
00215 {
00216   boost::mutex::scoped_lock slock(queue_access_lock_);
00217   queue_.push_back(plan);
00218   ROS_INFO_STREAM("Added plan for pipeline '" << name_ << "'. Queue is now of size " << queue_.size());
00219   queue_access_cond_.notify_all();
00220 }
00221 
00222 void ManipulationPipeline::reprocessLastFailure()
00223 {
00224   boost::mutex::scoped_lock slock(queue_access_lock_);
00225   if (failed_.empty())
00226     return;
00227   ManipulationPlanPtr plan = failed_.back();
00228   failed_.pop_back();
00229   plan->clear();
00230   queue_.push_back(plan);
00231   ROS_INFO_STREAM("Re-added last failed plan for pipeline '" << name_ << "'. Queue is now of size " << queue_.size());
00232   queue_access_cond_.notify_all();
00233 }
00234 
00235 }


manipulation
Author(s): Ioan Sucan , Sachin Chitta , Sachin Chitta
autogenerated on Wed Aug 26 2015 12:44:04