Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifndef STDEX_COROUTINE_H_
00021 #define STDEX_COROUTINE_H_
00022
00023 #ifndef STACK_LIMIT
00024 #define STACK_LIMIT (1024*1024)
00025 #endif
00026
#include <cassert>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstring>

#include <functional>
#include <future>
#include <list>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
00037
00038 using ::std::string;
00039 using ::std::wstring;
00040
00041 #ifdef _MSC_VER
00042 #include <Windows.h>
00043 #else
00044 #if defined(__APPLE__) && defined(__MACH__)
00045 #define _XOPEN_SOURCE
00046 #include <ucontext.h>
00047 #else
00048 #include <ucontext.h>
00049 #endif
00050 #endif
00051
00052 namespace coroutine {
00053
/// Identifier handed out for a coroutine. The value 0 is used by this
/// header to mean "no coroutine".
using routine_t = unsigned;

/// Outcome reported when a coroutine is resumed.
enum class ResumeResult
{
    YIELD = 0,      ///< The routine gave control back and can be resumed again.
    INVALID = -1,   ///< The id does not refer to a live routine.
    FINISHED = -2   ///< The routine's function has returned.
};
00062
00063 #ifdef _MSC_VER
00064
00065 struct Routine
00066 {
00067 std::function<void()> func;
00068 bool finished;
00069 LPVOID fiber;
00070
00071 Routine(std::function<void()> f)
00072 {
00073 func = f;
00074 finished = false;
00075 fiber = nullptr;
00076 }
00077
00078 ~Routine()
00079 {
00080 DeleteFiber(fiber);
00081 }
00082 };
00083
00084 struct Ordinator
00085 {
00086 std::vector<Routine *> routines;
00087 std::list<routine_t> indexes;
00088 routine_t current;
00089 size_t stack_size;
00090 LPVOID fiber;
00091
00092 Ordinator(size_t ss = STACK_LIMIT)
00093 {
00094 current = 0;
00095 stack_size = ss;
00096 fiber = ConvertThreadToFiber(nullptr);
00097 }
00098
00099 ~Ordinator()
00100 {
00101 for (auto &routine : routines)
00102 delete routine;
00103 }
00104 };
00105
00106 thread_local static Ordinator ordinator;
00107
00108 inline routine_t create(std::function<void()> f)
00109 {
00110 Routine *routine = new Routine(f);
00111
00112 if (ordinator.indexes.empty())
00113 {
00114 ordinator.routines.push_back(routine);
00115 return ordinator.routines.size();
00116 }
00117 else
00118 {
00119 routine_t id = ordinator.indexes.front();
00120 ordinator.indexes.pop_front();
00121 assert(ordinator.routines[id-1] == nullptr);
00122 ordinator.routines[id-1] = routine;
00123 return id;
00124 }
00125 }
00126
00127 inline void destroy(routine_t id)
00128 {
00129 Routine *routine = ordinator.routines[id-1];
00130 assert(routine != nullptr);
00131
00132 delete routine;
00133 ordinator.routines[id-1] = nullptr;
00134 ordinator.indexes.push_back(id);
00135 }
00136
00137 inline void __stdcall entry(LPVOID lpParameter)
00138 {
00139 routine_t id = ordinator.current;
00140 Routine *routine = ordinator.routines[id-1];
00141 assert(routine != nullptr);
00142
00143 routine->func();
00144
00145 routine->finished = true;
00146 ordinator.current = 0;
00147
00148 SwitchToFiber(ordinator.fiber);
00149 }
00150
00151 inline ResumeResult resume(routine_t id)
00152 {
00153 assert(ordinator.current == 0);
00154
00155 Routine *routine = ordinator.routines[id-1];
00156 if (routine == nullptr)
00157 return ResumeResult::INVALID;
00158
00159 if (routine->finished)
00160 return ResumeResult::FINISHED;
00161
00162 if (routine->fiber == nullptr)
00163 {
00164 routine->fiber = CreateFiber(ordinator.stack_size, entry, 0);
00165 ordinator.current = id;
00166 SwitchToFiber(routine->fiber);
00167 }
00168 else
00169 {
00170 ordinator.current = id;
00171 SwitchToFiber(routine->fiber);
00172 }
00173
00174 return routine->finished ? ResumeResult::FINISHED : ResumeResult::YIELD;
00175 }
00176
00177 inline void yield()
00178 {
00179 routine_t id = ordinator.current;
00180 Routine *routine = ordinator.routines[id-1];
00181 assert(routine != nullptr);
00182
00183 ordinator.current = 0;
00184 SwitchToFiber(ordinator.fiber);
00185 }
00186
00187 inline routine_t current()
00188 {
00189 return ordinator.current;
00190 }
00191
00192 #if 0
00193 template<typename Function>
00194 inline typename std::result_of<Function()>::type
00195 await(Function &&func)
00196 {
00197 auto future = std::async(std::launch::async, func);
00198 std::future_status status = future.wait_for(std::chrono::milliseconds(0));
00199
00200 while (status == std::future_status::timeout)
00201 {
00202 if (ordinator.current != 0)
00203 yield();
00204
00205 status = future.wait_for(std::chrono::milliseconds(0));
00206 }
00207 return future.get();
00208 }
00209 #endif
00210
00211 #if 1
00212 template<typename Function>
00213 inline std::result_of_t<std::decay_t<Function>()>
00214 await(Function &&func)
00215 {
00216 auto future = std::async(std::launch::async, func);
00217 std::future_status status = future.wait_for(std::chrono::milliseconds(0));
00218
00219 while (status == std::future_status::timeout)
00220 {
00221 if (ordinator.current != 0)
00222 yield();
00223
00224 status = future.wait_for(std::chrono::milliseconds(0));
00225 }
00226 return future.get();
00227 }
00228 #endif
00229
00230 #else
00231
/// Bookkeeping for one coroutine on the ucontext backend.
struct Routine
{
    std::function<void()> func;  ///< Body executed by the coroutine.
    char *stack;                 ///< Owned stack buffer; starts out as nullptr.
    bool finished;               ///< Starts false; the constructor never sets it to true.
    ucontext_t ctx;              ///< Context storage; not initialised by the constructor.

    /// Stores the function to run. No stack is allocated here.
    Routine(std::function<void()> f)
        : func(std::move(f)), stack(nullptr), finished(false)
    {
    }

    // The destructor frees `stack`, so a copy would free it twice.
    Routine(const Routine &) = delete;
    Routine &operator=(const Routine &) = delete;

    ~Routine()
    {
        delete[] stack;
    }
};
00251
00252 struct Ordinator
00253 {
00254 std::vector<Routine *> routines;
00255 std::list<routine_t> indexes;
00256 routine_t current;
00257 size_t stack_size;
00258 ucontext_t ctx;
00259
00260 inline Ordinator(size_t ss = STACK_LIMIT)
00261 {
00262 current = 0;
00263 stack_size = ss;
00264 }
00265
00266 inline ~Ordinator()
00267 {
00268 for (auto &routine : routines)
00269 delete routine;
00270 }
00271 };
00272
00273 thread_local static Ordinator ordinator;
00274
00275 inline routine_t create(std::function<void()> f)
00276 {
00277 Routine *routine = new Routine(f);
00278
00279 if (ordinator.indexes.empty())
00280 {
00281 ordinator.routines.push_back(routine);
00282 return ordinator.routines.size();
00283 }
00284 else
00285 {
00286 routine_t id = ordinator.indexes.front();
00287 ordinator.indexes.pop_front();
00288 assert(ordinator.routines[id-1] == nullptr);
00289 ordinator.routines[id-1] = routine;
00290 return id;
00291 }
00292 }
00293
00294 inline void destroy(routine_t id)
00295 {
00296 Routine *routine = ordinator.routines[id-1];
00297 assert(routine != nullptr);
00298
00299 delete routine;
00300 ordinator.routines[id-1] = nullptr;
00301 }
00302
00303 inline void entry()
00304 {
00305 routine_t id = ordinator.current;
00306 Routine *routine = ordinator.routines[id-1];
00307 routine->func();
00308
00309 routine->finished = true;
00310 ordinator.current = 0;
00311 ordinator.indexes.push_back(id);
00312 }
00313
00314 inline ResumeResult resume(routine_t id)
00315 {
00316 assert(ordinator.current == 0);
00317
00318 Routine *routine = ordinator.routines[id-1];
00319 if (routine == nullptr)
00320 return ResumeResult::INVALID;
00321
00322 if (routine->finished)
00323 return ResumeResult::FINISHED;
00324
00325 if (routine->stack == nullptr)
00326 {
00327
00328
00329
00330 getcontext(&routine->ctx);
00331
00332
00333
00334
00335 routine->stack = new char[ordinator.stack_size];
00336 routine->ctx.uc_stack.ss_sp = routine->stack;
00337 routine->ctx.uc_stack.ss_size = ordinator.stack_size;
00338 routine->ctx.uc_link = &ordinator.ctx;
00339 ordinator.current = id;
00340
00341
00342
00343
00344 makecontext(&routine->ctx, reinterpret_cast<void (*)(void)>(entry), 0);
00345
00346
00347
00348 swapcontext(&ordinator.ctx, &routine->ctx);
00349 }
00350 else
00351 {
00352 ordinator.current = id;
00353 swapcontext(&ordinator.ctx, &routine->ctx);
00354 }
00355
00356 return routine->finished ? ResumeResult::FINISHED : ResumeResult::YIELD;
00357 }
00358
00359 inline void yield()
00360 {
00361 routine_t id = ordinator.current;
00362 Routine *routine = ordinator.routines[id-1];
00363 assert(routine != nullptr);
00364
00365 char *stack_top = routine->stack + ordinator.stack_size;
00366 char stack_bottom = 0;
00367 assert(size_t(stack_top - &stack_bottom) <= ordinator.stack_size);
00368
00369 ordinator.current = 0;
00370 swapcontext(&routine->ctx , &ordinator.ctx);
00371 }
00372
00373 inline routine_t current()
00374 {
00375 return ordinator.current;
00376 }
00377
00378 template<typename Function>
00379 inline typename std::result_of<Function()>::type
00380 await(Function &&func)
00381 {
00382 auto future = std::async(std::launch::async, func);
00383 std::future_status status = future.wait_for(std::chrono::milliseconds(0));
00384
00385 while (status == std::future_status::timeout)
00386 {
00387 if (ordinator.current != 0)
00388 yield();
00389
00390 status = future.wait_for(std::chrono::milliseconds(0));
00391 }
00392 return future.get();
00393 }
00394
00395 #endif
00396
00397 template<typename Type>
00398 class Channel
00399 {
00400 public:
00401 Channel()
00402 {
00403 _taker = 0;
00404 }
00405
00406 Channel(routine_t id)
00407 {
00408 _taker = id;
00409 }
00410
00411 inline void consumer(routine_t id)
00412 {
00413 _taker = id;
00414 }
00415
00416 inline void push(const Type &obj)
00417 {
00418 _list.push_back(obj);
00419 if (_taker && _taker != current())
00420 resume(_taker);
00421 }
00422
00423 inline void push(Type &&obj)
00424 {
00425 _list.push_back(std::move(obj));
00426 if (_taker && _taker != current())
00427 resume(_taker);
00428 }
00429
00430 inline Type pop()
00431 {
00432 if (!_taker)
00433 _taker = current();
00434
00435 while (_list.empty())
00436 yield();
00437
00438 Type obj = std::move(_list.front());
00439 _list.pop_front();
00440 return std::move(obj);
00441 }
00442
00443 inline void clear()
00444 {
00445 _list.clear();
00446 }
00447
00448 inline void touch()
00449 {
00450 if (_taker && _taker != current())
00451 resume(_taker);
00452 }
00453
00454 inline size_t size()
00455 {
00456 return _list.size();
00457 }
00458
00459 inline bool empty()
00460 {
00461 return _list.empty();
00462 }
00463
00464 private:
00465 std::list<Type> _list;
00466 routine_t _taker;
00467 };
00468
00469 }
00470 #endif //STDEX_COROUTINE_H_