coroutine.h
Go to the documentation of this file.
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef STDEX_COROUTINE_H_
21 #define STDEX_COROUTINE_H_
22 
23 #ifndef STACK_LIMIT
24 #define STACK_LIMIT (1024*1024)
25 #endif
26 
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <cstring>

#include <chrono>
#include <functional>
#include <future>
#include <list>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
37 
38 using ::std::string;
39 using ::std::wstring;
40 
41 #ifdef _MSC_VER
42 #include <Windows.h>
43 #else
44 #if defined(__APPLE__) && defined(__MACH__)
45 #define _XOPEN_SOURCE
46 #include <ucontext.h>
47 #else
48 #include <ucontext.h>
49 #endif
50 #endif
51 
52 namespace coroutine {
53 
// Handle that identifies a coroutine. Handles handed out by create() start
// at 1; the value 0 is used for "no coroutine is running".
using routine_t = unsigned;

// Status codes returned by resume().
enum class ResumeResult
{
    FINISHED = -2,  // the routine had already run to completion
    INVALID = -1,   // the handle does not refer to a live routine
    YIELD = 0       // non-error result
};
62 
63 #ifdef _MSC_VER
64 
65 struct Routine
66 {
67  std::function<void()> func;
68  bool finished;
69  LPVOID fiber;
70 
71  Routine(std::function<void()> f)
72  {
73  func = f;
74  finished = false;
75  fiber = nullptr;
76  }
77 
78  ~Routine()
79  {
80  DeleteFiber(fiber);
81  }
82 };
83 
84 struct Ordinator
85 {
86  std::vector<Routine *> routines;
87  std::list<routine_t> indexes;
88  routine_t current;
89  size_t stack_size;
90  LPVOID fiber;
91 
92  Ordinator(size_t ss = STACK_LIMIT)
93  {
94  current = 0;
95  stack_size = ss;
96  fiber = ConvertThreadToFiber(nullptr);
97  }
98 
99  ~Ordinator()
100  {
101  for (auto &routine : routines)
102  delete routine;
103  }
104 };
105 
106 thread_local static Ordinator ordinator;
107 
108 inline routine_t create(std::function<void()> f)
109 {
110  Routine *routine = new Routine(f);
111 
112  if (ordinator.indexes.empty())
113  {
114  ordinator.routines.push_back(routine);
115  return ordinator.routines.size();
116  }
117  else
118  {
119  routine_t id = ordinator.indexes.front();
120  ordinator.indexes.pop_front();
121  assert(ordinator.routines[id-1] == nullptr);
122  ordinator.routines[id-1] = routine;
123  return id;
124  }
125 }
126 
127 inline void destroy(routine_t id)
128 {
129  Routine *routine = ordinator.routines[id-1];
130  assert(routine != nullptr);
131 
132  delete routine;
133  ordinator.routines[id-1] = nullptr;
134  ordinator.indexes.push_back(id);
135 }
136 
137 inline void __stdcall entry(LPVOID lpParameter)
138 {
139  routine_t id = ordinator.current;
140  Routine *routine = ordinator.routines[id-1];
141  assert(routine != nullptr);
142 
143  routine->func();
144 
145  routine->finished = true;
146  ordinator.current = 0;
147 
148  SwitchToFiber(ordinator.fiber);
149 }
150 
151 inline ResumeResult resume(routine_t id)
152 {
153  assert(ordinator.current == 0);
154 
155  Routine *routine = ordinator.routines[id-1];
156  if (routine == nullptr)
157  return ResumeResult::INVALID;
158 
159  if (routine->finished)
160  return ResumeResult::FINISHED;
161 
162  if (routine->fiber == nullptr)
163  {
164  routine->fiber = CreateFiber(ordinator.stack_size, entry, 0);
165  ordinator.current = id;
166  SwitchToFiber(routine->fiber);
167  }
168  else
169  {
170  ordinator.current = id;
171  SwitchToFiber(routine->fiber);
172  }
173 
175 }
176 
177 inline void yield()
178 {
179  routine_t id = ordinator.current;
180  Routine *routine = ordinator.routines[id-1];
181  assert(routine != nullptr);
182 
183  ordinator.current = 0;
184  SwitchToFiber(ordinator.fiber);
185 }
186 
187 inline routine_t current()
188 {
189  return ordinator.current;
190 }
191 
192 #if 0
193 template<typename Function>
194 inline typename std::result_of<Function()>::type
195 await(Function &&func)
196 {
197  auto future = std::async(std::launch::async, func);
198  std::future_status status = future.wait_for(std::chrono::milliseconds(0));
199 
200  while (status == std::future_status::timeout)
201  {
202  if (ordinator.current != 0)
203  yield();
204 
205  status = future.wait_for(std::chrono::milliseconds(0));
206  }
207  return future.get();
208 }
209 #endif
210 
211 #if 1
212 template<typename Function>
213 inline std::result_of_t<std::decay_t<Function>()>
214 await(Function &&func)
215 {
216  auto future = std::async(std::launch::async, func);
217  std::future_status status = future.wait_for(std::chrono::milliseconds(0));
218 
219  while (status == std::future_status::timeout)
220  {
221  if (ordinator.current != 0)
222  yield();
223 
224  status = future.wait_for(std::chrono::milliseconds(0));
225  }
226  return future.get();
227 }
228 #endif
229 
230 #else
231 
// Per-coroutine bookkeeping for the ucontext backend.
struct Routine
{
    std::function<void()> func;  // body executed by the coroutine
    char *stack;                 // private stack; stays nullptr until the first resume()
    bool finished;               // set by entry() once func has returned
    ucontext_t ctx;              // saved execution context of the coroutine

    Routine(std::function<void()> f)
        : func(std::move(f)), stack(nullptr), finished(false)
    {
    }

    // This object owns the stack buffer; a copy would free it twice.
    Routine(const Routine &) = delete;
    Routine &operator=(const Routine &) = delete;

    ~Routine()
    {
        delete[] stack;  // deleting nullptr (never resumed) is a no-op
    }
};
251 
252 struct Ordinator
253 {
254  std::vector<Routine *> routines;
255  std::list<routine_t> indexes;
256  routine_t current;
257  size_t stack_size;
258  ucontext_t ctx;
259 
260  inline Ordinator(size_t ss = STACK_LIMIT)
261  {
262  current = 0;
263  stack_size = ss;
264  }
265 
266  inline ~Ordinator()
267  {
268  for (auto &routine : routines)
269  delete routine;
270  }
271 };
272 
273 thread_local static Ordinator ordinator;
274 
275 inline routine_t create(std::function<void()> f)
276 {
277  Routine *routine = new Routine(f);
278 
279  if (ordinator.indexes.empty())
280  {
281  ordinator.routines.push_back(routine);
282  return ordinator.routines.size();
283  }
284  else
285  {
286  routine_t id = ordinator.indexes.front();
287  ordinator.indexes.pop_front();
288  assert(ordinator.routines[id-1] == nullptr);
289  ordinator.routines[id-1] = routine;
290  return id;
291  }
292 }
293 
294 inline void destroy(routine_t id)
295 {
296  Routine *routine = ordinator.routines[id-1];
297  assert(routine != nullptr);
298 
299  delete routine;
300  ordinator.routines[id-1] = nullptr;
301 }
302 
303 inline void entry()
304 {
305  routine_t id = ordinator.current;
306  Routine *routine = ordinator.routines[id-1];
307  routine->func();
308 
309  routine->finished = true;
310  ordinator.current = 0;
311  ordinator.indexes.push_back(id);
312 }
313 
314 inline ResumeResult resume(routine_t id)
315 {
316  assert(ordinator.current == 0);
317 
318  Routine *routine = ordinator.routines[id-1];
319  if (routine == nullptr)
320  return ResumeResult::INVALID;
321 
322  if (routine->finished)
323  return ResumeResult::FINISHED;
324 
325  if (routine->stack == nullptr)
326  {
327  //initializes the structure to the currently active context.
328  //When successful, getcontext() returns 0
329  //On error, return -1 and set errno appropriately.
330  getcontext(&routine->ctx);
331 
332  //Before invoking makecontext(), the caller must allocate a new stack
333  //for this context and assign its address to ucp->uc_stack,
334  //and define a successor context and assign its address to ucp->uc_link.
335  routine->stack = new char[ordinator.stack_size];
336  routine->ctx.uc_stack.ss_sp = routine->stack;
337  routine->ctx.uc_stack.ss_size = ordinator.stack_size;
338  routine->ctx.uc_link = &ordinator.ctx;
339  ordinator.current = id;
340 
341  //When this context is later activated by swapcontext(), the function entry is called.
342  //When this function returns, the successor context is activated.
343  //If the successor context pointer is NULL, the thread exits.
344  makecontext(&routine->ctx, reinterpret_cast<void (*)(void)>(entry), 0);
345 
346  //The swapcontext() function saves the current context,
347  //and then activates the context of another.
348  swapcontext(&ordinator.ctx, &routine->ctx);
349  }
350  else
351  {
352  ordinator.current = id;
353  swapcontext(&ordinator.ctx, &routine->ctx);
354  }
355 
357 }
358 
359 inline void yield()
360 {
361  routine_t id = ordinator.current;
362  Routine *routine = ordinator.routines[id-1];
363  assert(routine != nullptr);
364 
365  char *stack_top = routine->stack + ordinator.stack_size;
366  char stack_bottom = 0;
367  assert(size_t(stack_top - &stack_bottom) <= ordinator.stack_size);
368 
369  ordinator.current = 0;
370  swapcontext(&routine->ctx , &ordinator.ctx);
371 }
372 
373 inline routine_t current()
374 {
375  return ordinator.current;
376 }
377 
378 template<typename Function>
379 inline typename std::result_of<Function()>::type
380 await(Function &&func)
381 {
382  auto future = std::async(std::launch::async, func);
383  std::future_status status = future.wait_for(std::chrono::milliseconds(0));
384 
385  while (status == std::future_status::timeout)
386  {
387  if (ordinator.current != 0)
388  yield();
389 
390  status = future.wait_for(std::chrono::milliseconds(0));
391  }
392  return future.get();
393 }
394 
395 #endif
396 
397 template<typename Type>
398 class Channel
399 {
400 public:
402  {
403  _taker = 0;
404  }
405 
406  Channel(routine_t id)
407  {
408  _taker = id;
409  }
410 
411  inline void consumer(routine_t id)
412  {
413  _taker = id;
414  }
415 
416  inline void push(const Type &obj)
417  {
418  _list.push_back(obj);
419  if (_taker && _taker != current())
420  resume(_taker);
421  }
422 
423  inline void push(Type &&obj)
424  {
425  _list.push_back(std::move(obj));
426  if (_taker && _taker != current())
427  resume(_taker);
428  }
429 
430  inline Type pop()
431  {
432  if (!_taker)
433  _taker = current();
434 
435  while (_list.empty())
436  yield();
437 
438  Type obj = std::move(_list.front());
439  _list.pop_front();
440  return std::move(obj);
441  }
442 
443  inline void clear()
444  {
445  _list.clear();
446  }
447 
448  inline void touch()
449  {
450  if (_taker && _taker != current())
451  resume(_taker);
452  }
453 
454  inline size_t size()
455  {
456  return _list.size();
457  }
458 
459  inline bool empty()
460  {
461  return _list.empty();
462  }
463 
464 private:
465  std::list<Type> _list;
466  routine_t _taker;
467 };
468 
469 }
470 #endif //STDEX_COROUTINE_H_
ucontext_t ctx
Definition: coroutine.h:237
static thread_local Ordinator ordinator
Definition: coroutine.h:273
std::function< void()> func
Definition: coroutine.h:234
void yield()
Definition: coroutine.h:359
std::list< Type > _list
Definition: coroutine.h:465
std::list< routine_t > indexes
Definition: coroutine.h:255
void entry()
Definition: coroutine.h:303
routine_t create(std::function< void()> f)
Definition: coroutine.h:275
void destroy(routine_t id)
Definition: coroutine.h:294
unsigned routine_t
Definition: coroutine.h:54
Channel(routine_t id)
Definition: coroutine.h:406
std::result_of< Function()>::type await(Function &&func)
Definition: coroutine.h:380
void consumer(routine_t id)
Definition: coroutine.h:411
void push(const Type &obj)
Definition: coroutine.h:416
void push(Type &&obj)
Definition: coroutine.h:423
routine_t _taker
Definition: coroutine.h:466
std::vector< Routine * > routines
Definition: coroutine.h:254
#define STACK_LIMIT
Definition: coroutine.h:24
Routine(std::function< void()> f)
Definition: coroutine.h:239
ResumeResult resume(routine_t id)
Definition: coroutine.h:314
routine_t current()
Definition: coroutine.h:373
Ordinator(size_t ss=STACK_LIMIT)
Definition: coroutine.h:260


behaviortree_cpp
Author(s): Michele Colledanchise, Davide Faconti
autogenerated on Sun Feb 3 2019 03:14:32