strand_service.hpp
Go to the documentation of this file.
00001 //
00002 // strand_service.hpp
00003 // ~~~~~~~~~~~~~~~~~~
00004 //
00005 // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
00006 //
00007 // Distributed under the Boost Software License, Version 1.0. (See accompanying
00008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
00009 //
00010 
00011 #ifndef ASIO_DETAIL_STRAND_SERVICE_HPP
00012 #define ASIO_DETAIL_STRAND_SERVICE_HPP
00013 
00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00015 # pragma once
00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00017 
00018 #include "asio/detail/push_options.hpp"
00019 
00020 #include "asio/detail/push_options.hpp"
00021 #include <boost/aligned_storage.hpp>
00022 #include <boost/assert.hpp>
00023 #include <boost/detail/atomic_count.hpp>
00024 #include <boost/intrusive_ptr.hpp>
00025 #include "asio/detail/pop_options.hpp"
00026 
00027 #include "asio/io_service.hpp"
00028 #include "asio/detail/bind_handler.hpp"
00029 #include "asio/detail/call_stack.hpp"
00030 #include "asio/detail/handler_alloc_helpers.hpp"
00031 #include "asio/detail/handler_invoke_helpers.hpp"
00032 #include "asio/detail/mutex.hpp"
00033 #include "asio/detail/noncopyable.hpp"
00034 #include "asio/detail/service_base.hpp"
00035 
00036 namespace asio {
00037 namespace detail {
00038 
00039 // Default service implementation for a strand.
00040 class strand_service
00041   : public asio::detail::service_base<strand_service>
00042 {
00043 public:
00044   class handler_base;
00045   class invoke_current_handler;
00046   class post_next_waiter_on_exit;
00047 
00048   // The underlying implementation of a strand.
00049   class strand_impl
00050   {
00051 #if defined (__BORLANDC__)
00052   public:
00053 #else
00054   private:
00055 #endif
00056     void add_ref()
00057     {
00058       ++ref_count_;
00059     }
00060 
00061     void release()
00062     {
00063       if (--ref_count_ == 0)
00064         delete this;
00065     }
00066 
00067   private:
00068     // Only this service will have access to the internal values.
00069     friend class strand_service;
00070     friend class post_next_waiter_on_exit;
00071     friend class invoke_current_handler;
00072 
00073     strand_impl(strand_service& owner)
00074       : owner_(owner),
00075         current_handler_(0),
00076         first_waiter_(0),
00077         last_waiter_(0),
00078         ref_count_(0)
00079     {
00080       // Insert implementation into linked list of all implementations.
00081       asio::detail::mutex::scoped_lock lock(owner_.mutex_);
00082       next_ = owner_.impl_list_;
00083       prev_ = 0;
00084       if (owner_.impl_list_)
00085         owner_.impl_list_->prev_ = this;
00086       owner_.impl_list_ = this;
00087     }
00088 
00089     ~strand_impl()
00090     {
00091       // Remove implementation from linked list of all implementations.
00092       asio::detail::mutex::scoped_lock lock(owner_.mutex_);
00093       if (owner_.impl_list_ == this)
00094         owner_.impl_list_ = next_;
00095       if (prev_)
00096         prev_->next_ = next_;
00097       if (next_)
00098         next_->prev_= prev_;
00099       next_ = 0;
00100       prev_ = 0;
00101       lock.unlock();
00102 
00103       if (current_handler_)
00104       {
00105         current_handler_->destroy();
00106       }
00107 
00108       while (first_waiter_)
00109       {
00110         handler_base* next = first_waiter_->next_;
00111         first_waiter_->destroy();
00112         first_waiter_ = next;
00113       }
00114     }
00115 
00116     // Mutex to protect access to internal data.
00117     asio::detail::mutex mutex_;
00118 
00119     // The service that owns this implementation.
00120     strand_service& owner_;
00121 
00122     // The handler that is ready to execute. If this pointer is non-null then it
00123     // indicates that a handler holds the lock.
00124     handler_base* current_handler_;
00125 
00126     // The start of the list of waiting handlers for the strand.
00127     handler_base* first_waiter_;
00128     
00129     // The end of the list of waiting handlers for the strand.
00130     handler_base* last_waiter_;
00131 
00132     // Storage for posted handlers.
00133     typedef boost::aligned_storage<128> handler_storage_type;
00134 #if defined(__BORLANDC__)
00135     boost::aligned_storage<128> handler_storage_;
00136 #else
00137     handler_storage_type handler_storage_;
00138 #endif
00139 
00140     // Pointers to adjacent socket implementations in linked list.
00141     strand_impl* next_;
00142     strand_impl* prev_;
00143 
00144     // The reference count on the strand implementation.
00145     boost::detail::atomic_count ref_count_;
00146 
00147 #if !defined(__BORLANDC__)
00148     friend void intrusive_ptr_add_ref(strand_impl* p)
00149     {
00150       p->add_ref();
00151     }
00152 
00153     friend void intrusive_ptr_release(strand_impl* p)
00154     {
00155       p->release();
00156     }
00157 #endif
00158   };
00159 
00160   friend class strand_impl;
00161 
00162   typedef boost::intrusive_ptr<strand_impl> implementation_type;
00163 
00164   // Base class for all handler types.
00165   class handler_base
00166   {
00167   public:
00168     typedef void (*invoke_func_type)(handler_base*,
00169         strand_service&, implementation_type&);
00170     typedef void (*destroy_func_type)(handler_base*);
00171 
00172     handler_base(invoke_func_type invoke_func, destroy_func_type destroy_func)
00173       : next_(0),
00174         invoke_func_(invoke_func),
00175         destroy_func_(destroy_func)
00176     {
00177     }
00178 
00179     void invoke(strand_service& service_impl, implementation_type& impl)
00180     {
00181       invoke_func_(this, service_impl, impl);
00182     }
00183 
00184     void destroy()
00185     {
00186       destroy_func_(this);
00187     }
00188 
00189   protected:
00190     ~handler_base()
00191     {
00192     }
00193 
00194   private:
00195     friend class strand_service;
00196     friend class strand_impl;
00197     friend class post_next_waiter_on_exit;
00198     handler_base* next_;
00199     invoke_func_type invoke_func_;
00200     destroy_func_type destroy_func_;
00201   };
00202 
00203   // Helper class to allow handlers to be dispatched.
00204   class invoke_current_handler
00205   {
00206   public:
00207     invoke_current_handler(strand_service& service_impl,
00208         const implementation_type& impl)
00209       : service_impl_(service_impl),
00210         impl_(impl)
00211     {
00212     }
00213 
00214     void operator()()
00215     {
00216       impl_->current_handler_->invoke(service_impl_, impl_);
00217     }
00218 
00219     friend void* asio_handler_allocate(std::size_t size,
00220         invoke_current_handler* this_handler)
00221     {
00222       return this_handler->do_handler_allocate(size);
00223     }
00224 
00225     friend void asio_handler_deallocate(void*, std::size_t,
00226         invoke_current_handler*)
00227     {
00228     }
00229 
00230     void* do_handler_allocate(std::size_t size)
00231     {
00232 #if defined(__BORLANDC__)
00233       BOOST_ASSERT(size <= boost::aligned_storage<128>::size);
00234 #else
00235       BOOST_ASSERT(size <= strand_impl::handler_storage_type::size);
00236 #endif
00237       (void)size;
00238       return impl_->handler_storage_.address();
00239     }
00240 
00241     // The asio_handler_invoke hook is not defined here since the default one
00242     // provides the correct behaviour, and including it here breaks MSVC 7.1
00243     // in some situations.
00244 
00245   private:
00246     strand_service& service_impl_;
00247     implementation_type impl_;
00248   };
00249 
00250   // Helper class to automatically enqueue next waiter on block exit.
00251   class post_next_waiter_on_exit
00252   {
00253   public:
00254     post_next_waiter_on_exit(strand_service& service_impl,
00255         implementation_type& impl)
00256       : service_impl_(service_impl),
00257         impl_(impl),
00258         cancelled_(false)
00259     {
00260     }
00261 
00262     ~post_next_waiter_on_exit()
00263     {
00264       if (!cancelled_)
00265       {
00266         asio::detail::mutex::scoped_lock lock(impl_->mutex_);
00267         impl_->current_handler_ = impl_->first_waiter_;
00268         if (impl_->current_handler_)
00269         {
00270           impl_->first_waiter_ = impl_->first_waiter_->next_;
00271           if (impl_->first_waiter_ == 0)
00272             impl_->last_waiter_ = 0;
00273           lock.unlock();
00274           service_impl_.get_io_service().post(
00275               invoke_current_handler(service_impl_, impl_));
00276         }
00277       }
00278     }
00279 
00280     void cancel()
00281     {
00282       cancelled_ = true;
00283     }
00284 
00285   private:
00286     strand_service& service_impl_;
00287     implementation_type& impl_;
00288     bool cancelled_;
00289   };
00290 
00291   // Class template for a waiter.
00292   template <typename Handler>
00293   class handler_wrapper
00294     : public handler_base
00295   {
00296   public:
00297     handler_wrapper(Handler handler)
00298       : handler_base(&handler_wrapper<Handler>::do_invoke,
00299           &handler_wrapper<Handler>::do_destroy),
00300         handler_(handler)
00301     {
00302     }
00303 
00304     static void do_invoke(handler_base* base,
00305         strand_service& service_impl, implementation_type& impl)
00306     {
00307       // Take ownership of the handler object.
00308       typedef handler_wrapper<Handler> this_type;
00309       this_type* h(static_cast<this_type*>(base));
00310       typedef handler_alloc_traits<Handler, this_type> alloc_traits;
00311       handler_ptr<alloc_traits> ptr(h->handler_, h);
00312 
00313       post_next_waiter_on_exit p1(service_impl, impl);
00314 
00315       // Make a copy of the handler so that the memory can be deallocated before
00316       // the upcall is made.
00317       Handler handler(h->handler_);
00318 
00319       // A handler object must still be valid when the next waiter is posted
00320       // since destroying the last handler might cause the strand object to be
00321       // destroyed. Therefore we create a second post_next_waiter_on_exit object
00322       // that will be destroyed before the handler object.
00323       p1.cancel();
00324       post_next_waiter_on_exit p2(service_impl, impl);
00325 
00326       // Free the memory associated with the handler.
00327       ptr.reset();
00328 
00329       // Indicate that this strand is executing on the current thread.
00330       call_stack<strand_impl>::context ctx(impl.get());
00331 
00332       // Make the upcall.
00333       asio_handler_invoke_helpers::invoke(handler, &handler);
00334     }
00335 
00336     static void do_destroy(handler_base* base)
00337     {
00338       // Take ownership of the handler object.
00339       typedef handler_wrapper<Handler> this_type;
00340       this_type* h(static_cast<this_type*>(base));
00341       typedef handler_alloc_traits<Handler, this_type> alloc_traits;
00342       handler_ptr<alloc_traits> ptr(h->handler_, h);
00343 
00344       // A sub-object of the handler may be the true owner of the memory
00345       // associated with the handler. Consequently, a local copy of the handler
00346       // is required to ensure that any owning sub-object remains valid until
00347       // after we have deallocated the memory here.
00348       Handler handler(h->handler_);
00349       (void)handler;
00350 
00351       // Free the memory associated with the handler.
00352       ptr.reset();
00353     }
00354 
00355   private:
00356     Handler handler_;
00357   };
00358 
00359   // Construct a new strand service for the specified io_service.
00360   explicit strand_service(asio::io_service& io_service)
00361     : asio::detail::service_base<strand_service>(io_service),
00362       mutex_(),
00363       impl_list_(0)
00364   {
00365   }
00366 
00367   // Destroy all user-defined handler objects owned by the service.
00368   void shutdown_service()
00369   {
00370     // Construct a list of all handlers to be destroyed.
00371     asio::detail::mutex::scoped_lock lock(mutex_);
00372     strand_impl* impl = impl_list_;
00373     handler_base* first_handler = 0;
00374     while (impl)
00375     {
00376       if (impl->current_handler_)
00377       {
00378         impl->current_handler_->next_ = first_handler;
00379         first_handler = impl->current_handler_;
00380         impl->current_handler_ = 0;
00381       }
00382       if (impl->first_waiter_)
00383       {
00384         impl->last_waiter_->next_ = first_handler;
00385         first_handler = impl->first_waiter_;
00386         impl->first_waiter_ = 0;
00387         impl->last_waiter_ = 0;
00388       }
00389       impl = impl->next_;
00390     }
00391 
00392     // Destroy all handlers without holding the lock.
00393     lock.unlock();
00394     while (first_handler)
00395     {
00396       handler_base* next = first_handler->next_;
00397       first_handler->destroy();
00398       first_handler = next;
00399     }
00400   }
00401 
00402   // Construct a new strand implementation.
00403   void construct(implementation_type& impl)
00404   {
00405     impl = implementation_type(new strand_impl(*this));
00406   }
00407 
00408   // Destroy a strand implementation.
00409   void destroy(implementation_type& impl)
00410   {
00411     implementation_type().swap(impl);
00412   }
00413 
00414   // Request the io_service to invoke the given handler.
00415   template <typename Handler>
00416   void dispatch(implementation_type& impl, Handler handler)
00417   {
00418     if (call_stack<strand_impl>::contains(impl.get()))
00419     {
00420       asio_handler_invoke_helpers::invoke(handler, &handler);
00421     }
00422     else
00423     {
00424       // Allocate and construct an object to wrap the handler.
00425       typedef handler_wrapper<Handler> value_type;
00426       typedef handler_alloc_traits<Handler, value_type> alloc_traits;
00427       raw_handler_ptr<alloc_traits> raw_ptr(handler);
00428       handler_ptr<alloc_traits> ptr(raw_ptr, handler);
00429 
00430       asio::detail::mutex::scoped_lock lock(impl->mutex_);
00431 
00432       if (impl->current_handler_ == 0)
00433       {
00434         // This handler now has the lock, so can be dispatched immediately.
00435         impl->current_handler_ = ptr.release();
00436         lock.unlock();
00437         this->get_io_service().dispatch(invoke_current_handler(*this, impl));
00438       }
00439       else
00440       {
00441         // Another handler already holds the lock, so this handler must join
00442         // the list of waiters. The handler will be posted automatically when
00443         // its turn comes.
00444         if (impl->last_waiter_)
00445         {
00446           impl->last_waiter_->next_ = ptr.get();
00447           impl->last_waiter_ = impl->last_waiter_->next_;
00448         }
00449         else
00450         {
00451           impl->first_waiter_ = ptr.get();
00452           impl->last_waiter_ = ptr.get();
00453         }
00454         ptr.release();
00455       }
00456     }
00457   }
00458 
00459   // Request the io_service to invoke the given handler and return immediately.
00460   template <typename Handler>
00461   void post(implementation_type& impl, Handler handler)
00462   {
00463     // Allocate and construct an object to wrap the handler.
00464     typedef handler_wrapper<Handler> value_type;
00465     typedef handler_alloc_traits<Handler, value_type> alloc_traits;
00466     raw_handler_ptr<alloc_traits> raw_ptr(handler);
00467     handler_ptr<alloc_traits> ptr(raw_ptr, handler);
00468 
00469     asio::detail::mutex::scoped_lock lock(impl->mutex_);
00470 
00471     if (impl->current_handler_ == 0)
00472     {
00473       // This handler now has the lock, so can be dispatched immediately.
00474       impl->current_handler_ = ptr.release();
00475       lock.unlock();
00476       this->get_io_service().post(invoke_current_handler(*this, impl));
00477     }
00478     else
00479     {
00480       // Another handler already holds the lock, so this handler must join the
00481       // list of waiters. The handler will be posted automatically when its turn
00482       // comes.
00483       if (impl->last_waiter_)
00484       {
00485         impl->last_waiter_->next_ = ptr.get();
00486         impl->last_waiter_ = impl->last_waiter_->next_;
00487       }
00488       else
00489       {
00490         impl->first_waiter_ = ptr.get();
00491         impl->last_waiter_ = ptr.get();
00492       }
00493       ptr.release();
00494     }
00495   }
00496 
00497 private:
00498   // Mutex to protect access to the linked list of implementations. 
00499   asio::detail::mutex mutex_;
00500 
00501   // The head of a linked list of all implementations.
00502   strand_impl* impl_list_;
00503 };
00504 
00505 } // namespace detail
00506 } // namespace asio
00507 
00508 #if defined(__BORLANDC__)
00509 
00510 namespace boost {
00511 
00512 inline void intrusive_ptr_add_ref(
00513     asio::detail::strand_service::strand_impl* p)
00514 {
00515   p->add_ref();
00516 }
00517 
00518 inline void intrusive_ptr_release(
00519     asio::detail::strand_service::strand_impl* p)
00520 {
00521   p->release();
00522 }
00523 
00524 } // namespace boost
00525 
00526 #endif // defined(__BORLANDC__)
00527 
00528 #include "asio/detail/pop_options.hpp"
00529 
00530 #endif // ASIO_DETAIL_STRAND_SERVICE_HPP
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines


Castor
Author(s): Carpe Noctem
autogenerated on Fri Nov 8 2013 11:05:39