Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #ifndef ASIO_DETAIL_STRAND_SERVICE_HPP
00012 #define ASIO_DETAIL_STRAND_SERVICE_HPP
00013
00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00015 # pragma once
00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00017
00018 #include "asio/detail/push_options.hpp"
00019
00020 #include "asio/detail/push_options.hpp"
00021 #include <boost/aligned_storage.hpp>
00022 #include <boost/assert.hpp>
00023 #include <boost/detail/atomic_count.hpp>
00024 #include <boost/intrusive_ptr.hpp>
00025 #include "asio/detail/pop_options.hpp"
00026
00027 #include "asio/io_service.hpp"
00028 #include "asio/detail/bind_handler.hpp"
00029 #include "asio/detail/call_stack.hpp"
00030 #include "asio/detail/handler_alloc_helpers.hpp"
00031 #include "asio/detail/handler_invoke_helpers.hpp"
00032 #include "asio/detail/mutex.hpp"
00033 #include "asio/detail/noncopyable.hpp"
00034 #include "asio/detail/service_base.hpp"
00035
00036 namespace asio {
00037 namespace detail {
00038
00039
00040 class strand_service
00041 : public asio::detail::service_base<strand_service>
00042 {
00043 public:
00044 class handler_base;
00045 class invoke_current_handler;
00046 class post_next_waiter_on_exit;
00047
00048
00049 class strand_impl
00050 {
00051 #if defined (__BORLANDC__)
00052 public:
00053 #else
00054 private:
00055 #endif
00056 void add_ref()
00057 {
00058 ++ref_count_;
00059 }
00060
00061 void release()
00062 {
00063 if (--ref_count_ == 0)
00064 delete this;
00065 }
00066
00067 private:
00068
00069 friend class strand_service;
00070 friend class post_next_waiter_on_exit;
00071 friend class invoke_current_handler;
00072
00073 strand_impl(strand_service& owner)
00074 : owner_(owner),
00075 current_handler_(0),
00076 first_waiter_(0),
00077 last_waiter_(0),
00078 ref_count_(0)
00079 {
00080
00081 asio::detail::mutex::scoped_lock lock(owner_.mutex_);
00082 next_ = owner_.impl_list_;
00083 prev_ = 0;
00084 if (owner_.impl_list_)
00085 owner_.impl_list_->prev_ = this;
00086 owner_.impl_list_ = this;
00087 }
00088
00089 ~strand_impl()
00090 {
00091
00092 asio::detail::mutex::scoped_lock lock(owner_.mutex_);
00093 if (owner_.impl_list_ == this)
00094 owner_.impl_list_ = next_;
00095 if (prev_)
00096 prev_->next_ = next_;
00097 if (next_)
00098 next_->prev_= prev_;
00099 next_ = 0;
00100 prev_ = 0;
00101 lock.unlock();
00102
00103 if (current_handler_)
00104 {
00105 current_handler_->destroy();
00106 }
00107
00108 while (first_waiter_)
00109 {
00110 handler_base* next = first_waiter_->next_;
00111 first_waiter_->destroy();
00112 first_waiter_ = next;
00113 }
00114 }
00115
00116
00117 asio::detail::mutex mutex_;
00118
00119
00120 strand_service& owner_;
00121
00122
00123
00124 handler_base* current_handler_;
00125
00126
00127 handler_base* first_waiter_;
00128
00129
00130 handler_base* last_waiter_;
00131
00132
00133 typedef boost::aligned_storage<128> handler_storage_type;
00134 #if defined(__BORLANDC__)
00135 boost::aligned_storage<128> handler_storage_;
00136 #else
00137 handler_storage_type handler_storage_;
00138 #endif
00139
00140
00141 strand_impl* next_;
00142 strand_impl* prev_;
00143
00144
00145 boost::detail::atomic_count ref_count_;
00146
00147 #if !defined(__BORLANDC__)
00148 friend void intrusive_ptr_add_ref(strand_impl* p)
00149 {
00150 p->add_ref();
00151 }
00152
00153 friend void intrusive_ptr_release(strand_impl* p)
00154 {
00155 p->release();
00156 }
00157 #endif
00158 };
00159
00160 friend class strand_impl;
00161
00162 typedef boost::intrusive_ptr<strand_impl> implementation_type;
00163
00164
00165 class handler_base
00166 {
00167 public:
00168 typedef void (*invoke_func_type)(handler_base*,
00169 strand_service&, implementation_type&);
00170 typedef void (*destroy_func_type)(handler_base*);
00171
00172 handler_base(invoke_func_type invoke_func, destroy_func_type destroy_func)
00173 : next_(0),
00174 invoke_func_(invoke_func),
00175 destroy_func_(destroy_func)
00176 {
00177 }
00178
00179 void invoke(strand_service& service_impl, implementation_type& impl)
00180 {
00181 invoke_func_(this, service_impl, impl);
00182 }
00183
00184 void destroy()
00185 {
00186 destroy_func_(this);
00187 }
00188
00189 protected:
00190 ~handler_base()
00191 {
00192 }
00193
00194 private:
00195 friend class strand_service;
00196 friend class strand_impl;
00197 friend class post_next_waiter_on_exit;
00198 handler_base* next_;
00199 invoke_func_type invoke_func_;
00200 destroy_func_type destroy_func_;
00201 };
00202
00203
00204 class invoke_current_handler
00205 {
00206 public:
00207 invoke_current_handler(strand_service& service_impl,
00208 const implementation_type& impl)
00209 : service_impl_(service_impl),
00210 impl_(impl)
00211 {
00212 }
00213
00214 void operator()()
00215 {
00216 impl_->current_handler_->invoke(service_impl_, impl_);
00217 }
00218
00219 friend void* asio_handler_allocate(std::size_t size,
00220 invoke_current_handler* this_handler)
00221 {
00222 return this_handler->do_handler_allocate(size);
00223 }
00224
00225 friend void asio_handler_deallocate(void*, std::size_t,
00226 invoke_current_handler*)
00227 {
00228 }
00229
00230 void* do_handler_allocate(std::size_t size)
00231 {
00232 #if defined(__BORLANDC__)
00233 BOOST_ASSERT(size <= boost::aligned_storage<128>::size);
00234 #else
00235 BOOST_ASSERT(size <= strand_impl::handler_storage_type::size);
00236 #endif
00237 (void)size;
00238 return impl_->handler_storage_.address();
00239 }
00240
00241
00242
00243
00244
00245 private:
00246 strand_service& service_impl_;
00247 implementation_type impl_;
00248 };
00249
00250
00251 class post_next_waiter_on_exit
00252 {
00253 public:
00254 post_next_waiter_on_exit(strand_service& service_impl,
00255 implementation_type& impl)
00256 : service_impl_(service_impl),
00257 impl_(impl),
00258 cancelled_(false)
00259 {
00260 }
00261
00262 ~post_next_waiter_on_exit()
00263 {
00264 if (!cancelled_)
00265 {
00266 asio::detail::mutex::scoped_lock lock(impl_->mutex_);
00267 impl_->current_handler_ = impl_->first_waiter_;
00268 if (impl_->current_handler_)
00269 {
00270 impl_->first_waiter_ = impl_->first_waiter_->next_;
00271 if (impl_->first_waiter_ == 0)
00272 impl_->last_waiter_ = 0;
00273 lock.unlock();
00274 service_impl_.get_io_service().post(
00275 invoke_current_handler(service_impl_, impl_));
00276 }
00277 }
00278 }
00279
00280 void cancel()
00281 {
00282 cancelled_ = true;
00283 }
00284
00285 private:
00286 strand_service& service_impl_;
00287 implementation_type& impl_;
00288 bool cancelled_;
00289 };
00290
00291
00292 template <typename Handler>
00293 class handler_wrapper
00294 : public handler_base
00295 {
00296 public:
00297 handler_wrapper(Handler handler)
00298 : handler_base(&handler_wrapper<Handler>::do_invoke,
00299 &handler_wrapper<Handler>::do_destroy),
00300 handler_(handler)
00301 {
00302 }
00303
00304 static void do_invoke(handler_base* base,
00305 strand_service& service_impl, implementation_type& impl)
00306 {
00307
00308 typedef handler_wrapper<Handler> this_type;
00309 this_type* h(static_cast<this_type*>(base));
00310 typedef handler_alloc_traits<Handler, this_type> alloc_traits;
00311 handler_ptr<alloc_traits> ptr(h->handler_, h);
00312
00313 post_next_waiter_on_exit p1(service_impl, impl);
00314
00315
00316
00317 Handler handler(h->handler_);
00318
00319
00320
00321
00322
00323 p1.cancel();
00324 post_next_waiter_on_exit p2(service_impl, impl);
00325
00326
00327 ptr.reset();
00328
00329
00330 call_stack<strand_impl>::context ctx(impl.get());
00331
00332
00333 asio_handler_invoke_helpers::invoke(handler, &handler);
00334 }
00335
00336 static void do_destroy(handler_base* base)
00337 {
00338
00339 typedef handler_wrapper<Handler> this_type;
00340 this_type* h(static_cast<this_type*>(base));
00341 typedef handler_alloc_traits<Handler, this_type> alloc_traits;
00342 handler_ptr<alloc_traits> ptr(h->handler_, h);
00343
00344
00345
00346
00347
00348 Handler handler(h->handler_);
00349 (void)handler;
00350
00351
00352 ptr.reset();
00353 }
00354
00355 private:
00356 Handler handler_;
00357 };
00358
00359
00360 explicit strand_service(asio::io_service& io_service)
00361 : asio::detail::service_base<strand_service>(io_service),
00362 mutex_(),
00363 impl_list_(0)
00364 {
00365 }
00366
00367
00368 void shutdown_service()
00369 {
00370
00371 asio::detail::mutex::scoped_lock lock(mutex_);
00372 strand_impl* impl = impl_list_;
00373 handler_base* first_handler = 0;
00374 while (impl)
00375 {
00376 if (impl->current_handler_)
00377 {
00378 impl->current_handler_->next_ = first_handler;
00379 first_handler = impl->current_handler_;
00380 impl->current_handler_ = 0;
00381 }
00382 if (impl->first_waiter_)
00383 {
00384 impl->last_waiter_->next_ = first_handler;
00385 first_handler = impl->first_waiter_;
00386 impl->first_waiter_ = 0;
00387 impl->last_waiter_ = 0;
00388 }
00389 impl = impl->next_;
00390 }
00391
00392
00393 lock.unlock();
00394 while (first_handler)
00395 {
00396 handler_base* next = first_handler->next_;
00397 first_handler->destroy();
00398 first_handler = next;
00399 }
00400 }
00401
00402
00403 void construct(implementation_type& impl)
00404 {
00405 impl = implementation_type(new strand_impl(*this));
00406 }
00407
00408
00409 void destroy(implementation_type& impl)
00410 {
00411 implementation_type().swap(impl);
00412 }
00413
00414
00415 template <typename Handler>
00416 void dispatch(implementation_type& impl, Handler handler)
00417 {
00418 if (call_stack<strand_impl>::contains(impl.get()))
00419 {
00420 asio_handler_invoke_helpers::invoke(handler, &handler);
00421 }
00422 else
00423 {
00424
00425 typedef handler_wrapper<Handler> value_type;
00426 typedef handler_alloc_traits<Handler, value_type> alloc_traits;
00427 raw_handler_ptr<alloc_traits> raw_ptr(handler);
00428 handler_ptr<alloc_traits> ptr(raw_ptr, handler);
00429
00430 asio::detail::mutex::scoped_lock lock(impl->mutex_);
00431
00432 if (impl->current_handler_ == 0)
00433 {
00434
00435 impl->current_handler_ = ptr.release();
00436 lock.unlock();
00437 this->get_io_service().dispatch(invoke_current_handler(*this, impl));
00438 }
00439 else
00440 {
00441
00442
00443
00444 if (impl->last_waiter_)
00445 {
00446 impl->last_waiter_->next_ = ptr.get();
00447 impl->last_waiter_ = impl->last_waiter_->next_;
00448 }
00449 else
00450 {
00451 impl->first_waiter_ = ptr.get();
00452 impl->last_waiter_ = ptr.get();
00453 }
00454 ptr.release();
00455 }
00456 }
00457 }
00458
00459
00460 template <typename Handler>
00461 void post(implementation_type& impl, Handler handler)
00462 {
00463
00464 typedef handler_wrapper<Handler> value_type;
00465 typedef handler_alloc_traits<Handler, value_type> alloc_traits;
00466 raw_handler_ptr<alloc_traits> raw_ptr(handler);
00467 handler_ptr<alloc_traits> ptr(raw_ptr, handler);
00468
00469 asio::detail::mutex::scoped_lock lock(impl->mutex_);
00470
00471 if (impl->current_handler_ == 0)
00472 {
00473
00474 impl->current_handler_ = ptr.release();
00475 lock.unlock();
00476 this->get_io_service().post(invoke_current_handler(*this, impl));
00477 }
00478 else
00479 {
00480
00481
00482
00483 if (impl->last_waiter_)
00484 {
00485 impl->last_waiter_->next_ = ptr.get();
00486 impl->last_waiter_ = impl->last_waiter_->next_;
00487 }
00488 else
00489 {
00490 impl->first_waiter_ = ptr.get();
00491 impl->last_waiter_ = ptr.get();
00492 }
00493 ptr.release();
00494 }
00495 }
00496
00497 private:
00498
00499 asio::detail::mutex mutex_;
00500
00501
00502 strand_impl* impl_list_;
00503 };
00504
00505 }
00506 }
00507
00508 #if defined(__BORLANDC__)
00509
00510 namespace boost {
00511
00512 inline void intrusive_ptr_add_ref(
00513 asio::detail::strand_service::strand_impl* p)
00514 {
00515 p->add_ref();
00516 }
00517
00518 inline void intrusive_ptr_release(
00519 asio::detail::strand_service::strand_impl* p)
00520 {
00521 p->release();
00522 }
00523
00524 }
00525
00526 #endif // defined(__BORLANDC__)
00527
00528 #include "asio/detail/pop_options.hpp"
00529
00530 #endif // ASIO_DETAIL_STRAND_SERVICE_HPP