worker.h
Go to the documentation of this file.
1 //=================================================================================================
2 // Copyright (c) 2017, Alexander Stumpf, TU Darmstadt
3 // All rights reserved.
4 
5 // Redistribution and use in source and binary forms, with or without
6 // modification, are permitted provided that the following conditions are met:
7 // * Redistributions of source code must retain the above copyright
8 // notice, this list of conditions and the following disclaimer.
9 // * Redistributions in binary form must reproduce the above copyright
10 // notice, this list of conditions and the following disclaimer in the
11 // documentation and/or other materials provided with the distribution.
12 // * Neither the name of the Simulation, Systems Optimization and Robotics
13 // group, TU Darmstadt nor the names of its contributors may be used to
14 // endorse or promote products derived from this software without
15 // specific prior written permission.
16 
17 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
18 // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20 // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY
21 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
22 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
23 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
24 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
26 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 //=================================================================================================
28 
29 #ifndef VIGIR_FOOTSTEP_PLANNING_THREADING_WORKER_H__
30 #define VIGIR_FOOTSTEP_PLANNING_THREADING_WORKER_H__
31 
32 #include <ros/ros.h>
33 
34 #include <boost/thread.hpp>
35 
37 
38 
39 
41 {
42 namespace threading
43 {
44 template <class T>
45 class Worker
46 {
47 public:
48  Worker(Queue<T>& queue, unsigned int jobs_per_thread = 10, bool auto_start = true)
49  : queue(queue)
51  , exit(false)
52  {
53  if (auto_start)
54  start();
55  }
56 
57  virtual ~Worker()
58  {
59  ROS_INFO_STREAM("[Worker (" << boost::this_thread::get_id() <<")] Destruct");
60  stop();
61  }
62 
63  void start()
64  {
65  ROS_INFO_STREAM("[Worker (" << boost::this_thread::get_id() <<")] Start request");
66  exit = false;
67  thread = boost::thread(&Worker::run, this, jobs_per_thread);
68  }
69 
70  void stop()
71  {
72  ROS_INFO_STREAM("[Worker (" << boost::this_thread::get_id() <<")] Stop request");
73 
74  // soft stop
75  interruptJobs();
76  exit = true;
77 
78  // hard stop
79  thread.interrupt();
80 
81  thread.join();
82  }
83 
85  {
86  boost::mutex::scoped_lock lock(run_jobs_mutex);
87  run_jobs = false;
88  }
89 
90 protected:
91  void run(unsigned int n)
92  {
93  ROS_INFO_STREAM("[Worker (" << boost::this_thread::get_id() <<")] Started with " << n << " jobs per thread.");
94  std::vector<boost::shared_ptr<T> > jobs;
95  while (!exit)
96  {
97  queue.waitAndDequeueJobs(jobs, n);
98  {
99  boost::mutex::scoped_lock lock(run_jobs_mutex);
100  run_jobs = true;
101  }
102 
103  ROS_DEBUG_STREAM("[Worker (" << boost::this_thread::get_id() <<")] Deqeued " << jobs.size() << " jobs.");
104  for (size_t i = 0; i < jobs.size() && run_jobs; i++)
105  {
106  boost::this_thread::interruption_point();
107  jobs[i]->run();
108  }
109 
110  {
111  boost::mutex::scoped_lock lock(run_jobs_mutex);
112  if (run_jobs)
113  queue.justFinishedJobs(jobs.size());
114  }
115  }
116  ROS_INFO_STREAM("[Worker (" << boost::this_thread::get_id() <<")] Stopped!");
117  }
118 
119  boost::thread thread;
120  mutable boost::mutex run_jobs_mutex;
121 
123  unsigned int jobs_per_thread;
124  bool exit;
125  bool run_jobs;
126 };
127 }
128 }
129 
130 #endif
#define ROS_DEBUG_STREAM(args)
#define ROS_INFO_STREAM(args)
Worker(Queue< T > &queue, unsigned int jobs_per_thread=10, bool auto_start=true)
Definition: worker.h:48


vigir_footstep_planning_lib
Author(s): Alexander Stumpf
autogenerated on Mon Jun 10 2019 15:47:33