task_io_service.hpp
Go to the documentation of this file.
00001 //
00002 // task_io_service.hpp
00003 // ~~~~~~~~~~~~~~~~~~~
00004 //
00005 // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
00006 //
00007 // Distributed under the Boost Software License, Version 1.0. (See accompanying
00008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
00009 //
00010 
00011 #ifndef ASIO_DETAIL_TASK_IO_SERVICE_HPP
00012 #define ASIO_DETAIL_TASK_IO_SERVICE_HPP
00013 
00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00015 # pragma once
00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00017 
00018 #if defined(ASIO_ENABLE_TWO_LOCK_QUEUE)
00019 #include "asio/detail/task_io_service_2lock.hpp"
00020 #else // defined(ASIO_ENABLE_TWO_LOCK_QUEUE)
00021 
00022 #include "asio/detail/push_options.hpp"
00023 
00024 #include "asio/error_code.hpp"
00025 #include "asio/io_service.hpp"
00026 #include "asio/detail/call_stack.hpp"
00027 #include "asio/detail/event.hpp"
00028 #include "asio/detail/handler_alloc_helpers.hpp"
00029 #include "asio/detail/handler_invoke_helpers.hpp"
00030 #include "asio/detail/handler_queue.hpp"
00031 #include "asio/detail/mutex.hpp"
00032 #include "asio/detail/service_base.hpp"
00033 #include "asio/detail/task_io_service_fwd.hpp"
00034 
00035 namespace asio {
00036 namespace detail {
00037 
00038 template <typename Task>
00039 class task_io_service
00040   : public asio::detail::service_base<task_io_service<Task> >
00041 {
00042 public:
00043   // Constructor. Obtains the Task through use_service<Task>(io_service) and
        // starts with the task marked as interrupted, no outstanding work, not
        // stopped, not shut down and an empty idle-thread list.
00044   task_io_service(asio::io_service& io_service)
00045     : asio::detail::service_base<task_io_service<Task> >(io_service),
00046       mutex_(),
00047       task_(use_service<Task>(io_service)),
00048       task_interrupted_(true),
00049       outstanding_work_(0),
00050       stopped_(false),
00051       shutdown_(false),
00052       first_idle_thread_(0)
00053   {
          // Seed the queue with the marker that stands for the task's turn.
00054     handler_queue_.push(&task_handler_);
00055   }
00056 
00057   void init(size_t /*concurrency_hint*/) // The hint is unused; this implementation has nothing to set up.
00058   {
00059   }
00060 
00061   // Destroy all user-defined handler objects owned by the service.
        // shutdown_ is set first, under the mutex, so that post() discards any
        // handler submitted from now on. The queue is then emptied and put back
        // into the state the constructor left it in.
00062   void shutdown_service()
00063   {
00064     asio::detail::mutex::scoped_lock lock(mutex_);
00065     shutdown_ = true;
00066     lock.unlock(); // The queue is drained below without holding the mutex.
00067 
00068     // Destroy handler objects.
00069     while (!handler_queue_.empty())
00070     {
00071       handler_queue::handler* h = handler_queue_.front();
00072       handler_queue_.pop();
00073       if (h != &task_handler_) // The task marker is a data member of this class, so it is skipped.
00074         h->destroy();
00075     }
00076 
00077     // Reset handler queue to initial state.
00078     handler_queue_.push(&task_handler_);
00079   }
00080 
00081   // Run the event loop until interrupted or no more work.
        // Returns the number of handlers executed (one per successful do_one()
        // call), saturating at the largest size_t value. ec is assigned by
        // do_one().
        // NOTE(review): std::numeric_limits needs <limits>, which is not among
        // the includes of this header -- presumably it arrives transitively.
00082   size_t run(asio::error_code& ec)
00083   {
          // dispatch() tests call_stack<task_io_service>::contains(this);
          // presumably this context object is what makes that test succeed on
          // the current thread -- confirm in call_stack.hpp.
00084     typename call_stack<task_io_service>::context ctx(this);
00085 
          // Passing an idle_thread_info lets do_one() park this thread while
          // the queue is empty.
00086     idle_thread_info this_idle_thread;
00087     this_idle_thread.next = 0;
00088 
00089     asio::detail::mutex::scoped_lock lock(mutex_);
00090 
00091     size_t n = 0;
00092     while (do_one(lock, &this_idle_thread, ec))
00093       if (n != (std::numeric_limits<size_t>::max)()) // Do not let the counter wrap around.
00094         ++n;
00095     return n;
00096   }
00097 
00098   // Run until interrupted or one operation is performed.
        // Returns the result of a single do_one() call: 1 if a handler was
        // invoked, 0 otherwise. ec is assigned by do_one().
00099   size_t run_one(asio::error_code& ec)
00100   {
00101     typename call_stack<task_io_service>::context ctx(this);
00102 
          // Passing an idle_thread_info allows do_one() to wait for work.
00103     idle_thread_info this_idle_thread;
00104     this_idle_thread.next = 0;
00105 
00106     asio::detail::mutex::scoped_lock lock(mutex_);
00107 
00108     return do_one(lock, &this_idle_thread, ec);
00109   }
00110 
00111   // Poll for operations without blocking.
        // Returns the number of handlers executed, saturating at the largest
        // size_t value. ec is assigned by do_one().
00112   size_t poll(asio::error_code& ec)
00113   {
00114     typename call_stack<task_io_service>::context ctx(this);
00115 
00116     asio::detail::mutex::scoped_lock lock(mutex_);
00117 
00118     size_t n = 0;
00119     while (do_one(lock, 0, ec)) // A null idle-thread pointer selects do_one()'s polling mode.
00120       if (n != (std::numeric_limits<size_t>::max)()) // Do not let the counter wrap around.
00121         ++n;
00122     return n;
00123   }
00124 
00125   // Poll for one operation without blocking.
        // Returns 1 if a handler was invoked and 0 otherwise. ec is assigned by
        // do_one().
00126   size_t poll_one(asio::error_code& ec)
00127   {
00128     typename call_stack<task_io_service>::context ctx(this);
00129 
00130     asio::detail::mutex::scoped_lock lock(mutex_);
00131 
00132     return do_one(lock, 0, ec); // A null idle-thread pointer selects do_one()'s polling mode.
00133   }
00134 
00135   // Interrupt the event processing loop.
        // Under the mutex, sets stopped_, wakes every idle thread and interrupts
        // the task (see stop_all_threads()). stopped_ is only cleared again by
        // reset().
00136   void stop()
00137   {
00138     asio::detail::mutex::scoped_lock lock(mutex_);
00139     stop_all_threads(lock);
00140   }
00141 
00142   // Reset in preparation for a subsequent run invocation.
        // Clears stopped_ under the mutex; no other state is touched.
00143   void reset()
00144   {
00145     asio::detail::mutex::scoped_lock lock(mutex_);
00146     stopped_ = false;
00147   }
00148 
00149   // Notify that some work has started.
        // Increments outstanding_work_ under the mutex.
00150   void work_started()
00151   {
00152     asio::detail::mutex::scoped_lock lock(mutex_);
00153     ++outstanding_work_;
00154   }
00155 
00156   // Notify that some work has finished.
        // Decrements outstanding_work_ under the mutex and stops all threads
        // when the count reaches zero.
00157   void work_finished()
00158   {
00159     asio::detail::mutex::scoped_lock lock(mutex_);
00160     if (--outstanding_work_ == 0)
00161       stop_all_threads(lock);
00162   }
00163 
00164   // Request invocation of the given handler.
        // The handler is invoked directly, before this function returns, when
        // call_stack<task_io_service>::contains(this) is true; otherwise it is
        // queued through post().
        // NOTE(review): contains(this) presumably means the calling thread is
        // currently inside run(), run_one(), poll() or poll_one() of this
        // service, each of which creates a call_stack context -- confirm.
00165   template <typename Handler>
00166   void dispatch(Handler handler)
00167   {
00168     if (call_stack<task_io_service>::contains(this))
00169       asio_handler_invoke_helpers::invoke(handler, &handler);
00170     else
00171       post(handler);
00172   }
00173 
00174   // Request invocation of the given handler and return immediately.
        // The handler is wrapped, appended to the queue and counted as
        // outstanding work; it is never invoked from inside this function.
00175   template <typename Handler>
00176   void post(Handler handler)
00177   {
00178     // Allocate and construct an operation to wrap the handler.
          // This is done before the mutex is taken.
00179     handler_queue::scoped_ptr ptr(handler_queue::wrap(handler));
00180 
00181     asio::detail::mutex::scoped_lock lock(mutex_);
00182 
00183     // If the service has been shut down we silently discard the handler.
          // NOTE(review): presumably ptr's destructor frees the wrapped handler
          // on this path -- confirm in handler_queue.hpp.
00184     if (shutdown_)
00185       return;
00186 
00187     // Add the handler to the end of the queue.
00188     handler_queue_.push(ptr.get());
00189     ptr.release(); // Presumably gives up ptr's ownership now that the queue holds the operation.
00190 
00191     // An undelivered handler is treated as unfinished work.
00192     ++outstanding_work_;
00193 
00194     // Wake up a thread to execute the handler.
          // If no thread is idle, interrupt the task instead, unless it is
          // already marked as interrupted.
00195     if (!interrupt_one_idle_thread(lock))
00196     {
00197       if (!task_interrupted_)
00198       {
00199         task_interrupted_ = true;
00200         task_.interrupt();
00201       }
00202     }
00203   }
00204 
00205 private:
00206   struct idle_thread_info; // Defined further down in this class; declared here for do_one()'s signature.
00207 
        // Run at most one handler. Returns 1 after invoking a handler and 0
        // otherwise; ec is assigned a default-constructed error_code on every
        // return path. The caller passes in the scoped_lock it has taken on
        // mutex_; the lock is released around the task and handler calls and
        // re-acquired by the cleanup objects' destructors. A null
        // this_idle_thread selects polling mode, in which the function returns
        // instead of waiting.
00208   size_t do_one(asio::detail::mutex::scoped_lock& lock,
00209       idle_thread_info* this_idle_thread, asio::error_code& ec)
00210   {
00211     if (outstanding_work_ == 0 && !stopped_)
00212     {
            // No work is left: set stopped_ and wake everyone so that other
            // threads looping in do_one() leave as well.
00213       stop_all_threads(lock);
00214       ec = asio::error_code();
00215       return 0;
00216     }
00217 
00218     bool polling = !this_idle_thread;
00219     bool task_has_run = false; // Set once this call has run the task.
00220     while (!stopped_)
00221     {
00222       if (!handler_queue_.empty())
00223       {
00224         // Prepare to execute first handler from queue.
00225         handler_queue::handler* h = handler_queue_.front();
00226         handler_queue_.pop();
00227 
00228         if (h == &task_handler_)
00229         {
00230           bool more_handlers = (!handler_queue_.empty());
                // In either case the task is run without blocking (see the call
                // below), so it is recorded as already interrupted and neither
                // post() nor stop_all_threads() will call task_.interrupt().
00231           task_interrupted_ = more_handlers || polling;
00232 
00233           // If the task has already run and we're polling then we're done.
00234           if (task_has_run && polling)
00235           {
00236             task_interrupted_ = true;
00237             handler_queue_.push(&task_handler_);
00238             ec = asio::error_code();
00239             return 0;
00240           }
00241           task_has_run = true;
00242 
00243           lock.unlock();
                // c's destructor re-locks the mutex and re-queues the task
                // marker when this block is left, also if run() throws.
00244           task_cleanup c(lock, *this);
00245 
00246           // Run the task. May throw an exception. Only block if the handler
00247           // queue is empty and we have an idle_thread_info object, otherwise
00248           // we want to return as soon as possible.
00249           task_.run(!more_handlers && !polling);
                // No return here: the loop goes round again to look for a handler.
00250         }
00251         else
00252         {
00253           lock.unlock();
                // c's destructor re-locks the mutex and decrements
                // outstanding_work_, also if invoke() throws.
00254           handler_cleanup c(lock, *this);
00255 
00256           // Invoke the handler. May throw an exception.
00257           h->invoke(); // invoke() deletes the handler object
00258 
00259           ec = asio::error_code();
00260           return 1;
00261         }
00262       }
00263       else if (this_idle_thread)
00264       {
00265         // Nothing to run right now, so just wait for work to do.
              // The task marker is out of the queue while some thread is inside
              // task_.run(), which is when the queue can be found empty.
              // This thread is pushed on the front of the idle list, then waits
              // on its own event.
              // NOTE(review): wait(lock) presumably releases the mutex while
              // blocked -- confirm in event.hpp.
00266         this_idle_thread->next = first_idle_thread_;
00267         first_idle_thread_ = this_idle_thread;
00268         this_idle_thread->wakeup_event.clear(lock);
00269         this_idle_thread->wakeup_event.wait(lock);
00270       }
00271       else
00272       {
              // Polling with an empty queue: report that nothing was run.
00273         ec = asio::error_code();
00274         return 0;
00275       }
00276     }
00277 
          // Reached when stopped_ became true.
00278     ec = asio::error_code();
00279     return 0;
00280   }
00281 
00282   // Stop the task and all idle threads.
        // Sets stopped_, signals every thread on the idle list and interrupts
        // the task unless it is already marked as interrupted. Every caller in
        // this class passes in the scoped_lock it holds on mutex_.
00283   void stop_all_threads(
00284       asio::detail::mutex::scoped_lock& lock)
00285   {
00286     stopped_ = true;
00287     interrupt_all_idle_threads(lock);
00288     if (!task_interrupted_)
00289     {
00290       task_interrupted_ = true; // Prevents a second interrupt() for the same run of the task.
00291       task_.interrupt();
00292     }
00293   }
00294 
00295   // Interrupt a single idle thread. Returns true if a thread was interrupted,
00296   // false if no running thread could be found to interrupt.
        // The thread taken is the head of the idle list, i.e. the one that
        // became idle most recently (do_one() pushes at the front).
00297   bool interrupt_one_idle_thread(
00298       asio::detail::mutex::scoped_lock& lock)
00299   {
00300     if (first_idle_thread_)
00301     {
            // Unlink the head entry, then signal its wakeup event.
00302       idle_thread_info* idle_thread = first_idle_thread_;
00303       first_idle_thread_ = idle_thread->next;
00304       idle_thread->next = 0;
00305       idle_thread->wakeup_event.signal(lock);
00306       return true;
00307     }
00308     return false;
00309   }
00310 
00311   // Interrupt all idle threads.
        // Unlinks every entry of the idle list in turn and signals its wakeup
        // event; first_idle_thread_ is 0 afterwards.
00312   void interrupt_all_idle_threads(
00313       asio::detail::mutex::scoped_lock& lock)
00314   {
00315     while (first_idle_thread_)
00316     {
00317       idle_thread_info* idle_thread = first_idle_thread_;
00318       first_idle_thread_ = idle_thread->next;
00319       idle_thread->next = 0;
00320       idle_thread->wakeup_event.signal(lock);
00321     }
00322   }
00323 
00324   // Helper class to perform task-related operations on block exit.
        // do_one() creates one right after unlocking the mutex and before
        // calling task_.run(); the destructor undoes the unlock and puts the
        // task marker back, whether run() returns or throws.
00325   class task_cleanup;
00326   friend class task_cleanup;
00327   class task_cleanup
00328   {
00329   public:
          // Stores references only; nothing is done until destruction.
00330     task_cleanup(asio::detail::mutex::scoped_lock& lock,
00331         task_io_service& task_io_svc)
00332       : lock_(lock),
00333         task_io_service_(task_io_svc)
00334     {
00335     }
00336 
00337     ~task_cleanup()
00338     {
00339       // Reinsert the task at the end of the handler queue.
00340       lock_.lock();
00341       task_io_service_.task_interrupted_ = true; // The task is no longer running, so there is nothing to interrupt.
00342       task_io_service_.handler_queue_.push(&task_io_service_.task_handler_);
00343     }
00344 
00345   private:
00346     asio::detail::mutex::scoped_lock& lock_;
00347     task_io_service& task_io_service_;
00348   };
00349 
00350   // Helper class to perform handler-related operations on block exit.
        // do_one() creates one right after unlocking the mutex and before
        // invoking a handler; the destructor re-locks and accounts for the
        // finished handler, whether invoke() returns or throws.
00351   class handler_cleanup;
00352   friend class handler_cleanup;
00353   class handler_cleanup
00354   {
00355   public:
          // Stores references only; nothing is done until destruction.
00356     handler_cleanup(asio::detail::mutex::scoped_lock& lock,
00357         task_io_service& task_io_svc)
00358       : lock_(lock),
00359         task_io_service_(task_io_svc)
00360     {
00361     }
00362 
00363     ~handler_cleanup()
00364     {
00365       lock_.lock();
            // Balances the ++outstanding_work_ done by post(); when the count
            // reaches zero all threads are stopped.
00366       if (--task_io_service_.outstanding_work_ == 0)
00367         task_io_service_.stop_all_threads(lock_);
00368     }
00369 
00370   private:
00371     asio::detail::mutex::scoped_lock& lock_;
00372     task_io_service& task_io_service_;
00373   };
00374 
00375   // Mutex to protect access to internal data.
        // do_one() releases it while the task or a handler is running.
00376   asio::detail::mutex mutex_;
00377 
00378   // The task to be run by this service.
        // Bound in the constructor to the result of use_service<Task>().
00379   Task& task_;
00380 
00381   // Handler object to represent the position of the task in the queue.
        // do_one() and shutdown_service() recognise it by address and never
        // call invoke() or destroy() on it.
00382   class task_handler
00383     : public handler_queue::handler
00384   {
00385   public:
00386     task_handler()
            // NOTE(review): the two zero arguments are presumably null
            // invoke/destroy function pointers -- confirm in handler_queue.hpp.
00387       : handler_queue::handler(0, 0)
00388     {
00389     }
00390   } task_handler_;
00391 
00392   // Whether the task has been interrupted.
        // While true, post() and stop_all_threads() skip task_.interrupt().
00393   bool task_interrupted_;
00394 
00395   // The count of unfinished work.
        // Raised by work_started() and post(); lowered by work_finished() and
        // after each handler invocation (handler_cleanup).
00396   int outstanding_work_;
00397 
00398   // The queue of handlers that are ready to be delivered.
        // Also carries the task_handler_ marker, except while the task is running.
00399   handler_queue handler_queue_;
00400 
00401   // Flag to indicate that the dispatcher has been stopped.
        // Set by stop_all_threads(), cleared by reset().
00402   bool stopped_;
00403 
00404   // Flag to indicate that the dispatcher has been shut down.
        // Set by shutdown_service(); makes post() discard handlers.
00405   bool shutdown_;
00406 
00407   // Structure containing information about an idle thread.
        // run() and run_one() each create one as a local variable and pass its
        // address to do_one().
00408   struct idle_thread_info
00409   {
00410     event wakeup_event; // Signalled by interrupt_one_idle_thread() and interrupt_all_idle_threads().
00411     idle_thread_info* next; // Next entry in the idle list, or 0.
00412   };
00413 
00414   // Head of the singly linked list of threads that are currently idle,
        // chained through idle_thread_info::next; 0 when no thread is idle.
        // (This is a list pointer, not a count of idle threads.)
00415   idle_thread_info* first_idle_thread_;
00416 };
00417 
00418 } // namespace detail
00419 } // namespace asio
00420 
00421 #include "asio/detail/pop_options.hpp"
00422 
00423 #endif // defined(ASIO_ENABLE_TWO_LOCK_QUEUE)
00424 
00425 #endif // ASIO_DETAIL_TASK_IO_SERVICE_HPP
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines


Castor
Author(s): Carpe Noctem
autogenerated on Fri Nov 8 2013 11:05:39