Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #ifndef ASIO_DETAIL_TASK_IO_SERVICE_HPP
00012 #define ASIO_DETAIL_TASK_IO_SERVICE_HPP
00013
00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00015 # pragma once
00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00017
00018 #if defined(ASIO_ENABLE_TWO_LOCK_QUEUE)
00019 #include "asio/detail/task_io_service_2lock.hpp"
00020 #else // defined(ASIO_ENABLE_TWO_LOCK_QUEUE)
00021
00022 #include "asio/detail/push_options.hpp"
00023
00024 #include "asio/error_code.hpp"
00025 #include "asio/io_service.hpp"
00026 #include "asio/detail/call_stack.hpp"
00027 #include "asio/detail/event.hpp"
00028 #include "asio/detail/handler_alloc_helpers.hpp"
00029 #include "asio/detail/handler_invoke_helpers.hpp"
00030 #include "asio/detail/handler_queue.hpp"
00031 #include "asio/detail/mutex.hpp"
00032 #include "asio/detail/service_base.hpp"
00033 #include "asio/detail/task_io_service_fwd.hpp"
00034
00035 namespace asio {
00036 namespace detail {
00037
00038 template <typename Task>
00039 class task_io_service
00040 : public asio::detail::service_base<task_io_service<Task> >
00041 {
00042 public:
00043
00044 task_io_service(asio::io_service& io_service)
00045 : asio::detail::service_base<task_io_service<Task> >(io_service),
00046 mutex_(),
00047 task_(use_service<Task>(io_service)),
00048 task_interrupted_(true),
00049 outstanding_work_(0),
00050 stopped_(false),
00051 shutdown_(false),
00052 first_idle_thread_(0)
00053 {
00054 handler_queue_.push(&task_handler_);
00055 }
00056
00057 void init(size_t )
00058 {
00059 }
00060
00061
00062 void shutdown_service()
00063 {
00064 asio::detail::mutex::scoped_lock lock(mutex_);
00065 shutdown_ = true;
00066 lock.unlock();
00067
00068
00069 while (!handler_queue_.empty())
00070 {
00071 handler_queue::handler* h = handler_queue_.front();
00072 handler_queue_.pop();
00073 if (h != &task_handler_)
00074 h->destroy();
00075 }
00076
00077
00078 handler_queue_.push(&task_handler_);
00079 }
00080
00081
00082 size_t run(asio::error_code& ec)
00083 {
00084 typename call_stack<task_io_service>::context ctx(this);
00085
00086 idle_thread_info this_idle_thread;
00087 this_idle_thread.next = 0;
00088
00089 asio::detail::mutex::scoped_lock lock(mutex_);
00090
00091 size_t n = 0;
00092 while (do_one(lock, &this_idle_thread, ec))
00093 if (n != (std::numeric_limits<size_t>::max)())
00094 ++n;
00095 return n;
00096 }
00097
00098
00099 size_t run_one(asio::error_code& ec)
00100 {
00101 typename call_stack<task_io_service>::context ctx(this);
00102
00103 idle_thread_info this_idle_thread;
00104 this_idle_thread.next = 0;
00105
00106 asio::detail::mutex::scoped_lock lock(mutex_);
00107
00108 return do_one(lock, &this_idle_thread, ec);
00109 }
00110
00111
00112 size_t poll(asio::error_code& ec)
00113 {
00114 typename call_stack<task_io_service>::context ctx(this);
00115
00116 asio::detail::mutex::scoped_lock lock(mutex_);
00117
00118 size_t n = 0;
00119 while (do_one(lock, 0, ec))
00120 if (n != (std::numeric_limits<size_t>::max)())
00121 ++n;
00122 return n;
00123 }
00124
00125
00126 size_t poll_one(asio::error_code& ec)
00127 {
00128 typename call_stack<task_io_service>::context ctx(this);
00129
00130 asio::detail::mutex::scoped_lock lock(mutex_);
00131
00132 return do_one(lock, 0, ec);
00133 }
00134
00135
00136 void stop()
00137 {
00138 asio::detail::mutex::scoped_lock lock(mutex_);
00139 stop_all_threads(lock);
00140 }
00141
00142
00143 void reset()
00144 {
00145 asio::detail::mutex::scoped_lock lock(mutex_);
00146 stopped_ = false;
00147 }
00148
00149
00150 void work_started()
00151 {
00152 asio::detail::mutex::scoped_lock lock(mutex_);
00153 ++outstanding_work_;
00154 }
00155
00156
00157 void work_finished()
00158 {
00159 asio::detail::mutex::scoped_lock lock(mutex_);
00160 if (--outstanding_work_ == 0)
00161 stop_all_threads(lock);
00162 }
00163
00164
00165 template <typename Handler>
00166 void dispatch(Handler handler)
00167 {
00168 if (call_stack<task_io_service>::contains(this))
00169 asio_handler_invoke_helpers::invoke(handler, &handler);
00170 else
00171 post(handler);
00172 }
00173
00174
00175 template <typename Handler>
00176 void post(Handler handler)
00177 {
00178
00179 handler_queue::scoped_ptr ptr(handler_queue::wrap(handler));
00180
00181 asio::detail::mutex::scoped_lock lock(mutex_);
00182
00183
00184 if (shutdown_)
00185 return;
00186
00187
00188 handler_queue_.push(ptr.get());
00189 ptr.release();
00190
00191
00192 ++outstanding_work_;
00193
00194
00195 if (!interrupt_one_idle_thread(lock))
00196 {
00197 if (!task_interrupted_)
00198 {
00199 task_interrupted_ = true;
00200 task_.interrupt();
00201 }
00202 }
00203 }
00204
00205 private:
00206 struct idle_thread_info;
00207
00208 size_t do_one(asio::detail::mutex::scoped_lock& lock,
00209 idle_thread_info* this_idle_thread, asio::error_code& ec)
00210 {
00211 if (outstanding_work_ == 0 && !stopped_)
00212 {
00213 stop_all_threads(lock);
00214 ec = asio::error_code();
00215 return 0;
00216 }
00217
00218 bool polling = !this_idle_thread;
00219 bool task_has_run = false;
00220 while (!stopped_)
00221 {
00222 if (!handler_queue_.empty())
00223 {
00224
00225 handler_queue::handler* h = handler_queue_.front();
00226 handler_queue_.pop();
00227
00228 if (h == &task_handler_)
00229 {
00230 bool more_handlers = (!handler_queue_.empty());
00231 task_interrupted_ = more_handlers || polling;
00232
00233
00234 if (task_has_run && polling)
00235 {
00236 task_interrupted_ = true;
00237 handler_queue_.push(&task_handler_);
00238 ec = asio::error_code();
00239 return 0;
00240 }
00241 task_has_run = true;
00242
00243 lock.unlock();
00244 task_cleanup c(lock, *this);
00245
00246
00247
00248
00249 task_.run(!more_handlers && !polling);
00250 }
00251 else
00252 {
00253 lock.unlock();
00254 handler_cleanup c(lock, *this);
00255
00256
00257 h->invoke();
00258
00259 ec = asio::error_code();
00260 return 1;
00261 }
00262 }
00263 else if (this_idle_thread)
00264 {
00265
00266 this_idle_thread->next = first_idle_thread_;
00267 first_idle_thread_ = this_idle_thread;
00268 this_idle_thread->wakeup_event.clear(lock);
00269 this_idle_thread->wakeup_event.wait(lock);
00270 }
00271 else
00272 {
00273 ec = asio::error_code();
00274 return 0;
00275 }
00276 }
00277
00278 ec = asio::error_code();
00279 return 0;
00280 }
00281
00282
00283 void stop_all_threads(
00284 asio::detail::mutex::scoped_lock& lock)
00285 {
00286 stopped_ = true;
00287 interrupt_all_idle_threads(lock);
00288 if (!task_interrupted_)
00289 {
00290 task_interrupted_ = true;
00291 task_.interrupt();
00292 }
00293 }
00294
00295
00296
00297 bool interrupt_one_idle_thread(
00298 asio::detail::mutex::scoped_lock& lock)
00299 {
00300 if (first_idle_thread_)
00301 {
00302 idle_thread_info* idle_thread = first_idle_thread_;
00303 first_idle_thread_ = idle_thread->next;
00304 idle_thread->next = 0;
00305 idle_thread->wakeup_event.signal(lock);
00306 return true;
00307 }
00308 return false;
00309 }
00310
00311
00312 void interrupt_all_idle_threads(
00313 asio::detail::mutex::scoped_lock& lock)
00314 {
00315 while (first_idle_thread_)
00316 {
00317 idle_thread_info* idle_thread = first_idle_thread_;
00318 first_idle_thread_ = idle_thread->next;
00319 idle_thread->next = 0;
00320 idle_thread->wakeup_event.signal(lock);
00321 }
00322 }
00323
00324
00325 class task_cleanup;
00326 friend class task_cleanup;
00327 class task_cleanup
00328 {
00329 public:
00330 task_cleanup(asio::detail::mutex::scoped_lock& lock,
00331 task_io_service& task_io_svc)
00332 : lock_(lock),
00333 task_io_service_(task_io_svc)
00334 {
00335 }
00336
00337 ~task_cleanup()
00338 {
00339
00340 lock_.lock();
00341 task_io_service_.task_interrupted_ = true;
00342 task_io_service_.handler_queue_.push(&task_io_service_.task_handler_);
00343 }
00344
00345 private:
00346 asio::detail::mutex::scoped_lock& lock_;
00347 task_io_service& task_io_service_;
00348 };
00349
00350
00351 class handler_cleanup;
00352 friend class handler_cleanup;
00353 class handler_cleanup
00354 {
00355 public:
00356 handler_cleanup(asio::detail::mutex::scoped_lock& lock,
00357 task_io_service& task_io_svc)
00358 : lock_(lock),
00359 task_io_service_(task_io_svc)
00360 {
00361 }
00362
00363 ~handler_cleanup()
00364 {
00365 lock_.lock();
00366 if (--task_io_service_.outstanding_work_ == 0)
00367 task_io_service_.stop_all_threads(lock_);
00368 }
00369
00370 private:
00371 asio::detail::mutex::scoped_lock& lock_;
00372 task_io_service& task_io_service_;
00373 };
00374
00375
00376 asio::detail::mutex mutex_;
00377
00378
00379 Task& task_;
00380
00381
00382 class task_handler
00383 : public handler_queue::handler
00384 {
00385 public:
00386 task_handler()
00387 : handler_queue::handler(0, 0)
00388 {
00389 }
00390 } task_handler_;
00391
00392
00393 bool task_interrupted_;
00394
00395
00396 int outstanding_work_;
00397
00398
00399 handler_queue handler_queue_;
00400
00401
00402 bool stopped_;
00403
00404
00405 bool shutdown_;
00406
00407
00408 struct idle_thread_info
00409 {
00410 event wakeup_event;
00411 idle_thread_info* next;
00412 };
00413
00414
00415 idle_thread_info* first_idle_thread_;
00416 };
00417
00418 }
00419 }
00420
00421 #include "asio/detail/pop_options.hpp"
00422
00423 #endif // defined(ASIO_ENABLE_TWO_LOCK_QUEUE)
00424
00425 #endif // ASIO_DETAIL_TASK_IO_SERVICE_HPP