$search
00001 // 00002 // strand_service.hpp 00003 // ~~~~~~~~~~~~~~~~~~ 00004 // 00005 // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com) 00006 // 00007 // Distributed under the Boost Software License, Version 1.0. (See accompanying 00008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 00009 // 00010 00011 #ifndef ASIO_DETAIL_STRAND_SERVICE_HPP 00012 #define ASIO_DETAIL_STRAND_SERVICE_HPP 00013 00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200) 00015 # pragma once 00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) 00017 00018 #include "asio/detail/push_options.hpp" 00019 00020 #include "asio/detail/push_options.hpp" 00021 #include <boost/aligned_storage.hpp> 00022 #include <boost/assert.hpp> 00023 #include <boost/detail/atomic_count.hpp> 00024 #include <boost/intrusive_ptr.hpp> 00025 #include "asio/detail/pop_options.hpp" 00026 00027 #include "asio/io_service.hpp" 00028 #include "asio/detail/bind_handler.hpp" 00029 #include "asio/detail/call_stack.hpp" 00030 #include "asio/detail/handler_alloc_helpers.hpp" 00031 #include "asio/detail/handler_invoke_helpers.hpp" 00032 #include "asio/detail/mutex.hpp" 00033 #include "asio/detail/noncopyable.hpp" 00034 #include "asio/detail/service_base.hpp" 00035 00036 namespace asio { 00037 namespace detail { 00038 00039 // Default service implementation for a strand. 00040 class strand_service 00041 : public asio::detail::service_base<strand_service> 00042 { 00043 public: 00044 class handler_base; 00045 class invoke_current_handler; 00046 class post_next_waiter_on_exit; 00047 00048 // The underlying implementation of a strand. 00049 class strand_impl 00050 { 00051 #if defined (__BORLANDC__) 00052 public: 00053 #else 00054 private: 00055 #endif 00056 void add_ref() 00057 { 00058 ++ref_count_; 00059 } 00060 00061 void release() 00062 { 00063 if (--ref_count_ == 0) 00064 delete this; 00065 } 00066 00067 private: 00068 // Only this service will have access to the internal values. 00069 friend class strand_service; 00070 friend class post_next_waiter_on_exit; 00071 friend class invoke_current_handler; 00072 00073 strand_impl(strand_service& owner) 00074 : owner_(owner), 00075 current_handler_(0), 00076 first_waiter_(0), 00077 last_waiter_(0), 00078 ref_count_(0) 00079 { 00080 // Insert implementation into linked list of all implementations. 00081 asio::detail::mutex::scoped_lock lock(owner_.mutex_); 00082 next_ = owner_.impl_list_; 00083 prev_ = 0; 00084 if (owner_.impl_list_) 00085 owner_.impl_list_->prev_ = this; 00086 owner_.impl_list_ = this; 00087 } 00088 00089 ~strand_impl() 00090 { 00091 // Remove implementation from linked list of all implementations. 00092 asio::detail::mutex::scoped_lock lock(owner_.mutex_); 00093 if (owner_.impl_list_ == this) 00094 owner_.impl_list_ = next_; 00095 if (prev_) 00096 prev_->next_ = next_; 00097 if (next_) 00098 next_->prev_= prev_; 00099 next_ = 0; 00100 prev_ = 0; 00101 lock.unlock(); 00102 00103 if (current_handler_) 00104 { 00105 current_handler_->destroy(); 00106 } 00107 00108 while (first_waiter_) 00109 { 00110 handler_base* next = first_waiter_->next_; 00111 first_waiter_->destroy(); 00112 first_waiter_ = next; 00113 } 00114 } 00115 00116 // Mutex to protect access to internal data. 00117 asio::detail::mutex mutex_; 00118 00119 // The service that owns this implementation. 00120 strand_service& owner_; 00121 00122 // The handler that is ready to execute. If this pointer is non-null then it 00123 // indicates that a handler holds the lock. 00124 handler_base* current_handler_; 00125 00126 // The start of the list of waiting handlers for the strand. 00127 handler_base* first_waiter_; 00128 00129 // The end of the list of waiting handlers for the strand. 00130 handler_base* last_waiter_; 00131 00132 // Storage for posted handlers. 00133 typedef boost::aligned_storage<128> handler_storage_type; 00134 #if defined(__BORLANDC__) 00135 boost::aligned_storage<128> handler_storage_; 00136 #else 00137 handler_storage_type handler_storage_; 00138 #endif 00139 00140 // Pointers to adjacent socket implementations in linked list. 00141 strand_impl* next_; 00142 strand_impl* prev_; 00143 00144 // The reference count on the strand implementation. 00145 boost::detail::atomic_count ref_count_; 00146 00147 #if !defined(__BORLANDC__) 00148 friend void intrusive_ptr_add_ref(strand_impl* p) 00149 { 00150 p->add_ref(); 00151 } 00152 00153 friend void intrusive_ptr_release(strand_impl* p) 00154 { 00155 p->release(); 00156 } 00157 #endif 00158 }; 00159 00160 friend class strand_impl; 00161 00162 typedef boost::intrusive_ptr<strand_impl> implementation_type; 00163 00164 // Base class for all handler types. 00165 class handler_base 00166 { 00167 public: 00168 typedef void (*invoke_func_type)(handler_base*, 00169 strand_service&, implementation_type&); 00170 typedef void (*destroy_func_type)(handler_base*); 00171 00172 handler_base(invoke_func_type invoke_func, destroy_func_type destroy_func) 00173 : next_(0), 00174 invoke_func_(invoke_func), 00175 destroy_func_(destroy_func) 00176 { 00177 } 00178 00179 void invoke(strand_service& service_impl, implementation_type& impl) 00180 { 00181 invoke_func_(this, service_impl, impl); 00182 } 00183 00184 void destroy() 00185 { 00186 destroy_func_(this); 00187 } 00188 00189 protected: 00190 ~handler_base() 00191 { 00192 } 00193 00194 private: 00195 friend class strand_service; 00196 friend class strand_impl; 00197 friend class post_next_waiter_on_exit; 00198 handler_base* next_; 00199 invoke_func_type invoke_func_; 00200 destroy_func_type destroy_func_; 00201 }; 00202 00203 // Helper class to allow handlers to be dispatched. 00204 class invoke_current_handler 00205 { 00206 public: 00207 invoke_current_handler(strand_service& service_impl, 00208 const implementation_type& impl) 00209 : service_impl_(service_impl), 00210 impl_(impl) 00211 { 00212 } 00213 00214 void operator()() 00215 { 00216 impl_->current_handler_->invoke(service_impl_, impl_); 00217 } 00218 00219 friend void* asio_handler_allocate(std::size_t size, 00220 invoke_current_handler* this_handler) 00221 { 00222 return this_handler->do_handler_allocate(size); 00223 } 00224 00225 friend void asio_handler_deallocate(void*, std::size_t, 00226 invoke_current_handler*) 00227 { 00228 } 00229 00230 void* do_handler_allocate(std::size_t size) 00231 { 00232 #if defined(__BORLANDC__) 00233 BOOST_ASSERT(size <= boost::aligned_storage<128>::size); 00234 #else 00235 BOOST_ASSERT(size <= strand_impl::handler_storage_type::size); 00236 #endif 00237 (void)size; 00238 return impl_->handler_storage_.address(); 00239 } 00240 00241 // The asio_handler_invoke hook is not defined here since the default one 00242 // provides the correct behaviour, and including it here breaks MSVC 7.1 00243 // in some situations. 00244 00245 private: 00246 strand_service& service_impl_; 00247 implementation_type impl_; 00248 }; 00249 00250 // Helper class to automatically enqueue next waiter on block exit. 00251 class post_next_waiter_on_exit 00252 { 00253 public: 00254 post_next_waiter_on_exit(strand_service& service_impl, 00255 implementation_type& impl) 00256 : service_impl_(service_impl), 00257 impl_(impl), 00258 cancelled_(false) 00259 { 00260 } 00261 00262 ~post_next_waiter_on_exit() 00263 { 00264 if (!cancelled_) 00265 { 00266 asio::detail::mutex::scoped_lock lock(impl_->mutex_); 00267 impl_->current_handler_ = impl_->first_waiter_; 00268 if (impl_->current_handler_) 00269 { 00270 impl_->first_waiter_ = impl_->first_waiter_->next_; 00271 if (impl_->first_waiter_ == 0) 00272 impl_->last_waiter_ = 0; 00273 lock.unlock(); 00274 service_impl_.get_io_service().post( 00275 invoke_current_handler(service_impl_, impl_)); 00276 } 00277 } 00278 } 00279 00280 void cancel() 00281 { 00282 cancelled_ = true; 00283 } 00284 00285 private: 00286 strand_service& service_impl_; 00287 implementation_type& impl_; 00288 bool cancelled_; 00289 }; 00290 00291 // Class template for a waiter. 00292 template <typename Handler> 00293 class handler_wrapper 00294 : public handler_base 00295 { 00296 public: 00297 handler_wrapper(Handler handler) 00298 : handler_base(&handler_wrapper<Handler>::do_invoke, 00299 &handler_wrapper<Handler>::do_destroy), 00300 handler_(handler) 00301 { 00302 } 00303 00304 static void do_invoke(handler_base* base, 00305 strand_service& service_impl, implementation_type& impl) 00306 { 00307 // Take ownership of the handler object. 00308 typedef handler_wrapper<Handler> this_type; 00309 this_type* h(static_cast<this_type*>(base)); 00310 typedef handler_alloc_traits<Handler, this_type> alloc_traits; 00311 handler_ptr<alloc_traits> ptr(h->handler_, h); 00312 00313 post_next_waiter_on_exit p1(service_impl, impl); 00314 00315 // Make a copy of the handler so that the memory can be deallocated before 00316 // the upcall is made. 00317 Handler handler(h->handler_); 00318 00319 // A handler object must still be valid when the next waiter is posted 00320 // since destroying the last handler might cause the strand object to be 00321 // destroyed. Therefore we create a second post_next_waiter_on_exit object 00322 // that will be destroyed before the handler object. 00323 p1.cancel(); 00324 post_next_waiter_on_exit p2(service_impl, impl); 00325 00326 // Free the memory associated with the handler. 00327 ptr.reset(); 00328 00329 // Indicate that this strand is executing on the current thread. 00330 call_stack<strand_impl>::context ctx(impl.get()); 00331 00332 // Make the upcall. 00333 asio_handler_invoke_helpers::invoke(handler, &handler); 00334 } 00335 00336 static void do_destroy(handler_base* base) 00337 { 00338 // Take ownership of the handler object. 00339 typedef handler_wrapper<Handler> this_type; 00340 this_type* h(static_cast<this_type*>(base)); 00341 typedef handler_alloc_traits<Handler, this_type> alloc_traits; 00342 handler_ptr<alloc_traits> ptr(h->handler_, h); 00343 00344 // A sub-object of the handler may be the true owner of the memory 00345 // associated with the handler. Consequently, a local copy of the handler 00346 // is required to ensure that any owning sub-object remains valid until 00347 // after we have deallocated the memory here. 00348 Handler handler(h->handler_); 00349 (void)handler; 00350 00351 // Free the memory associated with the handler. 00352 ptr.reset(); 00353 } 00354 00355 private: 00356 Handler handler_; 00357 }; 00358 00359 // Construct a new strand service for the specified io_service. 00360 explicit strand_service(asio::io_service& io_service) 00361 : asio::detail::service_base<strand_service>(io_service), 00362 mutex_(), 00363 impl_list_(0) 00364 { 00365 } 00366 00367 // Destroy all user-defined handler objects owned by the service. 00368 void shutdown_service() 00369 { 00370 // Construct a list of all handlers to be destroyed. 00371 asio::detail::mutex::scoped_lock lock(mutex_); 00372 strand_impl* impl = impl_list_; 00373 handler_base* first_handler = 0; 00374 while (impl) 00375 { 00376 if (impl->current_handler_) 00377 { 00378 impl->current_handler_->next_ = first_handler; 00379 first_handler = impl->current_handler_; 00380 impl->current_handler_ = 0; 00381 } 00382 if (impl->first_waiter_) 00383 { 00384 impl->last_waiter_->next_ = first_handler; 00385 first_handler = impl->first_waiter_; 00386 impl->first_waiter_ = 0; 00387 impl->last_waiter_ = 0; 00388 } 00389 impl = impl->next_; 00390 } 00391 00392 // Destroy all handlers without holding the lock. 00393 lock.unlock(); 00394 while (first_handler) 00395 { 00396 handler_base* next = first_handler->next_; 00397 first_handler->destroy(); 00398 first_handler = next; 00399 } 00400 } 00401 00402 // Construct a new strand implementation. 00403 void construct(implementation_type& impl) 00404 { 00405 impl = implementation_type(new strand_impl(*this)); 00406 } 00407 00408 // Destroy a strand implementation. 00409 void destroy(implementation_type& impl) 00410 { 00411 implementation_type().swap(impl); 00412 } 00413 00414 // Request the io_service to invoke the given handler. 00415 template <typename Handler> 00416 void dispatch(implementation_type& impl, Handler handler) 00417 { 00418 if (call_stack<strand_impl>::contains(impl.get())) 00419 { 00420 asio_handler_invoke_helpers::invoke(handler, &handler); 00421 } 00422 else 00423 { 00424 // Allocate and construct an object to wrap the handler. 00425 typedef handler_wrapper<Handler> value_type; 00426 typedef handler_alloc_traits<Handler, value_type> alloc_traits; 00427 raw_handler_ptr<alloc_traits> raw_ptr(handler); 00428 handler_ptr<alloc_traits> ptr(raw_ptr, handler); 00429 00430 asio::detail::mutex::scoped_lock lock(impl->mutex_); 00431 00432 if (impl->current_handler_ == 0) 00433 { 00434 // This handler now has the lock, so can be dispatched immediately. 00435 impl->current_handler_ = ptr.release(); 00436 lock.unlock(); 00437 this->get_io_service().dispatch(invoke_current_handler(*this, impl)); 00438 } 00439 else 00440 { 00441 // Another handler already holds the lock, so this handler must join 00442 // the list of waiters. The handler will be posted automatically when 00443 // its turn comes. 00444 if (impl->last_waiter_) 00445 { 00446 impl->last_waiter_->next_ = ptr.get(); 00447 impl->last_waiter_ = impl->last_waiter_->next_; 00448 } 00449 else 00450 { 00451 impl->first_waiter_ = ptr.get(); 00452 impl->last_waiter_ = ptr.get(); 00453 } 00454 ptr.release(); 00455 } 00456 } 00457 } 00458 00459 // Request the io_service to invoke the given handler and return immediately. 00460 template <typename Handler> 00461 void post(implementation_type& impl, Handler handler) 00462 { 00463 // Allocate and construct an object to wrap the handler. 00464 typedef handler_wrapper<Handler> value_type; 00465 typedef handler_alloc_traits<Handler, value_type> alloc_traits; 00466 raw_handler_ptr<alloc_traits> raw_ptr(handler); 00467 handler_ptr<alloc_traits> ptr(raw_ptr, handler); 00468 00469 asio::detail::mutex::scoped_lock lock(impl->mutex_); 00470 00471 if (impl->current_handler_ == 0) 00472 { 00473 // This handler now has the lock, so can be dispatched immediately. 00474 impl->current_handler_ = ptr.release(); 00475 lock.unlock(); 00476 this->get_io_service().post(invoke_current_handler(*this, impl)); 00477 } 00478 else 00479 { 00480 // Another handler already holds the lock, so this handler must join the 00481 // list of waiters. The handler will be posted automatically when its turn 00482 // comes. 00483 if (impl->last_waiter_) 00484 { 00485 impl->last_waiter_->next_ = ptr.get(); 00486 impl->last_waiter_ = impl->last_waiter_->next_; 00487 } 00488 else 00489 { 00490 impl->first_waiter_ = ptr.get(); 00491 impl->last_waiter_ = ptr.get(); 00492 } 00493 ptr.release(); 00494 } 00495 } 00496 00497 private: 00498 // Mutex to protect access to the linked list of implementations. 00499 asio::detail::mutex mutex_; 00500 00501 // The head of a linked list of all implementations. 00502 strand_impl* impl_list_; 00503 }; 00504 00505 } // namespace detail 00506 } // namespace asio 00507 00508 #if defined(__BORLANDC__) 00509 00510 namespace boost { 00511 00512 inline void intrusive_ptr_add_ref( 00513 asio::detail::strand_service::strand_impl* p) 00514 { 00515 p->add_ref(); 00516 } 00517 00518 inline void intrusive_ptr_release( 00519 asio::detail::strand_service::strand_impl* p) 00520 { 00521 p->release(); 00522 } 00523 00524 } // namespace boost 00525 00526 #endif // defined(__BORLANDC__) 00527 00528 #include "asio/detail/pop_options.hpp" 00529 00530 #endif // ASIO_DETAIL_STRAND_SERVICE_HPP