00001 //================================================================================================= 00002 // Copyright (c) 2017, Alexander Stumpf, TU Darmstadt 00003 // All rights reserved. 00004 00005 // Redistribution and use in source and binary forms, with or without 00006 // modification, are permitted provided that the following conditions are met: 00007 // * Redistributions of source code must retain the above copyright 00008 // notice, this list of conditions and the following disclaimer. 00009 // * Redistributions in binary form must reproduce the above copyright 00010 // notice, this list of conditions and the following disclaimer in the 00011 // documentation and/or other materials provided with the distribution. 00012 // * Neither the name of the Simulation, Systems Optimization and Robotics 00013 // group, TU Darmstadt nor the names of its contributors may be used to 00014 // endorse or promote products derived from this software without 00015 // specific prior written permission. 00016 00017 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 00018 // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 00019 // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 00020 // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY 00021 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 00022 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00023 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 00024 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 00025 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00026 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00027 //================================================================================================= 00028 00029 #ifndef VIGIR_FOOTSTEP_PLANNING_THREADING_WORKER_H__ 00030 #define VIGIR_FOOTSTEP_PLANNING_THREADING_WORKER_H__ 00031 00032 #include <ros/ros.h> 00033 00034 #include <boost/thread.hpp> 00035 00036 #include <vigir_footstep_planning_lib/threading/queue.h> 00037 00038 00039 00040 namespace vigir_footstep_planning 00041 { 00042 namespace threading 00043 { 00044 template <class T> 00045 class Worker 00046 { 00047 public: 00048 Worker(Queue<T>& queue, unsigned int jobs_per_thread = 10, bool auto_start = true) 00049 : queue(queue) 00050 , jobs_per_thread(jobs_per_thread) 00051 , exit(false) 00052 { 00053 if (auto_start) 00054 start(); 00055 } 00056 00057 virtual ~Worker() 00058 { 00059 ROS_INFO_STREAM("[Worker (" << boost::this_thread::get_id() <<")] Destruct"); 00060 stop(); 00061 } 00062 00063 void start() 00064 { 00065 ROS_INFO_STREAM("[Worker (" << boost::this_thread::get_id() <<")] Start request"); 00066 exit = false; 00067 thread = boost::thread(&Worker::run, this, jobs_per_thread); 00068 } 00069 00070 void stop() 00071 { 00072 ROS_INFO_STREAM("[Worker (" << boost::this_thread::get_id() <<")] Stop request"); 00073 00074 // soft stop 00075 interruptJobs(); 00076 exit = true; 00077 00078 // hard stop 00079 thread.interrupt(); 00080 00081 thread.join(); 00082 } 00083 00084 void interruptJobs() 00085 { 00086 boost::mutex::scoped_lock lock(run_jobs_mutex); 00087 run_jobs = false; 00088 } 00089 00090 protected: 00091 void run(unsigned int n) 00092 { 00093 ROS_INFO_STREAM("[Worker (" << boost::this_thread::get_id() <<")] Started with " << n << " jobs per thread."); 00094 std::vector<boost::shared_ptr<T> > jobs; 00095 while (!exit) 00096 { 00097 queue.waitAndDequeueJobs(jobs, n); 00098 { 00099 boost::mutex::scoped_lock lock(run_jobs_mutex); 00100 run_jobs = true; 00101 } 00102 00103 ROS_DEBUG_STREAM("[Worker (" << boost::this_thread::get_id() <<")] Deqeued " << jobs.size() << " jobs."); 00104 for (size_t i = 0; i < jobs.size() && run_jobs; i++) 00105 { 00106 boost::this_thread::interruption_point(); 00107 jobs[i]->run(); 00108 } 00109 00110 { 00111 boost::mutex::scoped_lock lock(run_jobs_mutex); 00112 if (run_jobs) 00113 queue.justFinishedJobs(jobs.size()); 00114 } 00115 } 00116 ROS_INFO_STREAM("[Worker (" << boost::this_thread::get_id() <<")] Stopped!"); 00117 } 00118 00119 boost::thread thread; 00120 mutable boost::mutex run_jobs_mutex; 00121 00122 Queue<T>& queue; 00123 unsigned int jobs_per_thread; 00124 bool exit; 00125 bool run_jobs; 00126 }; 00127 } 00128 } 00129 00130 #endif