Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifndef STDEX_COROUTINE_H_
00021 #define STDEX_COROUTINE_H_
00022
00023 #ifndef STACK_LIMIT
00024 #define STACK_LIMIT (1024 * 1024)
00025 #endif
00026
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>

#include <chrono>
#include <functional>
#include <future>
#include <list>
#include <memory>
#include <stdexcept>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
00037
00038 using ::std::string;
00039 using ::std::wstring;
00040
00041 #ifdef _MSC_VER
00042 #include <Windows.h>
00043 #else
00044 #if defined(__APPLE__) && defined(__MACH__)
00045 #define _XOPEN_SOURCE
00046 #include <ucontext.h>
00047 #else
00048 #include <ucontext.h>
00049 #endif
00050 #endif
00051
00052 namespace coroutine
00053 {
/// Handle that identifies a coroutine within its thread's scheduler.
/// Valid handles start at 1; 0 is used throughout this file for "no coroutine".
using routine_t = unsigned;
00055
/// Outcome reported by resume().
enum class ResumeResult
{
    YIELD = 0,     ///< The coroutine ran and has not finished yet.
    INVALID = -1,  ///< The handle does not refer to a live coroutine.
    FINISHED = -2  ///< The coroutine's function has returned.
};
00062
00063 #ifdef _MSC_VER
00064
00065 struct Routine
00066 {
00067 std::function<void()> func;
00068 bool finished;
00069 LPVOID fiber;
00070
00071 Routine(std::function<void()> f)
00072 {
00073 func = f;
00074 finished = false;
00075 fiber = nullptr;
00076 }
00077
00078 ~Routine()
00079 {
00080 DeleteFiber(fiber);
00081 }
00082 };
00083
00084 struct Ordinator
00085 {
00086 std::vector<Routine*> routines;
00087 std::list<routine_t> indexes;
00088 routine_t current;
00089 size_t stack_size;
00090 LPVOID fiber;
00091
00092 Ordinator(size_t ss = STACK_LIMIT)
00093 {
00094 current = 0;
00095 stack_size = ss;
00096 fiber = ConvertThreadToFiber(nullptr);
00097 }
00098
00099 ~Ordinator()
00100 {
00101 for (auto& routine : routines)
00102 delete routine;
00103 }
00104 };
00105
00106 thread_local static Ordinator ordinator;
00107
00108 inline routine_t create(std::function<void()> f)
00109 {
00110 Routine* routine = new Routine(f);
00111
00112 if (ordinator.indexes.empty())
00113 {
00114 ordinator.routines.push_back(routine);
00115 return (routine_t)ordinator.routines.size();
00116 }
00117 else
00118 {
00119 routine_t id = ordinator.indexes.front();
00120 ordinator.indexes.pop_front();
00121 assert(ordinator.routines[id - 1] == nullptr);
00122 ordinator.routines[id - 1] = routine;
00123 return id;
00124 }
00125 }
00126
00127 inline void destroy(routine_t id)
00128 {
00129 Routine* routine = ordinator.routines[id - 1];
00130 assert(routine != nullptr);
00131
00132 delete routine;
00133 ordinator.routines[id - 1] = nullptr;
00134 ordinator.indexes.push_back(id);
00135 }
00136
00137 inline void __stdcall entry(LPVOID)
00138 {
00139 routine_t id = ordinator.current;
00140 Routine* routine = ordinator.routines[id - 1];
00141 assert(routine != nullptr);
00142
00143 routine->func();
00144
00145 routine->finished = true;
00146 ordinator.current = 0;
00147
00148 SwitchToFiber(ordinator.fiber);
00149 }
00150
00151 inline ResumeResult resume(routine_t id)
00152 {
00153 assert(ordinator.current == 0);
00154
00155 Routine* routine = ordinator.routines[id - 1];
00156 if (routine == nullptr)
00157 return ResumeResult::INVALID;
00158
00159 if (routine->finished)
00160 return ResumeResult::FINISHED;
00161
00162 if (routine->fiber == nullptr)
00163 {
00164 routine->fiber = CreateFiber(ordinator.stack_size, entry, 0);
00165 ordinator.current = id;
00166 SwitchToFiber(routine->fiber);
00167 }
00168 else
00169 {
00170 ordinator.current = id;
00171 SwitchToFiber(routine->fiber);
00172 }
00173
00174 return routine->finished ? ResumeResult::FINISHED : ResumeResult::YIELD;
00175 }
00176
00177 inline void yield()
00178 {
00179 routine_t id = ordinator.current;
00180 Routine* routine = ordinator.routines[id - 1];
00181 if (routine == nullptr)
00182 {
00183 throw std::runtime_error("Error in yield of coroutine");
00184 }
00185
00186 ordinator.current = 0;
00187 SwitchToFiber(ordinator.fiber);
00188 }
00189
00190 inline routine_t current()
00191 {
00192 return ordinator.current;
00193 }
00194
00195 #if 0
00196 template<typename Function>
00197 inline typename std::result_of<Function()>::type
00198 await(Function &&func)
00199 {
00200 auto future = std::async(std::launch::async, func);
00201 std::future_status status = future.wait_for(std::chrono::milliseconds(0));
00202
00203 while (status == std::future_status::timeout)
00204 {
00205 if (ordinator.current != 0)
00206 yield();
00207
00208 status = future.wait_for(std::chrono::milliseconds(0));
00209 }
00210 return future.get();
00211 }
00212 #endif
00213
00214 #if 1
00215 template <typename Function>
00216 inline std::result_of_t<std::decay_t<Function>()> await(Function&& func)
00217 {
00218 auto future = std::async(std::launch::async, func);
00219 std::future_status status = future.wait_for(std::chrono::milliseconds(0));
00220
00221 while (status == std::future_status::timeout)
00222 {
00223 if (ordinator.current != 0)
00224 yield();
00225
00226 status = future.wait_for(std::chrono::milliseconds(0));
00227 }
00228 return future.get();
00229 }
00230 #endif
00231
00232 #else
00233
/// State of one coroutine in the ucontext backend.
struct Routine
{
    std::function<void()> func;  ///< Body that runs on the coroutine's stack.
    char* stack = nullptr;       ///< Stack buffer (allocated with new[]); null until first resumed.
    bool finished = false;       ///< Set once func has returned.
    ucontext_t ctx;              ///< Saved execution context; filled in on first resume.

    /// @param f Callable to run as the coroutine body.
    Routine(std::function<void()> f)
        : func(std::move(f))
    {
    }

    // The struct owns the stack buffer; a copy would delete[] it twice.
    Routine(const Routine&) = delete;
    Routine& operator=(const Routine&) = delete;

    ~Routine()
    {
        delete[] stack;  // delete[] on null is a no-op
    }
};
00253
00254 struct Ordinator
00255 {
00256 std::vector<Routine*> routines;
00257 std::list<routine_t> indexes;
00258 routine_t current;
00259 size_t stack_size;
00260 ucontext_t ctx;
00261
00262 inline Ordinator(size_t ss = STACK_LIMIT)
00263 {
00264 current = 0;
00265 stack_size = ss;
00266 }
00267
00268 inline ~Ordinator()
00269 {
00270 for (auto& routine : routines)
00271 delete routine;
00272 }
00273 };
00274
00275 thread_local static Ordinator ordinator;
00276
00277 inline routine_t create(std::function<void()> f)
00278 {
00279 Routine* routine = new Routine(f);
00280
00281 if (ordinator.indexes.empty())
00282 {
00283 ordinator.routines.push_back(routine);
00284 return ordinator.routines.size();
00285 }
00286 else
00287 {
00288 routine_t id = ordinator.indexes.front();
00289 ordinator.indexes.pop_front();
00290 assert(ordinator.routines[id - 1] == nullptr);
00291 ordinator.routines[id - 1] = routine;
00292 return id;
00293 }
00294 }
00295
00296 inline void destroy(routine_t id)
00297 {
00298 Routine* routine = ordinator.routines[id - 1];
00299 assert(routine != nullptr);
00300
00301 delete routine;
00302 ordinator.routines[id - 1] = nullptr;
00303 }
00304
00305 inline void entry()
00306 {
00307 routine_t id = ordinator.current;
00308 Routine* routine = ordinator.routines[id - 1];
00309 routine->func();
00310
00311 routine->finished = true;
00312 ordinator.current = 0;
00313 ordinator.indexes.push_back(id);
00314 }
00315
00316 inline ResumeResult resume(routine_t id)
00317 {
00318 assert(ordinator.current == 0);
00319
00320 Routine* routine = ordinator.routines[id - 1];
00321 if (routine == nullptr)
00322 return ResumeResult::INVALID;
00323
00324 if (routine->finished)
00325 return ResumeResult::FINISHED;
00326
00327 if (routine->stack == nullptr)
00328 {
00329
00330
00331
00332 getcontext(&routine->ctx);
00333
00334
00335
00336
00337 routine->stack = new char[ordinator.stack_size];
00338 routine->ctx.uc_stack.ss_sp = routine->stack;
00339 routine->ctx.uc_stack.ss_size = ordinator.stack_size;
00340 routine->ctx.uc_link = &ordinator.ctx;
00341 ordinator.current = id;
00342
00343
00344
00345
00346 makecontext(&routine->ctx, reinterpret_cast<void (*)(void)>(entry), 0);
00347
00348
00349
00350 swapcontext(&ordinator.ctx, &routine->ctx);
00351 }
00352 else
00353 {
00354 ordinator.current = id;
00355 swapcontext(&ordinator.ctx, &routine->ctx);
00356 }
00357
00358 return routine->finished ? ResumeResult::FINISHED : ResumeResult::YIELD;
00359 }
00360
00361 inline void yield()
00362 {
00363 routine_t id = ordinator.current;
00364 Routine* routine = ordinator.routines[id - 1];
00365 assert(routine != nullptr);
00366
00367 char* stack_top = routine->stack + ordinator.stack_size;
00368 char stack_bottom = 0;
00369 assert(size_t(stack_top - &stack_bottom) <= ordinator.stack_size);
00370
00371 ordinator.current = 0;
00372 swapcontext(&routine->ctx, &ordinator.ctx);
00373 }
00374
00375 inline routine_t current()
00376 {
00377 return ordinator.current;
00378 }
00379
00380 template <typename Function>
00381 inline typename std::result_of<Function()>::type await(Function&& func)
00382 {
00383 auto future = std::async(std::launch::async, func);
00384 std::future_status status = future.wait_for(std::chrono::milliseconds(0));
00385
00386 while (status == std::future_status::timeout)
00387 {
00388 if (ordinator.current != 0)
00389 yield();
00390
00391 status = future.wait_for(std::chrono::milliseconds(0));
00392 }
00393 return future.get();
00394 }
00395
00396 #endif
00397
00398 template <typename Type>
00399 class Channel
00400 {
00401 public:
00402 Channel()
00403 {
00404 _taker = 0;
00405 }
00406
00407 Channel(routine_t id)
00408 {
00409 _taker = id;
00410 }
00411
00412 inline void consumer(routine_t id)
00413 {
00414 _taker = id;
00415 }
00416
00417 inline void push(const Type& obj)
00418 {
00419 _list.push_back(obj);
00420 if (_taker && _taker != current())
00421 resume(_taker);
00422 }
00423
00424 inline void push(Type&& obj)
00425 {
00426 _list.push_back(std::move(obj));
00427 if (_taker && _taker != current())
00428 resume(_taker);
00429 }
00430
00431 inline Type pop()
00432 {
00433 if (!_taker)
00434 _taker = current();
00435
00436 while (_list.empty())
00437 yield();
00438
00439 Type obj = std::move(_list.front());
00440 _list.pop_front();
00441 return std::move(obj);
00442 }
00443
00444 inline void clear()
00445 {
00446 _list.clear();
00447 }
00448
00449 inline void touch()
00450 {
00451 if (_taker && _taker != current())
00452 resume(_taker);
00453 }
00454
00455 inline size_t size()
00456 {
00457 return _list.size();
00458 }
00459
00460 inline bool empty()
00461 {
00462 return _list.empty();
00463 }
00464
00465 private:
00466 std::list<Type> _list;
00467 routine_t _taker;
00468 };
00469
00470 }
00471 #endif //STDEX_COROUTINE_H_