$search
00001 // 00002 // select_reactor.hpp 00003 // ~~~~~~~~~~~~~~~~~~ 00004 // 00005 // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com) 00006 // 00007 // Distributed under the Boost Software License, Version 1.0. (See accompanying 00008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 00009 // 00010 00011 #ifndef ASIO_DETAIL_SELECT_REACTOR_HPP 00012 #define ASIO_DETAIL_SELECT_REACTOR_HPP 00013 00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200) 00015 # pragma once 00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) 00017 00018 #include "asio/detail/push_options.hpp" 00019 00020 #include "asio/detail/socket_types.hpp" // Must come before posix_time. 00021 00022 #include "asio/detail/push_options.hpp" 00023 #include <cstddef> 00024 #include <boost/config.hpp> 00025 #include <boost/date_time/posix_time/posix_time_types.hpp> 00026 #include <boost/shared_ptr.hpp> 00027 #include <vector> 00028 #include "asio/detail/pop_options.hpp" 00029 00030 #include "asio/io_service.hpp" 00031 #include "asio/detail/bind_handler.hpp" 00032 #include "asio/detail/fd_set_adapter.hpp" 00033 #include "asio/detail/mutex.hpp" 00034 #include "asio/detail/noncopyable.hpp" 00035 #include "asio/detail/reactor_op_queue.hpp" 00036 #include "asio/detail/select_interrupter.hpp" 00037 #include "asio/detail/select_reactor_fwd.hpp" 00038 #include "asio/detail/service_base.hpp" 00039 #include "asio/detail/signal_blocker.hpp" 00040 #include "asio/detail/socket_ops.hpp" 00041 #include "asio/detail/socket_types.hpp" 00042 #include "asio/detail/task_io_service.hpp" 00043 #include "asio/detail/thread.hpp" 00044 #include "asio/detail/timer_queue.hpp" 00045 00046 namespace asio { 00047 namespace detail { 00048 00049 template <bool Own_Thread> 00050 class select_reactor 00051 : public asio::detail::service_base<select_reactor<Own_Thread> > 00052 { 00053 public: 00054 // Per-descriptor data. 00055 struct per_descriptor_data 00056 { 00057 }; 00058 00059 // Constructor. 00060 select_reactor(asio::io_service& io_service) 00061 : asio::detail::service_base< 00062 select_reactor<Own_Thread> >(io_service), 00063 mutex_(), 00064 select_in_progress_(false), 00065 interrupter_(), 00066 read_op_queue_(), 00067 write_op_queue_(), 00068 except_op_queue_(), 00069 pending_cancellations_(), 00070 stop_thread_(false), 00071 thread_(0), 00072 shutdown_(false) 00073 { 00074 if (Own_Thread) 00075 { 00076 asio::detail::signal_blocker sb; 00077 thread_ = new asio::detail::thread( 00078 bind_handler(&select_reactor::call_run_thread, this)); 00079 } 00080 } 00081 00082 // Destructor. 00083 ~select_reactor() 00084 { 00085 shutdown_service(); 00086 } 00087 00088 // Destroy all user-defined handler objects owned by the service. 00089 void shutdown_service() 00090 { 00091 asio::detail::mutex::scoped_lock lock(mutex_); 00092 shutdown_ = true; 00093 stop_thread_ = true; 00094 lock.unlock(); 00095 00096 if (thread_) 00097 { 00098 interrupter_.interrupt(); 00099 thread_->join(); 00100 delete thread_; 00101 thread_ = 0; 00102 } 00103 00104 read_op_queue_.destroy_operations(); 00105 write_op_queue_.destroy_operations(); 00106 except_op_queue_.destroy_operations(); 00107 00108 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00109 timer_queues_[i]->destroy_timers(); 00110 timer_queues_.clear(); 00111 } 00112 00113 // Register a socket with the reactor. Returns 0 on success, system error 00114 // code on failure. 00115 int register_descriptor(socket_type, per_descriptor_data&) 00116 { 00117 return 0; 00118 } 00119 00120 // Start a new read operation. The handler object will be invoked when the 00121 // given descriptor is ready to be read, or an error has occurred. 00122 template <typename Handler> 00123 void start_read_op(socket_type descriptor, per_descriptor_data&, 00124 Handler handler, bool /*allow_speculative_read*/ = true) 00125 { 00126 asio::detail::mutex::scoped_lock lock(mutex_); 00127 if (!shutdown_) 00128 if (read_op_queue_.enqueue_operation(descriptor, handler)) 00129 interrupter_.interrupt(); 00130 } 00131 00132 // Start a new write operation. The handler object will be invoked when the 00133 // given descriptor is ready to be written, or an error has occurred. 00134 template <typename Handler> 00135 void start_write_op(socket_type descriptor, per_descriptor_data&, 00136 Handler handler, bool /*allow_speculative_write*/ = true) 00137 { 00138 asio::detail::mutex::scoped_lock lock(mutex_); 00139 if (!shutdown_) 00140 if (write_op_queue_.enqueue_operation(descriptor, handler)) 00141 interrupter_.interrupt(); 00142 } 00143 00144 // Start a new exception operation. The handler object will be invoked when 00145 // the given descriptor has exception information, or an error has occurred. 00146 template <typename Handler> 00147 void start_except_op(socket_type descriptor, 00148 per_descriptor_data&, Handler handler) 00149 { 00150 asio::detail::mutex::scoped_lock lock(mutex_); 00151 if (!shutdown_) 00152 if (except_op_queue_.enqueue_operation(descriptor, handler)) 00153 interrupter_.interrupt(); 00154 } 00155 00156 // Wrapper for connect handlers to enable the handler object to be placed 00157 // in both the write and the except operation queues, but ensure that only 00158 // one of the handlers is called. 00159 template <typename Handler> 00160 class connect_handler_wrapper 00161 { 00162 public: 00163 connect_handler_wrapper(socket_type descriptor, 00164 boost::shared_ptr<bool> completed, 00165 select_reactor<Own_Thread>& reactor, Handler handler) 00166 : descriptor_(descriptor), 00167 completed_(completed), 00168 reactor_(reactor), 00169 handler_(handler) 00170 { 00171 } 00172 00173 bool perform(asio::error_code& ec, 00174 std::size_t& bytes_transferred) 00175 { 00176 // Check whether one of the handlers has already been called. If it has, 00177 // then we don't want to do anything in this handler. 00178 if (*completed_) 00179 { 00180 completed_.reset(); // Indicate that this handler should not complete. 00181 return true; 00182 } 00183 00184 // Cancel the other reactor operation for the connection. 00185 *completed_ = true; 00186 reactor_.enqueue_cancel_ops_unlocked(descriptor_); 00187 00188 // Call the contained handler. 00189 return handler_.perform(ec, bytes_transferred); 00190 } 00191 00192 void complete(const asio::error_code& ec, 00193 std::size_t bytes_transferred) 00194 { 00195 if (completed_.get()) 00196 handler_.complete(ec, bytes_transferred); 00197 } 00198 00199 private: 00200 socket_type descriptor_; 00201 boost::shared_ptr<bool> completed_; 00202 select_reactor<Own_Thread>& reactor_; 00203 Handler handler_; 00204 }; 00205 00206 // Start new write and exception operations. The handler object will be 00207 // invoked when the given descriptor is ready for writing or has exception 00208 // information available, or an error has occurred. The handler will be called 00209 // only once. 00210 template <typename Handler> 00211 void start_connect_op(socket_type descriptor, 00212 per_descriptor_data&, Handler handler) 00213 { 00214 asio::detail::mutex::scoped_lock lock(mutex_); 00215 if (!shutdown_) 00216 { 00217 boost::shared_ptr<bool> completed(new bool(false)); 00218 connect_handler_wrapper<Handler> wrapped_handler( 00219 descriptor, completed, *this, handler); 00220 bool interrupt = write_op_queue_.enqueue_operation( 00221 descriptor, wrapped_handler); 00222 interrupt = except_op_queue_.enqueue_operation( 00223 descriptor, wrapped_handler) || interrupt; 00224 if (interrupt) 00225 interrupter_.interrupt(); 00226 } 00227 } 00228 00229 // Cancel all operations associated with the given descriptor. The 00230 // handlers associated with the descriptor will be invoked with the 00231 // operation_aborted error. 00232 void cancel_ops(socket_type descriptor, per_descriptor_data&) 00233 { 00234 asio::detail::mutex::scoped_lock lock(mutex_); 00235 cancel_ops_unlocked(descriptor); 00236 } 00237 00238 // Enqueue cancellation of all operations associated with the given 00239 // descriptor. The handlers associated with the descriptor will be invoked 00240 // with the operation_aborted error. This function does not acquire the 00241 // select_reactor's mutex, and so should only be used when the reactor lock is 00242 // already held. 00243 void enqueue_cancel_ops_unlocked(socket_type descriptor) 00244 { 00245 pending_cancellations_.push_back(descriptor); 00246 } 00247 00248 // Cancel any operations that are running against the descriptor and remove 00249 // its registration from the reactor. 00250 void close_descriptor(socket_type descriptor, per_descriptor_data&) 00251 { 00252 asio::detail::mutex::scoped_lock lock(mutex_); 00253 cancel_ops_unlocked(descriptor); 00254 } 00255 00256 // Add a new timer queue to the reactor. 00257 template <typename Time_Traits> 00258 void add_timer_queue(timer_queue<Time_Traits>& timer_queue) 00259 { 00260 asio::detail::mutex::scoped_lock lock(mutex_); 00261 timer_queues_.push_back(&timer_queue); 00262 } 00263 00264 // Remove a timer queue from the reactor. 00265 template <typename Time_Traits> 00266 void remove_timer_queue(timer_queue<Time_Traits>& timer_queue) 00267 { 00268 asio::detail::mutex::scoped_lock lock(mutex_); 00269 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00270 { 00271 if (timer_queues_[i] == &timer_queue) 00272 { 00273 timer_queues_.erase(timer_queues_.begin() + i); 00274 return; 00275 } 00276 } 00277 } 00278 00279 // Schedule a timer in the given timer queue to expire at the specified 00280 // absolute time. The handler object will be invoked when the timer expires. 00281 template <typename Time_Traits, typename Handler> 00282 void schedule_timer(timer_queue<Time_Traits>& timer_queue, 00283 const typename Time_Traits::time_type& time, Handler handler, void* token) 00284 { 00285 asio::detail::mutex::scoped_lock lock(mutex_); 00286 if (!shutdown_) 00287 if (timer_queue.enqueue_timer(time, handler, token)) 00288 interrupter_.interrupt(); 00289 } 00290 00291 // Cancel the timer associated with the given token. Returns the number of 00292 // handlers that have been posted or dispatched. 00293 template <typename Time_Traits> 00294 std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token) 00295 { 00296 asio::detail::mutex::scoped_lock lock(mutex_); 00297 std::size_t n = timer_queue.cancel_timer(token); 00298 if (n > 0) 00299 interrupter_.interrupt(); 00300 return n; 00301 } 00302 00303 private: 00304 friend class task_io_service<select_reactor<Own_Thread> >; 00305 00306 // Run select once until interrupted or events are ready to be dispatched. 00307 void run(bool block) 00308 { 00309 asio::detail::mutex::scoped_lock lock(mutex_); 00310 00311 // Dispatch any operation cancellations that were made while the select 00312 // loop was not running. 00313 read_op_queue_.perform_cancellations(); 00314 write_op_queue_.perform_cancellations(); 00315 except_op_queue_.perform_cancellations(); 00316 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00317 timer_queues_[i]->dispatch_cancellations(); 00318 00319 // Check if the thread is supposed to stop. 00320 if (stop_thread_) 00321 { 00322 complete_operations_and_timers(lock); 00323 return; 00324 } 00325 00326 // We can return immediately if there's no work to do and the reactor is 00327 // not supposed to block. 00328 if (!block && read_op_queue_.empty() && write_op_queue_.empty() 00329 && except_op_queue_.empty() && all_timer_queues_are_empty()) 00330 { 00331 complete_operations_and_timers(lock); 00332 return; 00333 } 00334 00335 // Set up the descriptor sets. 00336 fd_set_adapter read_fds; 00337 read_fds.set(interrupter_.read_descriptor()); 00338 read_op_queue_.get_descriptors(read_fds); 00339 fd_set_adapter write_fds; 00340 write_op_queue_.get_descriptors(write_fds); 00341 fd_set_adapter except_fds; 00342 except_op_queue_.get_descriptors(except_fds); 00343 socket_type max_fd = read_fds.max_descriptor(); 00344 if (write_fds.max_descriptor() > max_fd) 00345 max_fd = write_fds.max_descriptor(); 00346 if (except_fds.max_descriptor() > max_fd) 00347 max_fd = except_fds.max_descriptor(); 00348 00349 // Block on the select call without holding the lock so that new 00350 // operations can be started while the call is executing. 00351 timeval tv_buf = { 0, 0 }; 00352 timeval* tv = block ? get_timeout(tv_buf) : &tv_buf; 00353 select_in_progress_ = true; 00354 lock.unlock(); 00355 asio::error_code ec; 00356 int retval = socket_ops::select(static_cast<int>(max_fd + 1), 00357 read_fds, write_fds, except_fds, tv, ec); 00358 lock.lock(); 00359 select_in_progress_ = false; 00360 00361 // Block signals while dispatching operations. 00362 asio::detail::signal_blocker sb; 00363 00364 // Reset the interrupter. 00365 if (retval > 0 && read_fds.is_set(interrupter_.read_descriptor())) 00366 interrupter_.reset(); 00367 00368 // Dispatch all ready operations. 00369 if (retval > 0) 00370 { 00371 // Exception operations must be processed first to ensure that any 00372 // out-of-band data is read before normal data. 00373 except_op_queue_.perform_operations_for_descriptors( 00374 except_fds, asio::error_code()); 00375 read_op_queue_.perform_operations_for_descriptors( 00376 read_fds, asio::error_code()); 00377 write_op_queue_.perform_operations_for_descriptors( 00378 write_fds, asio::error_code()); 00379 except_op_queue_.perform_cancellations(); 00380 read_op_queue_.perform_cancellations(); 00381 write_op_queue_.perform_cancellations(); 00382 } 00383 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00384 { 00385 timer_queues_[i]->dispatch_timers(); 00386 timer_queues_[i]->dispatch_cancellations(); 00387 } 00388 00389 // Issue any pending cancellations. 00390 for (size_t i = 0; i < pending_cancellations_.size(); ++i) 00391 cancel_ops_unlocked(pending_cancellations_[i]); 00392 pending_cancellations_.clear(); 00393 00394 complete_operations_and_timers(lock); 00395 } 00396 00397 // Run the select loop in the thread. 00398 void run_thread() 00399 { 00400 asio::detail::mutex::scoped_lock lock(mutex_); 00401 while (!stop_thread_) 00402 { 00403 lock.unlock(); 00404 run(true); 00405 lock.lock(); 00406 } 00407 } 00408 00409 // Entry point for the select loop thread. 00410 static void call_run_thread(select_reactor* reactor) 00411 { 00412 reactor->run_thread(); 00413 } 00414 00415 // Interrupt the select loop. 00416 void interrupt() 00417 { 00418 interrupter_.interrupt(); 00419 } 00420 00421 // Check if all timer queues are empty. 00422 bool all_timer_queues_are_empty() const 00423 { 00424 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00425 if (!timer_queues_[i]->empty()) 00426 return false; 00427 return true; 00428 } 00429 00430 // Get the timeout value for the select call. 00431 timeval* get_timeout(timeval& tv) 00432 { 00433 if (all_timer_queues_are_empty()) 00434 return 0; 00435 00436 // By default we will wait no longer than 5 minutes. This will ensure that 00437 // any changes to the system clock are detected after no longer than this. 00438 boost::posix_time::time_duration minimum_wait_duration 00439 = boost::posix_time::minutes(5); 00440 00441 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00442 { 00443 boost::posix_time::time_duration wait_duration 00444 = timer_queues_[i]->wait_duration(); 00445 if (wait_duration < minimum_wait_duration) 00446 minimum_wait_duration = wait_duration; 00447 } 00448 00449 if (minimum_wait_duration > boost::posix_time::time_duration()) 00450 { 00451 tv.tv_sec = minimum_wait_duration.total_seconds(); 00452 tv.tv_usec = minimum_wait_duration.total_microseconds() % 1000000; 00453 } 00454 else 00455 { 00456 tv.tv_sec = 0; 00457 tv.tv_usec = 0; 00458 } 00459 00460 return &tv; 00461 } 00462 00463 // Cancel all operations associated with the given descriptor. The do_cancel 00464 // function of the handler objects will be invoked. This function does not 00465 // acquire the select_reactor's mutex. 00466 void cancel_ops_unlocked(socket_type descriptor) 00467 { 00468 bool interrupt = read_op_queue_.cancel_operations(descriptor); 00469 interrupt = write_op_queue_.cancel_operations(descriptor) || interrupt; 00470 interrupt = except_op_queue_.cancel_operations(descriptor) || interrupt; 00471 if (interrupt) 00472 interrupter_.interrupt(); 00473 } 00474 00475 // Clean up operations and timers. We must not hold the lock since the 00476 // destructors may make calls back into this reactor. We make a copy of the 00477 // vector of timer queues since the original may be modified while the lock 00478 // is not held. 00479 void complete_operations_and_timers( 00480 asio::detail::mutex::scoped_lock& lock) 00481 { 00482 timer_queues_for_cleanup_ = timer_queues_; 00483 lock.unlock(); 00484 read_op_queue_.complete_operations(); 00485 write_op_queue_.complete_operations(); 00486 except_op_queue_.complete_operations(); 00487 for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i) 00488 timer_queues_for_cleanup_[i]->complete_timers(); 00489 } 00490 00491 // Mutex to protect access to internal data. 00492 asio::detail::mutex mutex_; 00493 00494 // Whether the select loop is currently running or not. 00495 bool select_in_progress_; 00496 00497 // The interrupter is used to break a blocking select call. 00498 select_interrupter interrupter_; 00499 00500 // The queue of read operations. 00501 reactor_op_queue<socket_type> read_op_queue_; 00502 00503 // The queue of write operations. 00504 reactor_op_queue<socket_type> write_op_queue_; 00505 00506 // The queue of exception operations. 00507 reactor_op_queue<socket_type> except_op_queue_; 00508 00509 // The timer queues. 00510 std::vector<timer_queue_base*> timer_queues_; 00511 00512 // A copy of the timer queues, used when cleaning up timers. The copy is 00513 // stored as a class data member to avoid unnecessary memory allocation. 00514 std::vector<timer_queue_base*> timer_queues_for_cleanup_; 00515 00516 // The descriptors that are pending cancellation. 00517 std::vector<socket_type> pending_cancellations_; 00518 00519 // Does the reactor loop thread need to stop. 00520 bool stop_thread_; 00521 00522 // The thread that is running the reactor loop. 00523 asio::detail::thread* thread_; 00524 00525 // Whether the service has been shut down. 00526 bool shutdown_; 00527 }; 00528 00529 } // namespace detail 00530 } // namespace asio 00531 00532 #include "asio/detail/pop_options.hpp" 00533 00534 #endif // ASIO_DETAIL_SELECT_REACTOR_HPP