worker.h
Go to the documentation of this file.
00001 //=================================================================================================
00002 // Copyright (c) 2017, Alexander Stumpf, TU Darmstadt
00003 // All rights reserved.
00004 
00005 // Redistribution and use in source and binary forms, with or without
00006 // modification, are permitted provided that the following conditions are met:
00007 //     * Redistributions of source code must retain the above copyright
00008 //       notice, this list of conditions and the following disclaimer.
00009 //     * Redistributions in binary form must reproduce the above copyright
00010 //       notice, this list of conditions and the following disclaimer in the
00011 //       documentation and/or other materials provided with the distribution.
00012 //     * Neither the name of the Simulation, Systems Optimization and Robotics
00013 //       group, TU Darmstadt nor the names of its contributors may be used to
00014 //       endorse or promote products derived from this software without
00015 //       specific prior written permission.
00016 
00017 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
00018 // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00019 // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00020 // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY
00021 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
00022 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00023 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
00024 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00025 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00026 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00027 //=================================================================================================
00028 
00029 #ifndef VIGIR_FOOTSTEP_PLANNING_THREADING_WORKER_H__
00030 #define VIGIR_FOOTSTEP_PLANNING_THREADING_WORKER_H__
00031 
00032 #include <ros/ros.h>
00033 
00034 #include <boost/thread.hpp>
00035 
00036 #include <vigir_footstep_planning_lib/threading/queue.h>
00037 
00038 
00039 
00040 namespace vigir_footstep_planning
00041 {
00042 namespace threading
00043 {
00044 template <class T>
00045 class Worker
00046 {
00047 public:
00048   Worker(Queue<T>& queue, unsigned int jobs_per_thread = 10, bool auto_start = true)
00049     : queue(queue)
00050     , jobs_per_thread(jobs_per_thread)
00051     , exit(false)
00052   {
00053     if (auto_start)
00054       start();
00055   }
00056 
00057   virtual ~Worker()
00058   {
00059     ROS_INFO_STREAM("[Worker (" << boost::this_thread::get_id() <<")] Destruct");
00060     stop();
00061   }
00062 
00063   void start()
00064   {
00065     ROS_INFO_STREAM("[Worker (" << boost::this_thread::get_id() <<")] Start request");
00066     exit = false;
00067     thread = boost::thread(&Worker::run, this, jobs_per_thread);
00068   }
00069 
00070   void stop()
00071   {
00072     ROS_INFO_STREAM("[Worker (" << boost::this_thread::get_id() <<")] Stop request");
00073 
00074     // soft stop
00075     interruptJobs();
00076     exit = true;
00077 
00078     // hard stop
00079     thread.interrupt();
00080 
00081     thread.join();
00082   }
00083 
00084   void interruptJobs()
00085   {
00086     boost::mutex::scoped_lock lock(run_jobs_mutex);
00087     run_jobs = false;
00088   }
00089 
00090 protected:
00091   void run(unsigned int n)
00092   {
00093     ROS_INFO_STREAM("[Worker (" << boost::this_thread::get_id() <<")] Started with " << n << " jobs per thread.");
00094     std::vector<boost::shared_ptr<T> > jobs;
00095     while (!exit)
00096     {
00097       queue.waitAndDequeueJobs(jobs, n);
00098       {
00099         boost::mutex::scoped_lock lock(run_jobs_mutex);
00100         run_jobs = true;
00101       }
00102 
00103       ROS_DEBUG_STREAM("[Worker (" << boost::this_thread::get_id() <<")] Deqeued " << jobs.size() << " jobs.");
00104       for (size_t i = 0; i < jobs.size() && run_jobs; i++)
00105       {
00106         boost::this_thread::interruption_point();
00107         jobs[i]->run();
00108       }
00109 
00110       {
00111         boost::mutex::scoped_lock lock(run_jobs_mutex);
00112         if (run_jobs)
00113           queue.justFinishedJobs(jobs.size());
00114       }
00115     }
00116     ROS_INFO_STREAM("[Worker (" << boost::this_thread::get_id() <<")] Stopped!");
00117   }
00118 
00119   boost::thread thread;
00120   mutable boost::mutex run_jobs_mutex;
00121 
00122   Queue<T>& queue;
00123   unsigned int jobs_per_thread;
00124   bool exit;
00125   bool run_jobs;
00126 };
00127 }
00128 }
00129 
00130 #endif


vigir_footstep_planning_lib
Author(s): Alexander Stumpf
autogenerated on Sat Jul 15 2017 02:47:56