workerpool.c
Go to the documentation of this file.
1 /* Copyright (C) 2013-2016, The Regents of The University of Michigan.
2 All rights reserved.
3 This software was developed in the APRIL Robotics Lab under the
4 direction of Edwin Olson, ebolson@umich.edu. This software may be
5 available under alternative licensing terms; contact the address above.
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions are met:
8 1. Redistributions of source code must retain the above copyright notice, this
9  list of conditions and the following disclaimer.
10 2. Redistributions in binary form must reproduce the above copyright notice,
11  this list of conditions and the following disclaimer in the documentation
12  and/or other materials provided with the distribution.
13 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
14 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
15 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
16 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
17 ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
18 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
19 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
20 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
21 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
22 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
23 The views and conclusions contained in the software and documentation are those
24 of the authors and should not be interpreted as representing official policies,
25 either expressed or implied, of the Regents of The University of Michigan.
26 */
27 #include <errno.h>
28 
29 #define _GNU_SOURCE // Possible fix for 16.04
30 #define __USE_GNU
31 #include "common/pthreads_cross.h"
32 #include <assert.h>
33 #include <stdbool.h>
34 #include <stdio.h>
35 #include <stdlib.h>
36 #ifdef _WIN32
37 #include <windows.h>
38 #else
39 #include <unistd.h>
40 #endif
41 
42 #include "workerpool.h"
43 #include "debug_print.h"
44 
45 struct workerpool {
46  int nthreads;
48  int taskspos;
49 
50  pthread_t *threads;
51  int *status;
52 
53  pthread_mutex_t mutex;
54  pthread_cond_t startcond; // used to signal the availability of work
55  bool start_predicate; // predicate that prevents spurious wakeups on startcond
56  pthread_cond_t endcond; // used to signal completion of all work
57 
58  int end_count; // how many threads are done?
59 };
60 
61 struct task
62 {
63  void (*f)(void *p);
64  void *p;
65 };
66 
67 void *worker_thread(void *p)
68 {
69  workerpool_t *wp = (workerpool_t*) p;
70 
71  while (1) {
72  struct task *task;
73 
74  pthread_mutex_lock(&wp->mutex);
75  while (wp->taskspos == zarray_size(wp->tasks) || !wp->start_predicate) {
76  wp->end_count++;
77  pthread_cond_broadcast(&wp->endcond);
78  pthread_cond_wait(&wp->startcond, &wp->mutex);
79  }
80 
82  wp->taskspos++;
83  pthread_mutex_unlock(&wp->mutex);
84  sched_yield();
85 
86  // we've been asked to exit.
87  if (task->f == NULL)
88  return NULL;
89 
90  task->f(task->p);
91  }
92 
93  return NULL;
94 }
95 
97 {
98  assert(nthreads > 0);
99 
100  workerpool_t *wp = calloc(1, sizeof(workerpool_t));
101  wp->nthreads = nthreads;
102  wp->tasks = zarray_create(sizeof(struct task));
103  wp->start_predicate = false;
104 
105  if (nthreads > 1) {
106  wp->threads = calloc(wp->nthreads, sizeof(pthread_t));
107 
108  pthread_mutex_init(&wp->mutex, NULL);
109  pthread_cond_init(&wp->startcond, NULL);
110  pthread_cond_init(&wp->endcond, NULL);
111 
112  for (int i = 0; i < nthreads; i++) {
113  int res = pthread_create(&wp->threads[i], NULL, worker_thread, wp);
114  if (res != 0) {
115  debug_print("Insufficient system resources to create workerpool threads\n");
116  // errno already set to EAGAIN by pthread_create() failure
117  return NULL;
118  }
119  }
120 
121  // Wait for the worker threads to be ready
122  pthread_mutex_lock(&wp->mutex);
123  while (wp->end_count < wp->nthreads) {
124  pthread_cond_wait(&wp->endcond, &wp->mutex);
125  }
126  pthread_mutex_unlock(&wp->mutex);
127  }
128 
129  return wp;
130 }
131 
133 {
134  if (wp == NULL)
135  return;
136 
137  // force all worker threads to exit.
138  if (wp->nthreads > 1) {
139  for (int i = 0; i < wp->nthreads; i++)
140  workerpool_add_task(wp, NULL, NULL);
141 
142  pthread_mutex_lock(&wp->mutex);
143  wp->start_predicate = true;
144  pthread_cond_broadcast(&wp->startcond);
145  pthread_mutex_unlock(&wp->mutex);
146 
147  for (int i = 0; i < wp->nthreads; i++)
148  pthread_join(wp->threads[i], NULL);
149 
150  pthread_mutex_destroy(&wp->mutex);
151  pthread_cond_destroy(&wp->startcond);
152  pthread_cond_destroy(&wp->endcond);
153  free(wp->threads);
154  }
155 
156  zarray_destroy(wp->tasks);
157  free(wp);
158 }
159 
161 {
162  return wp->nthreads;
163 }
164 
165 void workerpool_add_task(workerpool_t *wp, void (*f)(void *p), void *p)
166 {
167  struct task t;
168  t.f = f;
169  t.p = p;
170 
171  if (wp->nthreads > 1) {
172  pthread_mutex_lock(&wp->mutex);
173  zarray_add(wp->tasks, &t);
174  pthread_mutex_unlock(&wp->mutex);
175  } else {
176  zarray_add(wp->tasks, &t);
177  }
178 }
179 
181 {
182  for (int i = 0; i < zarray_size(wp->tasks); i++) {
183  struct task *task;
184  zarray_get_volatile(wp->tasks, i, &task);
185  task->f(task->p);
186  }
187 
188  zarray_clear(wp->tasks);
189 }
190 
191 // runs all added tasks, waits for them to complete.
193 {
194  if (wp->nthreads > 1) {
195  pthread_mutex_lock(&wp->mutex);
196  wp->end_count = 0;
197  wp->start_predicate = true;
198  pthread_cond_broadcast(&wp->startcond);
199 
200  while (wp->end_count < wp->nthreads) {
201 // printf("caught %d\n", wp->end_count);
202  pthread_cond_wait(&wp->endcond, &wp->mutex);
203  }
204 
205  wp->taskspos = 0;
206  wp->start_predicate = false;
207  pthread_mutex_unlock(&wp->mutex);
208 
209  zarray_clear(wp->tasks);
210 
211  } else {
213  }
214 }
215 
217 {
218 #ifdef _WIN32
219  SYSTEM_INFO sysinfo;
220  GetSystemInfo(&sysinfo);
221  return sysinfo.dwNumberOfProcessors;
222 #else
223  return sysconf (_SC_NPROCESSORS_ONLN);
224 #endif
225 }
task::f
void(* f)(void *p)
Definition: workerpool.c:63
zarray_create
static zarray_t * zarray_create(size_t el_sz)
Definition: zarray.h:57
workerpool::mutex
pthread_mutex_t mutex
Definition: workerpool.c:53
task
Definition: workerpool.c:61
workerpool_add_task
void workerpool_add_task(workerpool_t *wp, void(*f)(void *p), void *p)
Definition: workerpool.c:165
zarray_destroy
static void zarray_destroy(zarray_t *za)
Definition: zarray.h:70
workerpool::tasks
zarray_t * tasks
Definition: workerpool.c:47
workerpool_run
void workerpool_run(workerpool_t *wp)
Definition: workerpool.c:192
workerpool_run_single
void workerpool_run_single(workerpool_t *wp)
Definition: workerpool.c:180
zarray
Definition: zarray.h:43
workerpool::end_count
int end_count
Definition: workerpool.c:58
zarray_size
static int zarray_size(const zarray_t *za)
Definition: zarray.h:130
workerpool_create
workerpool_t * workerpool_create(int nthreads)
Definition: workerpool.c:96
workerpool::start_predicate
bool start_predicate
Definition: workerpool.c:55
task::p
void * p
Definition: workerpool.c:64
workerpool::endcond
pthread_cond_t endcond
Definition: workerpool.c:56
workerpool::startcond
pthread_cond_t startcond
Definition: workerpool.c:54
workerpool::threads
pthread_t * threads
Definition: workerpool.c:50
debug_print
#define debug_print(fmt,...)
Definition: debug_print.h:39
zarray_add
static void zarray_add(zarray_t *za, const void *p)
Definition: zarray.h:179
workerpool::status
int * status
Definition: workerpool.c:51
workerpool
Definition: workerpool.c:45
pthreads_cross.h
workerpool::nthreads
int nthreads
Definition: workerpool.c:46
zarray_get_volatile
static void zarray_get_volatile(const zarray_t *za, int idx, void *p)
Definition: zarray.h:212
workerpool_destroy
void workerpool_destroy(workerpool_t *wp)
Definition: workerpool.c:132
workerpool.h
debug_print.h
workerpool_get_nprocs
int workerpool_get_nprocs()
Definition: workerpool.c:216
zarray_clear
static void zarray_clear(zarray_t *za)
Definition: zarray.h:365
workerpool::taskspos
int taskspos
Definition: workerpool.c:48
worker_thread
void * worker_thread(void *p)
Definition: workerpool.c:67
workerpool_get_nthreads
int workerpool_get_nthreads(workerpool_t *wp)
Definition: workerpool.c:160


apriltag
Author(s): Edwin Olson , Max Krogius
autogenerated on Sun Apr 20 2025 02:08:47