workerpool.c
Go to the documentation of this file.
1 /* Copyright (C) 2013-2016, The Regents of The University of Michigan.
2 All rights reserved.
3 
4 This software was developed in the APRIL Robotics Lab under the
5 direction of Edwin Olson, ebolson@umich.edu. This software may be
6 available under alternative licensing terms; contact the address above.
7 
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 
11 1. Redistributions of source code must retain the above copyright notice, this
12  list of conditions and the following disclaimer.
13 2. Redistributions in binary form must reproduce the above copyright notice,
14  this list of conditions and the following disclaimer in the documentation
15  and/or other materials provided with the distribution.
16 
17 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
18 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
21 ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
22 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
23 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
24 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
26 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 
28 The views and conclusions contained in the software and documentation are those
29 of the authors and should not be interpreted as representing official policies,
30 either expressed or implied, of the Regents of The University of Michigan.
31 */
32 
33 #define __USE_GNU
34 #include <pthread.h>
35 #include <sched.h>
36 #include <assert.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <inttypes.h>
40 
41 #include "workerpool.h"
42 #include "timeprofile.h"
43 #include "math_util.h"
44 #include "string_util.h"
45 
46 struct workerpool {
47  int nthreads;
49  int taskspos;
50 
51  pthread_t *threads;
52  int *status;
53 
54  pthread_mutex_t mutex;
55  pthread_cond_t startcond; // used to signal the availability of work
56  pthread_cond_t endcond; // used to signal completion of all work
57 
58  int end_count; // how many threads are done?
59 };
60 
61 struct task
62 {
63  void (*f)(void *p);
64  void *p;
65 };
66 
67 void *worker_thread(void *p)
68 {
69  workerpool_t *wp = (workerpool_t*) p;
70 
71  int cnt = 0;
72 
73  while (1) {
74  struct task *task;
75 
76  pthread_mutex_lock(&wp->mutex);
77  while (wp->taskspos == zarray_size(wp->tasks)) {
78  wp->end_count++;
79 // printf("%"PRId64" thread %d did %d\n", utime_now(), pthread_self(), cnt);
80  pthread_cond_broadcast(&wp->endcond);
81  pthread_cond_wait(&wp->startcond, &wp->mutex);
82  cnt = 0;
83 // printf("%"PRId64" thread %d awake\n", utime_now(), pthread_self());
84  }
85 
86  zarray_get_volatile(wp->tasks, wp->taskspos, &task);
87  wp->taskspos++;
88  cnt++;
89  pthread_mutex_unlock(&wp->mutex);
90 // pthread_yield();
91  sched_yield();
92 
93  // we've been asked to exit.
94  if (task->f == NULL)
95  return NULL;
96 
97  task->f(task->p);
98  }
99 
100  return NULL;
101 }
102 
104 {
105  assert(nthreads > 0);
106 
107  workerpool_t *wp = calloc(1, sizeof(workerpool_t));
108  wp->nthreads = nthreads;
109  wp->tasks = zarray_create(sizeof(struct task));
110 
111  if (nthreads > 1) {
112  wp->threads = calloc(wp->nthreads, sizeof(pthread_t));
113 
114  pthread_mutex_init(&wp->mutex, NULL);
115  pthread_cond_init(&wp->startcond, NULL);
116  pthread_cond_init(&wp->endcond, NULL);
117 
118  for (int i = 0; i < nthreads; i++) {
119  int res = pthread_create(&wp->threads[i], NULL, worker_thread, wp);
120  if (res != 0) {
121  perror("pthread_create");
122  exit(-1);
123  }
124  }
125  }
126 
127  return wp;
128 }
129 
131 {
132  if (wp == NULL)
133  return;
134 
135  // force all worker threads to exit.
136  if (wp->nthreads > 1) {
137  for (int i = 0; i < wp->nthreads; i++)
138  workerpool_add_task(wp, NULL, NULL);
139 
140  pthread_mutex_lock(&wp->mutex);
141  pthread_cond_broadcast(&wp->startcond);
142  pthread_mutex_unlock(&wp->mutex);
143 
144  for (int i = 0; i < wp->nthreads; i++)
145  pthread_join(wp->threads[i], NULL);
146 
147  pthread_mutex_destroy(&wp->mutex);
148  pthread_cond_destroy(&wp->startcond);
149  pthread_cond_destroy(&wp->endcond);
150  free(wp->threads);
151  }
152 
153  zarray_destroy(wp->tasks);
154  free(wp);
155 }
156 
158 {
159  return wp->nthreads;
160 }
161 
162 void workerpool_add_task(workerpool_t *wp, void (*f)(void *p), void *p)
163 {
164  struct task t;
165  t.f = f;
166  t.p = p;
167 
168  zarray_add(wp->tasks, &t);
169 }
170 
172 {
173  for (int i = 0; i < zarray_size(wp->tasks); i++) {
174  struct task *task;
175  zarray_get_volatile(wp->tasks, i, &task);
176  task->f(task->p);
177  }
178 
179  zarray_clear(wp->tasks);
180 }
181 
182 // runs all added tasks, waits for them to complete.
184 {
185  if (wp->nthreads > 1) {
186  wp->end_count = 0;
187 
188  pthread_mutex_lock(&wp->mutex);
189  pthread_cond_broadcast(&wp->startcond);
190 
191  while (wp->end_count < wp->nthreads) {
192 // printf("caught %d\n", wp->end_count);
193  pthread_cond_wait(&wp->endcond, &wp->mutex);
194  }
195 
196  pthread_mutex_unlock(&wp->mutex);
197 
198  wp->taskspos = 0;
199 
200  zarray_clear(wp->tasks);
201 
202  } else {
204  }
205 }
206 
208 {
209  FILE * f = fopen("/proc/cpuinfo", "r");
210  size_t n = 0;
211  char * buf = NULL;
212 
213  int nproc = 0;
214 
215  while(getline(&buf, &n, f) != -1)
216  {
217  if(!str_starts_with(buf, "processor"))
218  continue;
219 
220  int colon = str_indexof(buf, ":");
221 
222  int v = atoi(&buf[colon+1]);
223  if (v > nproc)
224  nproc = v;
225  }
226 
227  free(buf);
228 
229  return nproc;
230 }
void workerpool_run_single(workerpool_t *wp)
Definition: workerpool.c:171
pthread_mutex_t mutex
Definition: workerpool.c:54
void workerpool_add_task(workerpool_t *wp, void(*f)(void *p), void *p)
Definition: workerpool.c:162
static void zarray_get_volatile(const zarray_t *za, int idx, void *p)
Definition: zarray.h:218
void workerpool_destroy(workerpool_t *wp)
Definition: workerpool.c:130
int end_count
Definition: workerpool.c:58
void(* f)(void *p)
Definition: workerpool.c:63
static int zarray_size(const zarray_t *za)
Definition: zarray.h:136
workerpool_t * workerpool_create(int nthreads)
Definition: workerpool.c:103
bool str_starts_with(const char *haystack, const char *needle)
Definition: string_util.c:580
static void zarray_destroy(zarray_t *za)
Definition: zarray.h:76
zarray_t * tasks
Definition: workerpool.c:48
static zarray_t * zarray_create(size_t el_sz)
Definition: zarray.h:63
void workerpool_run(workerpool_t *wp)
Definition: workerpool.c:183
void * worker_thread(void *p)
Definition: workerpool.c:67
void * p
Definition: workerpool.c:64
pthread_cond_t startcond
Definition: workerpool.c:55
int * status
Definition: workerpool.c:52
int nthreads
Definition: workerpool.c:47
Definition: zarray.h:49
pthread_cond_t endcond
Definition: workerpool.c:56
pthread_t * threads
Definition: workerpool.c:51
int workerpool_get_nthreads(workerpool_t *wp)
Definition: workerpool.c:157
int str_indexof(const char *haystack, const char *needle)
Definition: string_util.c:259
int taskspos
Definition: workerpool.c:49
static void zarray_clear(zarray_t *za)
Definition: zarray.h:391
int workerpool_get_nprocs()
Definition: workerpool.c:207
static void zarray_add(zarray_t *za, const void *p)
Definition: zarray.h:185


apriltags2
Author(s): Danylo Malyuta
autogenerated on Fri Oct 19 2018 04:02:32