select_reactor.hpp
Go to the documentation of this file.
00001 //
00002 // select_reactor.hpp
00003 // ~~~~~~~~~~~~~~~~~~
00004 //
00005 // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
00006 //
00007 // Distributed under the Boost Software License, Version 1.0. (See accompanying
00008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
00009 //
00010 
00011 #ifndef ASIO_DETAIL_SELECT_REACTOR_HPP
00012 #define ASIO_DETAIL_SELECT_REACTOR_HPP
00013 
00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00015 # pragma once
00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00017 
00018 #include "asio/detail/push_options.hpp"
00019 
00020 #include "asio/detail/socket_types.hpp" // Must come before posix_time.
00021 
00022 #include "asio/detail/push_options.hpp"
00023 #include <cstddef>
00024 #include <boost/config.hpp>
00025 #include <boost/date_time/posix_time/posix_time_types.hpp>
00026 #include <boost/shared_ptr.hpp>
00027 #include <vector>
00028 #include "asio/detail/pop_options.hpp"
00029 
00030 #include "asio/io_service.hpp"
00031 #include "asio/detail/bind_handler.hpp"
00032 #include "asio/detail/fd_set_adapter.hpp"
00033 #include "asio/detail/mutex.hpp"
00034 #include "asio/detail/noncopyable.hpp"
00035 #include "asio/detail/reactor_op_queue.hpp"
00036 #include "asio/detail/select_interrupter.hpp"
00037 #include "asio/detail/select_reactor_fwd.hpp"
00038 #include "asio/detail/service_base.hpp"
00039 #include "asio/detail/signal_blocker.hpp"
00040 #include "asio/detail/socket_ops.hpp"
00041 #include "asio/detail/socket_types.hpp"
00042 #include "asio/detail/task_io_service.hpp"
00043 #include "asio/detail/thread.hpp"
00044 #include "asio/detail/timer_queue.hpp"
00045 
00046 namespace asio {
00047 namespace detail {
00048 
00049 template <bool Own_Thread> // Own_Thread: if true, the reactor runs the select loop in its own thread.
00050 class select_reactor
00051   : public asio::detail::service_base<select_reactor<Own_Thread> >
00052 {
00053 public:
00054   // Per-descriptor data. Empty: this reactor keeps no state per descriptor.
00055   struct per_descriptor_data
00056   {
00057   };
00058 
00059   // Constructor. Starts the internal select-loop thread when Own_Thread is true.
00060   select_reactor(asio::io_service& io_service)
00061     : asio::detail::service_base<
00062         select_reactor<Own_Thread> >(io_service),
00063       mutex_(),
00064       select_in_progress_(false),
00065       interrupter_(),
00066       read_op_queue_(),
00067       write_op_queue_(),
00068       except_op_queue_(),
00069       pending_cancellations_(),
00070       stop_thread_(false),
00071       thread_(0),
00072       shutdown_(false)
00073   {
00074     if (Own_Thread)
00075     {
00076       asio::detail::signal_blocker sb; // Held while the thread is being created.
00077       thread_ = new asio::detail::thread(
00078           bind_handler(&select_reactor::call_run_thread, this));
00079     }
00080   }
00081 
00082   // Destructor. Delegates all cleanup to shutdown_service().
00083   ~select_reactor()
00084   {
00085     shutdown_service();
00086   }
00087 
00088   // Destroy all user-defined handler objects owned by the service.
00089   void shutdown_service()
00090   {
00091     asio::detail::mutex::scoped_lock lock(mutex_);
00092     shutdown_ = true; // From now on the start_*_op/schedule_timer calls enqueue nothing.
00093     stop_thread_ = true; // Makes run() return early and run_thread() leave its loop.
00094     lock.unlock();
00095 
00096     if (thread_)
00097     {
00098       interrupter_.interrupt(); // Break the thread out of a blocking select call.
00099       thread_->join();
00100       delete thread_;
00101       thread_ = 0; // Makes a second call (e.g. from the destructor) skip this branch.
00102     }
00103 
00104     read_op_queue_.destroy_operations();
00105     write_op_queue_.destroy_operations();
00106     except_op_queue_.destroy_operations();
00107 
00108     for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00109       timer_queues_[i]->destroy_timers();
00110     timer_queues_.clear();
00111   }
00112 
00113   // Register a socket with the reactor. Nothing is recorded by this reactor,
00114   // so the call always returns 0 (success).
00115   int register_descriptor(socket_type, per_descriptor_data&)
00116   {
00117     return 0;
00118   }
00119 
00120   // Start a new read operation. The handler object will be invoked when the
00121   // given descriptor is ready to be read, or an error has occurred. The flag is unused.
00122   template <typename Handler>
00123   void start_read_op(socket_type descriptor, per_descriptor_data&,
00124       Handler handler, bool /*allow_speculative_read*/ = true)
00125   {
00126     asio::detail::mutex::scoped_lock lock(mutex_);
00127     if (!shutdown_) // Requests made after shutdown are dropped.
00128       if (read_op_queue_.enqueue_operation(descriptor, handler))
00129         interrupter_.interrupt(); // Break out of select so the next run() sees the new operation.
00130   }
00131 
00132   // Start a new write operation. The handler object will be invoked when the
00133   // given descriptor is ready to be written, or an error has occurred. The flag is unused.
00134   template <typename Handler>
00135   void start_write_op(socket_type descriptor, per_descriptor_data&,
00136       Handler handler, bool /*allow_speculative_write*/ = true)
00137   {
00138     asio::detail::mutex::scoped_lock lock(mutex_);
00139     if (!shutdown_) // Requests made after shutdown are dropped.
00140       if (write_op_queue_.enqueue_operation(descriptor, handler))
00141         interrupter_.interrupt(); // Break out of select so the next run() sees the new operation.
00142   }
00143 
00144   // Start a new exception operation. The handler object will be invoked when
00145   // the given descriptor has exception information, or an error has occurred.
00146   template <typename Handler>
00147   void start_except_op(socket_type descriptor,
00148       per_descriptor_data&, Handler handler)
00149   {
00150     asio::detail::mutex::scoped_lock lock(mutex_);
00151     if (!shutdown_) // Requests made after shutdown are dropped.
00152       if (except_op_queue_.enqueue_operation(descriptor, handler))
00153         interrupter_.interrupt(); // Break out of select so the next run() sees the new operation.
00154   }
00155 
00156   // Wrapper for connect handlers to enable the handler object to be placed
00157   // in both the write and the except operation queues, but ensure that only
00158   // one of the handlers is called. The queued wrappers share one "completed" flag.
00159   template <typename Handler>
00160   class connect_handler_wrapper
00161   {
00162   public:
00163     connect_handler_wrapper(socket_type descriptor,
00164         boost::shared_ptr<bool> completed,
00165         select_reactor<Own_Thread>& reactor, Handler handler)
00166       : descriptor_(descriptor),
00167         completed_(completed),
00168         reactor_(reactor),
00169         handler_(handler)
00170     {
00171     }
00172 
00173     bool perform(asio::error_code& ec, // Forwards to the wrapped handler unless the flag is already set.
00174         std::size_t& bytes_transferred)
00175     {
00176       // Check whether one of the handlers has already been called. If it has,
00177       // then we don't want to do anything in this handler.
00178       if (*completed_)
00179       {
00180         completed_.reset(); // Indicate that this handler should not complete.
00181         return true;
00182       }
00183 
00184       // Cancel the other reactor operation for the connection. The cancellation
00185       *completed_ = true; // is only recorded here; run() issues it later.
00186       reactor_.enqueue_cancel_ops_unlocked(descriptor_);
00187 
00188       // Call the contained handler.
00189       return handler_.perform(ec, bytes_transferred);
00190     }
00191 
00192     void complete(const asio::error_code& ec,
00193         std::size_t bytes_transferred)
00194     {
00195       if (completed_.get()) // Null only after perform() reset it because the flag was already set.
00196         handler_.complete(ec, bytes_transferred);
00197     }
00198 
00199   private:
00200     socket_type descriptor_; // Descriptor whose other queued operation gets cancelled.
00201     boost::shared_ptr<bool> completed_; // Shared flag: true once one wrapper has performed.
00202     select_reactor<Own_Thread>& reactor_; // Reactor used to enqueue the cancellation.
00203     Handler handler_; // The wrapped connect handler.
00204   };
00205 
00206   // Start new write and exception operations. The handler object will be
00207   // invoked when the given descriptor is ready for writing or has exception
00208   // information available, or an error has occurred. The handler will be called
00209   // only once.
00210   template <typename Handler>
00211   void start_connect_op(socket_type descriptor,
00212       per_descriptor_data&, Handler handler)
00213   {
00214     asio::detail::mutex::scoped_lock lock(mutex_);
00215     if (!shutdown_)
00216     {
00217       boost::shared_ptr<bool> completed(new bool(false)); // Shared by both queued wrappers.
00218       connect_handler_wrapper<Handler> wrapped_handler(
00219           descriptor, completed, *this, handler);
00220       bool interrupt = write_op_queue_.enqueue_operation(
00221           descriptor, wrapped_handler);
00222       interrupt = except_op_queue_.enqueue_operation( // Called first so it is never short-circuited.
00223           descriptor, wrapped_handler) || interrupt;
00224       if (interrupt)
00225         interrupter_.interrupt();
00226     }
00227   }
00228 
00229   // Cancel all operations associated with the given descriptor. The
00230   // handlers associated with the descriptor will be invoked with the
00231   // operation_aborted error.
00232   void cancel_ops(socket_type descriptor, per_descriptor_data&)
00233   {
00234     asio::detail::mutex::scoped_lock lock(mutex_);
00235     cancel_ops_unlocked(descriptor);
00236   }
00237 
00238   // Enqueue cancellation of all operations associated with the given
00239   // descriptor. The handlers associated with the descriptor will be invoked
00240   // with the operation_aborted error. This function does not acquire the
00241   // select_reactor's mutex, and so should only be used when the reactor lock is
00242   // already held. The descriptor is only recorded; run() cancels it later.
00243   void enqueue_cancel_ops_unlocked(socket_type descriptor)
00244   {
00245     pending_cancellations_.push_back(descriptor);
00246   }
00247 
00248   // Cancel any operations that are running against the descriptor. This reactor
00249   // holds no registration data, so this does the same work as cancel_ops().
00250   void close_descriptor(socket_type descriptor, per_descriptor_data&)
00251   {
00252     asio::detail::mutex::scoped_lock lock(mutex_);
00253     cancel_ops_unlocked(descriptor);
00254   }
00255 
00256   // Add a new timer queue to the reactor. Only its address is stored, not a copy.
00257   template <typename Time_Traits>
00258   void add_timer_queue(timer_queue<Time_Traits>& timer_queue)
00259   {
00260     asio::detail::mutex::scoped_lock lock(mutex_);
00261     timer_queues_.push_back(&timer_queue);
00262   }
00263 
00264   // Remove a timer queue from the reactor. Does nothing if it was never added.
00265   template <typename Time_Traits>
00266   void remove_timer_queue(timer_queue<Time_Traits>& timer_queue)
00267   {
00268     asio::detail::mutex::scoped_lock lock(mutex_);
00269     for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00270     {
00271       if (timer_queues_[i] == &timer_queue)
00272       {
00273         timer_queues_.erase(timer_queues_.begin() + i);
00274         return; // Only the first matching entry is removed.
00275       }
00276     }
00277   }
00278 
00279   // Schedule a timer in the given timer queue to expire at the specified
00280   // absolute time. The handler object will be invoked when the timer expires.
00281   template <typename Time_Traits, typename Handler>
00282   void schedule_timer(timer_queue<Time_Traits>& timer_queue,
00283       const typename Time_Traits::time_type& time, Handler handler, void* token)
00284   {
00285     asio::detail::mutex::scoped_lock lock(mutex_);
00286     if (!shutdown_) // Requests made after shutdown are dropped.
00287       if (timer_queue.enqueue_timer(time, handler, token))
00288         interrupter_.interrupt(); // Break out of select so the timeout is recomputed.
00289   }
00290 
00291   // Cancel the timer associated with the given token. Returns the number of
00292   // handlers that have been posted or dispatched.
00293   template <typename Time_Traits>
00294   std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
00295   {
00296     asio::detail::mutex::scoped_lock lock(mutex_);
00297     std::size_t n = timer_queue.cancel_timer(token);
00298     if (n > 0)
00299       interrupter_.interrupt(); // Break out of select so run() can dispatch the cancellations.
00300     return n;
00301   }
00302 
00303 private:
00304   friend class task_io_service<select_reactor<Own_Thread> >; // Gives it access to the private members below.
00305 
00306   // Run select once until interrupted or events are ready to be dispatched.
00307   void run(bool block)
00308   {
00309     asio::detail::mutex::scoped_lock lock(mutex_);
00310 
00311     // Dispatch any operation cancellations that were made while the select
00312     // loop was not running.
00313     read_op_queue_.perform_cancellations();
00314     write_op_queue_.perform_cancellations();
00315     except_op_queue_.perform_cancellations();
00316     for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00317       timer_queues_[i]->dispatch_cancellations();
00318 
00319     // Check if the thread is supposed to stop.
00320     if (stop_thread_)
00321     {
00322       complete_operations_and_timers(lock); // Releases the lock.
00323       return;
00324     }
00325 
00326     // We can return immediately if there's no work to do and the reactor is
00327     // not supposed to block.
00328     if (!block && read_op_queue_.empty() && write_op_queue_.empty()
00329         && except_op_queue_.empty() && all_timer_queues_are_empty())
00330     {
00331       complete_operations_and_timers(lock); // Releases the lock.
00332       return;
00333     }
00334 
00335     // Set up the descriptor sets. The interrupter is always part of the read set.
00336     fd_set_adapter read_fds;
00337     read_fds.set(interrupter_.read_descriptor());
00338     read_op_queue_.get_descriptors(read_fds);
00339     fd_set_adapter write_fds;
00340     write_op_queue_.get_descriptors(write_fds);
00341     fd_set_adapter except_fds;
00342     except_op_queue_.get_descriptors(except_fds);
00343     socket_type max_fd = read_fds.max_descriptor();
00344     if (write_fds.max_descriptor() > max_fd)
00345       max_fd = write_fds.max_descriptor();
00346     if (except_fds.max_descriptor() > max_fd)
00347       max_fd = except_fds.max_descriptor();
00348 
00349     // Block on the select call without holding the lock so that new
00350     // operations can be started while the call is executing.
00351     timeval tv_buf = { 0, 0 };
00352     timeval* tv = block ? get_timeout(tv_buf) : &tv_buf; // Non-blocking runs pass a zero timeout.
00353     select_in_progress_ = true;
00354     lock.unlock();
00355     asio::error_code ec;
00356     int retval = socket_ops::select(static_cast<int>(max_fd + 1),
00357         read_fds, write_fds, except_fds, tv, ec);
00358     lock.lock();
00359     select_in_progress_ = false;
00360 
00361     // Block signals while dispatching operations.
00362     asio::detail::signal_blocker sb;
00363 
00364     // Reset the interrupter.
00365     if (retval > 0 && read_fds.is_set(interrupter_.read_descriptor()))
00366       interrupter_.reset();
00367 
00368     // Dispatch all ready operations.
00369     if (retval > 0)
00370     {
00371       // Exception operations must be processed first to ensure that any
00372       // out-of-band data is read before normal data.
00373       except_op_queue_.perform_operations_for_descriptors(
00374           except_fds, asio::error_code());
00375       read_op_queue_.perform_operations_for_descriptors(
00376           read_fds, asio::error_code());
00377       write_op_queue_.perform_operations_for_descriptors(
00378           write_fds, asio::error_code());
00379       except_op_queue_.perform_cancellations();
00380       read_op_queue_.perform_cancellations();
00381       write_op_queue_.perform_cancellations();
00382     }
00383     for (std::size_t i = 0; i < timer_queues_.size(); ++i) // Timers are dispatched whatever select returned.
00384     {
00385       timer_queues_[i]->dispatch_timers();
00386       timer_queues_[i]->dispatch_cancellations();
00387     }
00388 
00389     // Issue any pending cancellations recorded by enqueue_cancel_ops_unlocked().
00390     for (std::size_t i = 0; i < pending_cancellations_.size(); ++i)
00391       cancel_ops_unlocked(pending_cancellations_[i]);
00392     pending_cancellations_.clear();
00393 
00394     complete_operations_and_timers(lock); // Releases the lock.
00395   }
00396 
00397   // Run the select loop in the thread until stop_thread_ is set.
00398   void run_thread()
00399   {
00400     asio::detail::mutex::scoped_lock lock(mutex_);
00401     while (!stop_thread_) // Read with the mutex held.
00402     {
00403       lock.unlock(); // run() acquires the mutex itself.
00404       run(true);
00405       lock.lock();
00406     }
00407   }
00408 
00409   // Entry point for the select loop thread.
00410   static void call_run_thread(select_reactor* reactor)
00411   {
00412     reactor->run_thread();
00413   }
00414 
00415   // Interrupt the select loop.
00416   void interrupt()
00417   {
00418     interrupter_.interrupt();
00419   }
00420 
00421   // Check if all timer queues are empty. Also true when no queue is registered.
00422   bool all_timer_queues_are_empty() const
00423   {
00424     for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00425       if (!timer_queues_[i]->empty())
00426         return false;
00427     return true;
00428   }
00429 
00430   // Get the timeout value for the select call: 0 if there are no timers, otherwise &tv.
00431   timeval* get_timeout(timeval& tv)
00432   {
00433     if (all_timer_queues_are_empty())
00434       return 0; // Null timeout: select waits until a descriptor is ready.
00435 
00436     // By default we will wait no longer than 5 minutes. This will ensure that
00437     // any changes to the system clock are detected after no longer than this.
00438     boost::posix_time::time_duration minimum_wait_duration
00439       = boost::posix_time::minutes(5);
00440 
00441     for (std::size_t i = 0; i < timer_queues_.size(); ++i) // Take the shortest wait across all queues.
00442     {
00443       boost::posix_time::time_duration wait_duration
00444         = timer_queues_[i]->wait_duration();
00445       if (wait_duration < minimum_wait_duration)
00446         minimum_wait_duration = wait_duration;
00447     }
00448 
00449     if (minimum_wait_duration > boost::posix_time::time_duration())
00450     {
00451       tv.tv_sec = minimum_wait_duration.total_seconds();
00452       tv.tv_usec = minimum_wait_duration.total_microseconds() % 1000000;
00453     }
00454     else // Zero or negative wait: use a zero timeout.
00455     {
00456       tv.tv_sec = 0;
00457       tv.tv_usec = 0;
00458     }
00459 
00460     return &tv;
00461   }
00462 
00463   // Cancel all operations associated with the given descriptor. The do_cancel
00464   // function of the handler objects will be invoked. This function does not
00465   // acquire the select_reactor's mutex.
00466   void cancel_ops_unlocked(socket_type descriptor)
00467   {
00468     bool interrupt = read_op_queue_.cancel_operations(descriptor);
00469     interrupt = write_op_queue_.cancel_operations(descriptor) || interrupt; // Call first: never short-circuited.
00470     interrupt = except_op_queue_.cancel_operations(descriptor) || interrupt;
00471     if (interrupt) // At least one cancel_operations() call returned true.
00472       interrupter_.interrupt();
00473   }
00474 
00475   // Clean up operations and timers. We must not hold the lock since the
00476   // destructors may make calls back into this reactor. We make a copy of the
00477   // vector of timer queues since the original may be modified while the lock
00478   // is not held.
00479   void complete_operations_and_timers(
00480       asio::detail::mutex::scoped_lock& lock)
00481   {
00482     timer_queues_for_cleanup_ = timer_queues_;
00483     lock.unlock(); // Not re-acquired here; every caller returns right after this call.
00484     read_op_queue_.complete_operations();
00485     write_op_queue_.complete_operations();
00486     except_op_queue_.complete_operations();
00487     for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i)
00488       timer_queues_for_cleanup_[i]->complete_timers();
00489   }
00490 
00491   // Mutex to protect access to internal data.
00492   asio::detail::mutex mutex_;
00493 
00494   // True only while run() is inside the select call. Not read within this class.
00495   bool select_in_progress_;
00496 
00497   // The interrupter is used to break a blocking select call.
00498   select_interrupter interrupter_;
00499 
00500   // The queue of read operations.
00501   reactor_op_queue<socket_type> read_op_queue_;
00502 
00503   // The queue of write operations.
00504   reactor_op_queue<socket_type> write_op_queue_;
00505 
00506   // The queue of exception operations.
00507   reactor_op_queue<socket_type> except_op_queue_;
00508 
00509   // The timer queues. Pointers only; the queues are not owned by the reactor.
00510   std::vector<timer_queue_base*> timer_queues_;
00511 
00512   // A copy of the timer queues, used when cleaning up timers. The copy is
00513   // stored as a class data member to avoid unnecessary memory allocation.
00514   std::vector<timer_queue_base*> timer_queues_for_cleanup_;
00515 
00516   // The descriptors that are pending cancellation. Drained at the end of run().
00517   std::vector<socket_type> pending_cancellations_;
00518 
00519   // Does the reactor loop thread need to stop.
00520   bool stop_thread_;
00521 
00522   // The thread that is running the reactor loop. Null unless Own_Thread is true.
00523   asio::detail::thread* thread_;
00524 
00525   // Whether the service has been shut down.
00526   bool shutdown_;
00527 };
00528 
00529 } // namespace detail
00530 } // namespace asio
00531 
00532 #include "asio/detail/pop_options.hpp"
00533 
00534 #endif // ASIO_DETAIL_SELECT_REACTOR_HPP
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines


Castor
Author(s): Carpe Noctem
autogenerated on Fri Nov 8 2013 11:05:39