workerpool.c
Go to the documentation of this file.
1 /* Copyright (C) 2013-2016, The Regents of The University of Michigan.
2 All rights reserved.
3 This software was developed in the APRIL Robotics Lab under the
4 direction of Edwin Olson, ebolson@umich.edu. This software may be
5 available under alternative licensing terms; contact the address above.
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions are met:
8 1. Redistributions of source code must retain the above copyright notice, this
9  list of conditions and the following disclaimer.
10 2. Redistributions in binary form must reproduce the above copyright notice,
11  this list of conditions and the following disclaimer in the documentation
12  and/or other materials provided with the distribution.
13 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
14 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
15 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
16 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
17 ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
18 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
19 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
20 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
21 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
22 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
23 The views and conclusions contained in the software and documentation are those
24 of the authors and should not be interpreted as representing official policies,
25 either expressed or implied, of the Regents of The University of Michigan.
26 */
27 #include <errno.h>
28 
29 #define _GNU_SOURCE // Possible fix for 16.04
30 #define __USE_GNU
31 #include "common/pthreads_cross.h"
32 #include <assert.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #ifdef _WIN32
36 #include <windows.h>
37 #else
38 #include <unistd.h>
39 #endif
40 
41 #include "workerpool.h"
42 #include "debug_print.h"
43 
44 struct workerpool {
45  int nthreads;
47  int taskspos;
48 
49  pthread_t *threads;
50  int *status;
51 
52  pthread_mutex_t mutex;
53  pthread_cond_t startcond; // used to signal the availability of work
54  pthread_cond_t endcond; // used to signal completion of all work
55 
56  int end_count; // how many threads are done?
57 };
58 
59 struct task
60 {
61  void (*f)(void *p);
62  void *p;
63 };
64 
65 void *worker_thread(void *p)
66 {
67  workerpool_t *wp = (workerpool_t*) p;
68 
69  int cnt = 0;
70 
71  while (1) {
72  struct task *task;
73 
74  pthread_mutex_lock(&wp->mutex);
75  while (wp->taskspos == zarray_size(wp->tasks)) {
76  wp->end_count++;
77 // printf("%"PRId64" thread %d did %d\n", utime_now(), pthread_self(), cnt);
78  pthread_cond_broadcast(&wp->endcond);
79  pthread_cond_wait(&wp->startcond, &wp->mutex);
80  cnt = 0;
81 // printf("%"PRId64" thread %d awake\n", utime_now(), pthread_self());
82  }
83 
84  zarray_get_volatile(wp->tasks, wp->taskspos, &task);
85  wp->taskspos++;
86  cnt++;
87  pthread_mutex_unlock(&wp->mutex);
88 // pthread_yield();
89  sched_yield();
90 
91  // we've been asked to exit.
92  if (task->f == NULL)
93  return NULL;
94 
95  task->f(task->p);
96  }
97 
98  return NULL;
99 }
100 
102 {
103  assert(nthreads > 0);
104 
105  workerpool_t *wp = calloc(1, sizeof(workerpool_t));
106  wp->nthreads = nthreads;
107  wp->tasks = zarray_create(sizeof(struct task));
108 
109  if (nthreads > 1) {
110  wp->threads = calloc(wp->nthreads, sizeof(pthread_t));
111 
112  pthread_mutex_init(&wp->mutex, NULL);
113  pthread_cond_init(&wp->startcond, NULL);
114  pthread_cond_init(&wp->endcond, NULL);
115 
116  for (int i = 0; i < nthreads; i++) {
117  int res = pthread_create(&wp->threads[i], NULL, worker_thread, wp);
118  if (res != 0) {
119  debug_print("Insufficient system resources to create workerpool threads\n");
120  // errno already set to EAGAIN by pthread_create() failure
121  return NULL;
122  }
123  }
124  }
125 
126  return wp;
127 }
128 
130 {
131  if (wp == NULL)
132  return;
133 
134  // force all worker threads to exit.
135  if (wp->nthreads > 1) {
136  for (int i = 0; i < wp->nthreads; i++)
137  workerpool_add_task(wp, NULL, NULL);
138 
139  pthread_mutex_lock(&wp->mutex);
140  pthread_cond_broadcast(&wp->startcond);
141  pthread_mutex_unlock(&wp->mutex);
142 
143  for (int i = 0; i < wp->nthreads; i++)
144  pthread_join(wp->threads[i], NULL);
145 
146  pthread_mutex_destroy(&wp->mutex);
147  pthread_cond_destroy(&wp->startcond);
148  pthread_cond_destroy(&wp->endcond);
149  free(wp->threads);
150  }
151 
152  zarray_destroy(wp->tasks);
153  free(wp);
154 }
155 
157 {
158  return wp->nthreads;
159 }
160 
161 void workerpool_add_task(workerpool_t *wp, void (*f)(void *p), void *p)
162 {
163  struct task t;
164  t.f = f;
165  t.p = p;
166 
167  zarray_add(wp->tasks, &t);
168 }
169 
171 {
172  for (int i = 0; i < zarray_size(wp->tasks); i++) {
173  struct task *task;
174  zarray_get_volatile(wp->tasks, i, &task);
175  task->f(task->p);
176  }
177 
178  zarray_clear(wp->tasks);
179 }
180 
181 // runs all added tasks, waits for them to complete.
183 {
184  if (wp->nthreads > 1) {
185  wp->end_count = 0;
186 
187  pthread_mutex_lock(&wp->mutex);
188  pthread_cond_broadcast(&wp->startcond);
189 
190  while (wp->end_count < wp->nthreads) {
191 // printf("caught %d\n", wp->end_count);
192  pthread_cond_wait(&wp->endcond, &wp->mutex);
193  }
194 
195  pthread_mutex_unlock(&wp->mutex);
196 
197  wp->taskspos = 0;
198 
199  zarray_clear(wp->tasks);
200 
201  } else {
203  }
204 }
205 
207 {
208 #ifdef WIN32
209  SYSTEM_INFO sysinfo;
210  GetSystemInfo(&sysinfo);
211  return sysinfo.dwNumberOfProcessors;
212 #else
213  return sysconf (_SC_NPROCESSORS_ONLN);
214 #endif
215 }
void workerpool_run_single(workerpool_t *wp)
Definition: workerpool.c:170
pthread_mutex_t mutex
Definition: workerpool.c:52
void workerpool_add_task(workerpool_t *wp, void(*f)(void *p), void *p)
Definition: workerpool.c:161
static void zarray_get_volatile(const zarray_t *za, int idx, void *p)
Definition: zarray.h:212
void workerpool_destroy(workerpool_t *wp)
Definition: workerpool.c:129
int end_count
Definition: workerpool.c:56
#define debug_print(fmt,...)
Definition: debug_print.h:39
void(* f)(void *p)
Definition: workerpool.c:61
static int zarray_size(const zarray_t *za)
Definition: zarray.h:130
workerpool_t * workerpool_create(int nthreads)
Definition: workerpool.c:101
static void zarray_destroy(zarray_t *za)
Definition: zarray.h:70
zarray_t * tasks
Definition: workerpool.c:46
static zarray_t * zarray_create(size_t el_sz)
Definition: zarray.h:57
void workerpool_run(workerpool_t *wp)
Definition: workerpool.c:182
void * worker_thread(void *p)
Definition: workerpool.c:65
void * p
Definition: workerpool.c:62
pthread_cond_t startcond
Definition: workerpool.c:53
int * status
Definition: workerpool.c:50
int nthreads
Definition: workerpool.c:45
Definition: zarray.h:43
pthread_cond_t endcond
Definition: workerpool.c:54
pthread_t * threads
Definition: workerpool.c:49
int workerpool_get_nthreads(workerpool_t *wp)
Definition: workerpool.c:156
int taskspos
Definition: workerpool.c:47
static void zarray_clear(zarray_t *za)
Definition: zarray.h:365
int workerpool_get_nprocs()
Definition: workerpool.c:206
static void zarray_add(zarray_t *za, const void *p)
Definition: zarray.h:179


apriltag
Author(s): Edwin Olson , Max Krogius
autogenerated on Mon Jun 26 2023 02:26:35