Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #ifndef ASIO_DETAIL_TASK_IO_SERVICE_2LOCK_HPP
00012 #define ASIO_DETAIL_TASK_IO_SERVICE_2LOCK_HPP
00013
00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00015 # pragma once
00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00017
00018 #include "asio/detail/push_options.hpp"
00019
00020 #include "asio/error_code.hpp"
00021 #include "asio/io_service.hpp"
00022 #include "asio/detail/call_stack.hpp"
00023 #include "asio/detail/event.hpp"
00024 #include "asio/detail/handler_alloc_helpers.hpp"
00025 #include "asio/detail/handler_invoke_helpers.hpp"
00026 #include "asio/detail/indirect_handler_queue.hpp"
00027 #include "asio/detail/mutex.hpp"
00028 #include "asio/detail/service_base.hpp"
00029 #include "asio/detail/task_io_service_fwd.hpp"
00030
00031 #include "asio/detail/push_options.hpp"
00032 #include <boost/detail/atomic_count.hpp>
00033 #include "asio/detail/pop_options.hpp"
00034
00035 namespace asio {
00036 namespace detail {
00037
00038
00039
00040 template <typename Task>
00041 class task_io_service
00042 : public asio::detail::service_base<task_io_service<Task> >
00043 {
00044 public:
00045 typedef indirect_handler_queue handler_queue;
00046
00047
00048 task_io_service(asio::io_service& io_service)
00049 : asio::detail::service_base<task_io_service<Task> >(io_service),
00050 front_mutex_(),
00051 back_mutex_(),
00052 task_(use_service<Task>(io_service)),
00053 outstanding_work_(0),
00054 front_stopped_(false),
00055 back_stopped_(false),
00056 back_shutdown_(false),
00057 back_first_idle_thread_(0),
00058 back_task_thread_(0)
00059 {
00060 handler_queue_.push(&task_handler_);
00061 }
00062
00063 void init(size_t )
00064 {
00065 }
00066
00067
00068 void shutdown_service()
00069 {
00070 asio::detail::mutex::scoped_lock back_lock(back_mutex_);
00071 back_shutdown_ = true;
00072 back_lock.unlock();
00073
00074
00075 while (handler_queue::handler* h = handler_queue_.pop())
00076 if (h != &task_handler_)
00077 h->destroy();
00078
00079
00080 handler_queue_.push(&task_handler_);
00081 }
00082
00083
00084 size_t run(asio::error_code& ec)
00085 {
00086 if (outstanding_work_ == 0)
00087 {
00088 stop();
00089 ec = asio::error_code();
00090 return 0;
00091 }
00092
00093 typename call_stack<task_io_service>::context ctx(this);
00094
00095 idle_thread_info this_idle_thread;
00096 this_idle_thread.next = 0;
00097
00098 size_t n = 0;
00099 while (do_one(&this_idle_thread, ec))
00100 if (n != (std::numeric_limits<size_t>::max)())
00101 ++n;
00102 return n;
00103 }
00104
00105
00106 size_t run_one(asio::error_code& ec)
00107 {
00108 if (outstanding_work_ == 0)
00109 {
00110 stop();
00111 ec = asio::error_code();
00112 return 0;
00113 }
00114
00115 typename call_stack<task_io_service>::context ctx(this);
00116
00117 idle_thread_info this_idle_thread;
00118 this_idle_thread.next = 0;
00119
00120 return do_one(&this_idle_thread, ec);
00121 }
00122
00123
00124 size_t poll(asio::error_code& ec)
00125 {
00126 if (outstanding_work_ == 0)
00127 {
00128 stop();
00129 ec = asio::error_code();
00130 return 0;
00131 }
00132
00133 typename call_stack<task_io_service>::context ctx(this);
00134
00135 size_t n = 0;
00136 while (do_one(0, ec))
00137 if (n != (std::numeric_limits<size_t>::max)())
00138 ++n;
00139 return n;
00140 }
00141
00142
00143 size_t poll_one(asio::error_code& ec)
00144 {
00145 if (outstanding_work_ == 0)
00146 {
00147 stop();
00148 ec = asio::error_code();
00149 return 0;
00150 }
00151
00152 typename call_stack<task_io_service>::context ctx(this);
00153
00154 return do_one(0, ec);
00155 }
00156
00157
00158 void stop()
00159 {
00160 asio::detail::mutex::scoped_lock front_lock(front_mutex_);
00161 front_stopped_ = true;
00162 front_lock.unlock();
00163
00164 asio::detail::mutex::scoped_lock back_lock(back_mutex_);
00165 back_stopped_ = true;
00166 interrupt_all_idle_threads(back_lock);
00167 }
00168
00169
00170 void reset()
00171 {
00172 asio::detail::mutex::scoped_lock front_lock(front_mutex_);
00173 front_stopped_ = false;
00174 front_lock.unlock();
00175
00176 asio::detail::mutex::scoped_lock back_lock(back_mutex_);
00177 back_stopped_ = false;
00178 }
00179
00180
00181 void work_started()
00182 {
00183 ++outstanding_work_;
00184 }
00185
00186
00187 void work_finished()
00188 {
00189 if (--outstanding_work_ == 0)
00190 stop();
00191 }
00192
00193
00194 template <typename Handler>
00195 void dispatch(Handler handler)
00196 {
00197 if (call_stack<task_io_service>::contains(this))
00198 asio_handler_invoke_helpers::invoke(handler, &handler);
00199 else
00200 post(handler);
00201 }
00202
00203
00204 template <typename Handler>
00205 void post(Handler handler)
00206 {
00207
00208 handler_queue::scoped_ptr ptr(handler_queue::wrap(handler));
00209
00210 asio::detail::mutex::scoped_lock back_lock(back_mutex_);
00211
00212
00213 if (back_shutdown_)
00214 return;
00215
00216
00217 handler_queue_.push(ptr.get());
00218 ptr.release();
00219
00220
00221 ++outstanding_work_;
00222
00223
00224 interrupt_one_idle_thread(back_lock);
00225 }
00226
00227 private:
00228 struct idle_thread_info;
00229
00230 size_t do_one(idle_thread_info* this_idle_thread,
00231 asio::error_code& ec)
00232 {
00233 bool task_has_run = false;
00234 for (;;)
00235 {
00236
00237 asio::detail::mutex::scoped_lock front_lock(front_mutex_);
00238 if (front_stopped_)
00239 {
00240 ec = asio::error_code();
00241 return 0;
00242 }
00243
00244 if (handler_queue::handler* h = handler_queue_.pop())
00245 {
00246 if (h == &task_handler_)
00247 {
00248 bool more_handlers = handler_queue_.poppable();
00249 unsigned long front_version = handler_queue_.front_version();
00250 front_lock.unlock();
00251
00252
00253
00254 task_cleanup c(*this);
00255
00256
00257 bool polling = !this_idle_thread;
00258 if (task_has_run && polling)
00259 {
00260 ec = asio::error_code();
00261 return 0;
00262 }
00263
00264
00265
00266
00267 if (!more_handlers && !polling)
00268 {
00269 asio::detail::mutex::scoped_lock back_lock(back_mutex_);
00270 if (back_stopped_)
00271 {
00272 ec = asio::error_code();
00273 return 0;
00274 }
00275 else if (front_version == handler_queue_.back_version())
00276 {
00277 back_task_thread_ = this_idle_thread;
00278 }
00279 else
00280 {
00281 more_handlers = true;
00282 }
00283 }
00284
00285
00286
00287
00288 task_has_run = true;
00289 task_.run(!more_handlers && !polling);
00290 }
00291 else
00292 {
00293 front_lock.unlock();
00294 handler_cleanup c(*this);
00295
00296
00297 h->invoke();
00298
00299 ec = asio::error_code();
00300 return 1;
00301 }
00302 }
00303 else if (this_idle_thread)
00304 {
00305 unsigned long front_version = handler_queue_.front_version();
00306 front_lock.unlock();
00307
00308
00309
00310
00311 asio::detail::mutex::scoped_lock back_lock(back_mutex_);
00312 if (back_stopped_)
00313 {
00314 ec = asio::error_code();
00315 return 0;
00316 }
00317 else if (front_version == handler_queue_.back_version())
00318 {
00319 this_idle_thread->next = back_first_idle_thread_;
00320 back_first_idle_thread_ = this_idle_thread;
00321 this_idle_thread->wakeup_event.clear(back_lock);
00322 this_idle_thread->wakeup_event.wait(back_lock);
00323 }
00324 }
00325 else
00326 {
00327 ec = asio::error_code();
00328 return 0;
00329 }
00330 }
00331 }
00332
00333
00334 void interrupt_one_idle_thread(
00335 asio::detail::mutex::scoped_lock& back_lock)
00336 {
00337 if (back_first_idle_thread_)
00338 {
00339 idle_thread_info* idle_thread = back_first_idle_thread_;
00340 back_first_idle_thread_ = idle_thread->next;
00341 idle_thread->next = 0;
00342 idle_thread->wakeup_event.signal(back_lock);
00343 }
00344 else if (back_task_thread_)
00345 {
00346 back_task_thread_ = 0;
00347 task_.interrupt();
00348 }
00349 }
00350
00351
00352 void interrupt_all_idle_threads(
00353 asio::detail::mutex::scoped_lock& back_lock)
00354 {
00355 while (back_first_idle_thread_)
00356 {
00357 idle_thread_info* idle_thread = back_first_idle_thread_;
00358 back_first_idle_thread_ = idle_thread->next;
00359 idle_thread->next = 0;
00360 idle_thread->wakeup_event.signal(back_lock);
00361 }
00362
00363 if (back_task_thread_)
00364 {
00365 back_task_thread_ = 0;
00366 task_.interrupt();
00367 }
00368 }
00369
00370
00371 class task_cleanup;
00372 friend class task_cleanup;
00373 class task_cleanup
00374 {
00375 public:
00376 task_cleanup(task_io_service& task_io_svc)
00377 : task_io_service_(task_io_svc)
00378 {
00379 }
00380
00381 ~task_cleanup()
00382 {
00383
00384 asio::detail::mutex::scoped_lock back_lock(
00385 task_io_service_.back_mutex_);
00386 task_io_service_.back_task_thread_ = 0;
00387 task_io_service_.handler_queue_.push(&task_io_service_.task_handler_);
00388 }
00389
00390 private:
00391 task_io_service& task_io_service_;
00392 };
00393
00394
00395 class handler_cleanup
00396 {
00397 public:
00398 handler_cleanup(task_io_service& task_io_svc)
00399 : task_io_service_(task_io_svc)
00400 {
00401 }
00402
00403 ~handler_cleanup()
00404 {
00405 task_io_service_.work_finished();
00406 }
00407
00408 private:
00409 task_io_service& task_io_service_;
00410 };
00411
00412
00413 asio::detail::mutex front_mutex_;
00414 asio::detail::mutex back_mutex_;
00415
00416
00417 Task& task_;
00418
00419
00420 class task_handler
00421 : public handler_queue::handler
00422 {
00423 public:
00424 task_handler()
00425 : handler_queue::handler(0, 0)
00426 {
00427 }
00428 } task_handler_;
00429
00430
00431 boost::detail::atomic_count outstanding_work_;
00432
00433
00434 handler_queue handler_queue_;
00435
00436
00437 bool front_stopped_;
00438 bool back_stopped_;
00439
00440
00441 bool back_shutdown_;
00442
00443
00444 struct idle_thread_info
00445 {
00446 event wakeup_event;
00447 idle_thread_info* next;
00448 };
00449
00450
00451 idle_thread_info* back_first_idle_thread_;
00452
00453
00454 idle_thread_info* back_task_thread_;
00455 };
00456
00457 }
00458 }
00459
00460 #include "asio/detail/pop_options.hpp"
00461
00462 #endif // ASIO_DETAIL_TASK_IO_SERVICE_2LOCK_HPP