task_io_service_2lock.hpp
Go to the documentation of this file.
00001 //
00002 // task_io_service_2lock.hpp
00003 // ~~~~~~~~~~~~~~~~~~~~~~~~~
00004 //
00005 // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
00006 //
00007 // Distributed under the Boost Software License, Version 1.0. (See accompanying
00008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
00009 //
00010 
00011 #ifndef ASIO_DETAIL_TASK_IO_SERVICE_2LOCK_HPP
00012 #define ASIO_DETAIL_TASK_IO_SERVICE_2LOCK_HPP
00013 
00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00015 # pragma once
00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00017 
00018 #include "asio/detail/push_options.hpp"
00019 
00020 #include "asio/error_code.hpp"
00021 #include "asio/io_service.hpp"
00022 #include "asio/detail/call_stack.hpp"
00023 #include "asio/detail/event.hpp"
00024 #include "asio/detail/handler_alloc_helpers.hpp"
00025 #include "asio/detail/handler_invoke_helpers.hpp"
00026 #include "asio/detail/indirect_handler_queue.hpp"
00027 #include "asio/detail/mutex.hpp"
00028 #include "asio/detail/service_base.hpp"
00029 #include "asio/detail/task_io_service_fwd.hpp"
00030 
00031 #include "asio/detail/push_options.hpp"
00032 #include <boost/detail/atomic_count.hpp>
00033 #include "asio/detail/pop_options.hpp"
00034 
00035 namespace asio {
00036 namespace detail {
00037 
00038 // An alternative task_io_service implementation based on a two-lock queue.
00039 
00040 template <typename Task>
00041 class task_io_service
00042   : public asio::detail::service_base<task_io_service<Task> >
00043 {
00044 public:
00045   typedef indirect_handler_queue handler_queue;
00046 
00047   // Constructor.
00048   task_io_service(asio::io_service& io_service)
00049     : asio::detail::service_base<task_io_service<Task> >(io_service),
00050       front_mutex_(),
00051       back_mutex_(),
00052       task_(use_service<Task>(io_service)),
00053       outstanding_work_(0),
00054       front_stopped_(false),
00055       back_stopped_(false),
00056       back_shutdown_(false),
00057       back_first_idle_thread_(0),
00058       back_task_thread_(0)
00059   {
00060     handler_queue_.push(&task_handler_);
00061   }
00062 
00063   void init(size_t /*concurrency_hint*/)
00064   {
00065   }
00066 
00067   // Destroy all user-defined handler objects owned by the service.
00068   void shutdown_service()
00069   {
00070     asio::detail::mutex::scoped_lock back_lock(back_mutex_);
00071     back_shutdown_ = true;
00072     back_lock.unlock();
00073 
00074     // Destroy handler objects.
00075     while (handler_queue::handler* h = handler_queue_.pop())
00076       if (h != &task_handler_)
00077         h->destroy();
00078 
00079     // Reset handler queue to initial state.
00080     handler_queue_.push(&task_handler_);
00081   }
00082 
00083   // Run the event loop until interrupted or no more work.
00084   size_t run(asio::error_code& ec)
00085   {
00086     if (outstanding_work_ == 0)
00087     {
00088       stop();
00089       ec = asio::error_code();
00090       return 0;
00091     }
00092 
00093     typename call_stack<task_io_service>::context ctx(this);
00094 
00095     idle_thread_info this_idle_thread;
00096     this_idle_thread.next = 0;
00097 
00098     size_t n = 0;
00099     while (do_one(&this_idle_thread, ec))
00100       if (n != (std::numeric_limits<size_t>::max)())
00101         ++n;
00102     return n;
00103   }
00104 
00105   // Run until interrupted or one operation is performed.
00106   size_t run_one(asio::error_code& ec)
00107   {
00108     if (outstanding_work_ == 0)
00109     {
00110       stop();
00111       ec = asio::error_code();
00112       return 0;
00113     }
00114 
00115     typename call_stack<task_io_service>::context ctx(this);
00116 
00117     idle_thread_info this_idle_thread;
00118     this_idle_thread.next = 0;
00119 
00120     return do_one(&this_idle_thread, ec);
00121   }
00122 
00123   // Poll for operations without blocking.
00124   size_t poll(asio::error_code& ec)
00125   {
00126     if (outstanding_work_ == 0)
00127     {
00128       stop();
00129       ec = asio::error_code();
00130       return 0;
00131     }
00132 
00133     typename call_stack<task_io_service>::context ctx(this);
00134 
00135     size_t n = 0;
00136     while (do_one(0, ec))
00137       if (n != (std::numeric_limits<size_t>::max)())
00138         ++n;
00139     return n;
00140   }
00141 
00142   // Poll for one operation without blocking.
00143   size_t poll_one(asio::error_code& ec)
00144   {
00145     if (outstanding_work_ == 0)
00146     {
00147       stop();
00148       ec = asio::error_code();
00149       return 0;
00150     }
00151 
00152     typename call_stack<task_io_service>::context ctx(this);
00153 
00154     return do_one(0, ec);
00155   }
00156 
00157   // Interrupt the event processing loop.
00158   void stop()
00159   {
00160     asio::detail::mutex::scoped_lock front_lock(front_mutex_);
00161     front_stopped_ = true;
00162     front_lock.unlock();
00163 
00164     asio::detail::mutex::scoped_lock back_lock(back_mutex_);
00165     back_stopped_ = true;
00166     interrupt_all_idle_threads(back_lock);
00167   }
00168 
00169   // Reset in preparation for a subsequent run invocation.
00170   void reset()
00171   {
00172     asio::detail::mutex::scoped_lock front_lock(front_mutex_);
00173     front_stopped_ = false;
00174     front_lock.unlock();
00175 
00176     asio::detail::mutex::scoped_lock back_lock(back_mutex_);
00177     back_stopped_ = false;
00178   }
00179 
00180   // Notify that some work has started.
00181   void work_started()
00182   {
00183     ++outstanding_work_;
00184   }
00185 
00186   // Notify that some work has finished.
00187   void work_finished()
00188   {
00189     if (--outstanding_work_ == 0)
00190       stop();
00191   }
00192 
00193   // Request invocation of the given handler.
00194   template <typename Handler>
00195   void dispatch(Handler handler)
00196   {
00197     if (call_stack<task_io_service>::contains(this))
00198       asio_handler_invoke_helpers::invoke(handler, &handler);
00199     else
00200       post(handler);
00201   }
00202 
00203   // Request invocation of the given handler and return immediately.
00204   template <typename Handler>
00205   void post(Handler handler)
00206   {
00207     // Allocate and construct an operation to wrap the handler.
00208     handler_queue::scoped_ptr ptr(handler_queue::wrap(handler));
00209 
00210     asio::detail::mutex::scoped_lock back_lock(back_mutex_);
00211 
00212     // If the service has been shut down we silently discard the handler.
00213     if (back_shutdown_)
00214       return;
00215 
00216     // Add the handler to the end of the queue.
00217     handler_queue_.push(ptr.get());
00218     ptr.release();
00219 
00220     // An undelivered handler is treated as unfinished work.
00221     ++outstanding_work_;
00222 
00223     // Wake up a thread to execute the handler.
00224     interrupt_one_idle_thread(back_lock);
00225   }
00226 
00227 private:
00228   struct idle_thread_info;
00229 
00230   size_t do_one(idle_thread_info* this_idle_thread,
00231       asio::error_code& ec)
00232   {
00233     bool task_has_run = false;
00234     for (;;)
00235     {
00236       // The front lock must be held before we can pop items from the queue.
00237       asio::detail::mutex::scoped_lock front_lock(front_mutex_);
00238       if (front_stopped_)
00239       {
00240         ec = asio::error_code();
00241         return 0;
00242       }
00243 
00244       if (handler_queue::handler* h = handler_queue_.pop())
00245       {
00246         if (h == &task_handler_)
00247         {
00248           bool more_handlers = handler_queue_.poppable();
00249           unsigned long front_version = handler_queue_.front_version();
00250           front_lock.unlock();
00251 
00252           // The task is always added to the back of the queue when we exit
00253           // this block.
00254           task_cleanup c(*this);
00255 
00256           // If we're polling and the task has already run then we're done.
00257           bool polling = !this_idle_thread;
00258           if (task_has_run && polling)
00259           {
00260             ec = asio::error_code();
00261             return 0;
00262           }
00263 
00264           // If we're considering going idle we need to check whether the queue
00265           // is still empty. If it is, add the thread to the list of idle
00266           // threads.
00267           if (!more_handlers && !polling)
00268           {
00269             asio::detail::mutex::scoped_lock back_lock(back_mutex_);
00270             if (back_stopped_)
00271             {
00272               ec = asio::error_code();
00273               return 0;
00274             }
00275             else if (front_version == handler_queue_.back_version())
00276             {
00277               back_task_thread_ = this_idle_thread;
00278             }
00279             else
00280             {
00281               more_handlers = true;
00282             }
00283           }
00284 
00285           // Run the task. May throw an exception. Only block if the handler
00286           // queue is empty and we're not polling, otherwise we want to return
00287           // as soon as possible.
00288           task_has_run = true;
00289           task_.run(!more_handlers && !polling);
00290         }
00291         else
00292         {
00293           front_lock.unlock();
00294           handler_cleanup c(*this);
00295 
00296           // Invoke the handler. May throw an exception.
00297           h->invoke(); // invoke() deletes the handler object
00298 
00299           ec = asio::error_code();
00300           return 1;
00301         }
00302       }
00303       else if (this_idle_thread)
00304       {
00305         unsigned long front_version = handler_queue_.front_version();
00306         front_lock.unlock();
00307 
00308         // If we're considering going idle we need to check whether the queue
00309         // is still empty. If it is, add the thread to the list of idle
00310         // threads.
00311         asio::detail::mutex::scoped_lock back_lock(back_mutex_);
00312         if (back_stopped_)
00313         {
00314           ec = asio::error_code();
00315           return 0;
00316         }
00317         else if (front_version == handler_queue_.back_version())
00318         {
00319           this_idle_thread->next = back_first_idle_thread_;
00320           back_first_idle_thread_ = this_idle_thread;
00321           this_idle_thread->wakeup_event.clear(back_lock);
00322           this_idle_thread->wakeup_event.wait(back_lock);
00323         }
00324       }
00325       else
00326       {
00327         ec = asio::error_code();
00328         return 0;
00329       }
00330     }
00331   }
00332 
00333   // Interrupt a single idle thread.
00334   void interrupt_one_idle_thread(
00335       asio::detail::mutex::scoped_lock& back_lock)
00336   {
00337     if (back_first_idle_thread_)
00338     {
00339       idle_thread_info* idle_thread = back_first_idle_thread_;
00340       back_first_idle_thread_ = idle_thread->next;
00341       idle_thread->next = 0;
00342       idle_thread->wakeup_event.signal(back_lock);
00343     }
00344     else if (back_task_thread_)
00345     {
00346       back_task_thread_ = 0;
00347       task_.interrupt();
00348     }
00349   }
00350 
00351   // Interrupt all idle threads.
00352   void interrupt_all_idle_threads(
00353       asio::detail::mutex::scoped_lock& back_lock)
00354   {
00355     while (back_first_idle_thread_)
00356     {
00357       idle_thread_info* idle_thread = back_first_idle_thread_;
00358       back_first_idle_thread_ = idle_thread->next;
00359       idle_thread->next = 0;
00360       idle_thread->wakeup_event.signal(back_lock);
00361     }
00362 
00363     if (back_task_thread_)
00364     {
00365       back_task_thread_ = 0;
00366       task_.interrupt();
00367     }
00368   }
00369 
00370   // Helper class to perform task-related operations on block exit.
00371   class task_cleanup;
00372   friend class task_cleanup;
00373   class task_cleanup
00374   {
00375   public:
00376     task_cleanup(task_io_service& task_io_svc)
00377       : task_io_service_(task_io_svc)
00378     {
00379     }
00380 
00381     ~task_cleanup()
00382     {
00383       // Reinsert the task at the end of the handler queue.
00384       asio::detail::mutex::scoped_lock back_lock(
00385           task_io_service_.back_mutex_);
00386       task_io_service_.back_task_thread_ = 0;
00387       task_io_service_.handler_queue_.push(&task_io_service_.task_handler_);
00388     }
00389 
00390   private:
00391     task_io_service& task_io_service_;
00392   };
00393 
00394   // Helper class to perform handler-related operations on block exit.
00395   class handler_cleanup
00396   {
00397   public:
00398     handler_cleanup(task_io_service& task_io_svc)
00399       : task_io_service_(task_io_svc)
00400     {
00401     }
00402 
00403     ~handler_cleanup()
00404     {
00405       task_io_service_.work_finished();
00406     }
00407 
00408   private:
00409     task_io_service& task_io_service_;
00410   };
00411 
00412   // Mutexes to protect access to internal data.
00413   asio::detail::mutex front_mutex_;
00414   asio::detail::mutex back_mutex_;
00415 
00416   // The task to be run by this service.
00417   Task& task_;
00418 
00419   // Handler object to represent the position of the task in the queue.
00420   class task_handler
00421     : public handler_queue::handler
00422   {
00423   public:
00424     task_handler()
00425       : handler_queue::handler(0, 0)
00426     {
00427     }
00428   } task_handler_;
00429 
00430   // The count of unfinished work.
00431   boost::detail::atomic_count outstanding_work_;
00432 
00433   // The queue of handlers that are ready to be delivered.
00434   handler_queue handler_queue_;
00435 
00436   // Flag to indicate that the dispatcher has been stopped.
00437   bool front_stopped_;
00438   bool back_stopped_;
00439 
00440   // Flag to indicate that the dispatcher has been shut down.
00441   bool back_shutdown_;
00442 
00443   // Structure containing information about an idle thread.
00444   struct idle_thread_info
00445   {
00446     event wakeup_event;
00447     idle_thread_info* next;
00448   };
00449 
00450   // The number of threads that are currently idle.
00451   idle_thread_info* back_first_idle_thread_;
00452 
00453   // The thread that is currently blocked on the task.
00454   idle_thread_info* back_task_thread_;
00455 };
00456 
00457 } // namespace detail
00458 } // namespace asio
00459 
00460 #include "asio/detail/pop_options.hpp"
00461 
00462 #endif // ASIO_DETAIL_TASK_IO_SERVICE_2LOCK_HPP
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines


Castor
Author(s): Carpe Noctem
autogenerated on Fri Nov 8 2013 11:05:39