43 : name_(
name), nthreads_(nthreads), verbose_(false), stop_processing_(true)
45 processing_threads_.resize(nthreads,
nullptr);
48 ManipulationPipeline::~ManipulationPipeline()
53 ManipulationPipeline& ManipulationPipeline::addStage(
const ManipulationStagePtr& next)
55 next->setVerbose(verbose_);
56 stages_.push_back(next);
60 const ManipulationStagePtr& ManipulationPipeline::getFirstStage()
const
64 static const ManipulationStagePtr EMPTY;
68 return stages_.front();
71 const ManipulationStagePtr& ManipulationPipeline::getLastStage()
const
75 static const ManipulationStagePtr EMPTY;
79 return stages_.back();
82 void ManipulationPipeline::reset()
88 void ManipulationPipeline::setVerbose(
bool flag)
91 for (pick_place::ManipulationStagePtr& stage : stages_)
92 stage->setVerbose(flag);
95 void ManipulationPipeline::clear()
99 boost::mutex::scoped_lock slock(queue_access_lock_);
103 boost::mutex::scoped_lock slock(result_lock_);
109 void ManipulationPipeline::start()
111 stop_processing_ =
false;
112 empty_queue_threads_ = 0;
113 for (pick_place::ManipulationStagePtr& stage : stages_)
114 stage->resetStopSignal();
115 for (std::size_t i = 0; i < processing_threads_.size(); ++i)
116 if (!processing_threads_[i])
117 processing_threads_[i] =
new boost::thread([
this, i] { processingThread(i); });
120 void ManipulationPipeline::signalStop()
122 for (pick_place::ManipulationStagePtr& stage : stages_)
124 stop_processing_ =
true;
125 queue_access_cond_.notify_all();
128 void ManipulationPipeline::stop()
131 for (boost::thread*& processing_thread : processing_threads_)
132 if (processing_thread)
134 processing_thread->join();
135 delete processing_thread;
136 processing_thread =
nullptr;
140 void ManipulationPipeline::processingThread(
unsigned int index)
144 while (!stop_processing_)
146 bool inc_queue =
false;
147 boost::unique_lock<boost::mutex> ulock(queue_access_lock_);
149 if (queue_.empty() && !stop_processing_ && empty_queue_callback_)
151 empty_queue_threads_++;
153 if (empty_queue_threads_ == processing_threads_.size())
154 empty_queue_callback_();
156 while (queue_.empty() && !stop_processing_)
157 queue_access_cond_.wait(ulock);
158 while (!stop_processing_ && !queue_.empty())
160 ManipulationPlanPtr g = queue_.front();
164 empty_queue_threads_--;
168 queue_access_lock_.unlock();
171 g->error_code_.val = moveit_msgs::MoveItErrorCodes::FAILURE;
172 for (std::size_t i = 0; !stop_processing_ && i < stages_.size(); ++i)
174 bool res = stages_[i]->evaluate(g);
175 g->processing_stage_ = i + 1;
178 boost::mutex::scoped_lock slock(result_lock_);
179 failed_.push_back(g);
181 << stages_[i]->getName() <<
"' on thread "
186 if (g->error_code_.val == moveit_msgs::MoveItErrorCodes::SUCCESS)
188 g->processing_stage_++;
190 boost::mutex::scoped_lock slock(result_lock_);
191 success_.push_back(g);
195 if (solution_callback_)
196 solution_callback_();
199 catch (std::exception& ex)
201 ROS_ERROR_NAMED(
"manipulation",
"[%s:%u] %s", name_.c_str(), index, ex.what());
203 queue_access_lock_.lock();
208 void ManipulationPipeline::push(
const ManipulationPlanPtr& plan)
210 boost::mutex::scoped_lock slock(queue_access_lock_);
211 queue_.push_back(plan);
213 "Added plan for pipeline '" << name_ <<
"'. Queue is now of size " << queue_.size());
214 queue_access_cond_.notify_all();
217 void ManipulationPipeline::reprocessLastFailure()
219 boost::mutex::scoped_lock slock(queue_access_lock_);
222 ManipulationPlanPtr plan = failed_.back();
225 queue_.push_back(plan);
227 << name_ <<
"'. Queue is now of size " << queue_.size());
228 queue_access_cond_.notify_all();