$search
00001 // 00002 // kqueue_reactor.hpp 00003 // ~~~~~~~~~~~~~~~~~~ 00004 // 00005 // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com) 00006 // Copyright (c) 2005 Stefan Arentz (stefan at soze dot com) 00007 // 00008 // Distributed under the Boost Software License, Version 1.0. (See accompanying 00009 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 00010 // 00011 00012 #ifndef ASIO_DETAIL_KQUEUE_REACTOR_HPP 00013 #define ASIO_DETAIL_KQUEUE_REACTOR_HPP 00014 00015 #if defined(_MSC_VER) && (_MSC_VER >= 1200) 00016 # pragma once 00017 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) 00018 00019 #include "asio/detail/push_options.hpp" 00020 00021 #include "asio/detail/kqueue_reactor_fwd.hpp" 00022 00023 #if defined(ASIO_HAS_KQUEUE) 00024 00025 #include "asio/detail/push_options.hpp" 00026 #include <cstddef> 00027 #include <vector> 00028 #include <sys/types.h> 00029 #include <sys/event.h> 00030 #include <sys/time.h> 00031 #include <boost/config.hpp> 00032 #include <boost/date_time/posix_time/posix_time_types.hpp> 00033 #include <boost/throw_exception.hpp> 00034 #include "asio/detail/pop_options.hpp" 00035 00036 #include "asio/error.hpp" 00037 #include "asio/io_service.hpp" 00038 #include "asio/system_error.hpp" 00039 #include "asio/detail/bind_handler.hpp" 00040 #include "asio/detail/mutex.hpp" 00041 #include "asio/detail/task_io_service.hpp" 00042 #include "asio/detail/thread.hpp" 00043 #include "asio/detail/reactor_op_queue.hpp" 00044 #include "asio/detail/select_interrupter.hpp" 00045 #include "asio/detail/service_base.hpp" 00046 #include "asio/detail/signal_blocker.hpp" 00047 #include "asio/detail/socket_types.hpp" 00048 #include "asio/detail/timer_queue.hpp" 00049 00050 // Older versions of Mac OS X may not define EV_OOBAND. 00051 #if !defined(EV_OOBAND) 00052 # define EV_OOBAND EV_FLAG1 00053 #endif // !defined(EV_OOBAND) 00054 00055 namespace asio { 00056 namespace detail { 00057 00058 template <bool Own_Thread> 00059 class kqueue_reactor 00060 : public asio::detail::service_base<kqueue_reactor<Own_Thread> > 00061 { 00062 public: 00063 // Per-descriptor data. 00064 struct per_descriptor_data 00065 { 00066 bool allow_speculative_read; 00067 bool allow_speculative_write; 00068 }; 00069 00070 // Constructor. 00071 kqueue_reactor(asio::io_service& io_service) 00072 : asio::detail::service_base< 00073 kqueue_reactor<Own_Thread> >(io_service), 00074 mutex_(), 00075 kqueue_fd_(do_kqueue_create()), 00076 wait_in_progress_(false), 00077 interrupter_(), 00078 read_op_queue_(), 00079 write_op_queue_(), 00080 except_op_queue_(), 00081 pending_cancellations_(), 00082 stop_thread_(false), 00083 thread_(0), 00084 shutdown_(false), 00085 need_kqueue_wait_(true) 00086 { 00087 // Start the reactor's internal thread only if needed. 00088 if (Own_Thread) 00089 { 00090 asio::detail::signal_blocker sb; 00091 thread_ = new asio::detail::thread( 00092 bind_handler(&kqueue_reactor::call_run_thread, this)); 00093 } 00094 00095 // Add the interrupter's descriptor to the kqueue. 00096 struct kevent event; 00097 EV_SET(&event, interrupter_.read_descriptor(), 00098 EVFILT_READ, EV_ADD, 0, 0, 0); 00099 ::kevent(kqueue_fd_, &event, 1, 0, 0, 0); 00100 } 00101 00102 // Destructor. 00103 ~kqueue_reactor() 00104 { 00105 shutdown_service(); 00106 close(kqueue_fd_); 00107 } 00108 00109 // Destroy all user-defined handler objects owned by the service. 00110 void shutdown_service() 00111 { 00112 asio::detail::mutex::scoped_lock lock(mutex_); 00113 shutdown_ = true; 00114 stop_thread_ = true; 00115 lock.unlock(); 00116 00117 if (thread_) 00118 { 00119 interrupter_.interrupt(); 00120 thread_->join(); 00121 delete thread_; 00122 thread_ = 0; 00123 } 00124 00125 read_op_queue_.destroy_operations(); 00126 write_op_queue_.destroy_operations(); 00127 except_op_queue_.destroy_operations(); 00128 00129 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00130 timer_queues_[i]->destroy_timers(); 00131 timer_queues_.clear(); 00132 } 00133 00134 // Register a socket with the reactor. Returns 0 on success, system error 00135 // code on failure. 00136 int register_descriptor(socket_type, per_descriptor_data& descriptor_data) 00137 { 00138 descriptor_data.allow_speculative_read = true; 00139 descriptor_data.allow_speculative_write = true; 00140 00141 return 0; 00142 } 00143 00144 // Start a new read operation. The handler object will be invoked when the 00145 // given descriptor is ready to be read, or an error has occurred. 00146 template <typename Handler> 00147 void start_read_op(socket_type descriptor, 00148 per_descriptor_data& descriptor_data, Handler handler, 00149 bool allow_speculative_read = true) 00150 { 00151 if (allow_speculative_read && descriptor_data.allow_speculative_read) 00152 { 00153 asio::error_code ec; 00154 std::size_t bytes_transferred = 0; 00155 if (handler.perform(ec, bytes_transferred)) 00156 { 00157 handler.complete(ec, bytes_transferred); 00158 return; 00159 } 00160 00161 // We only get one shot at a speculative read in this function. 00162 allow_speculative_read = false; 00163 } 00164 00165 asio::detail::mutex::scoped_lock lock(mutex_); 00166 00167 if (shutdown_) 00168 return; 00169 00170 if (!allow_speculative_read) 00171 need_kqueue_wait_ = true; 00172 else if (!read_op_queue_.has_operation(descriptor)) 00173 { 00174 // Speculative reads are ok as there are no queued read operations. 00175 descriptor_data.allow_speculative_read = true; 00176 00177 asio::error_code ec; 00178 std::size_t bytes_transferred = 0; 00179 if (handler.perform(ec, bytes_transferred)) 00180 { 00181 handler.complete(ec, bytes_transferred); 00182 return; 00183 } 00184 } 00185 00186 // Speculative reads are not ok as there will be queued read operations. 00187 descriptor_data.allow_speculative_read = false; 00188 00189 if (read_op_queue_.enqueue_operation(descriptor, handler)) 00190 { 00191 struct kevent event; 00192 EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, 0, 0, 0); 00193 if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1) 00194 { 00195 asio::error_code ec(errno, 00196 asio::error::get_system_category()); 00197 read_op_queue_.perform_all_operations(descriptor, ec); 00198 } 00199 } 00200 } 00201 00202 // Start a new write operation. The handler object will be invoked when the 00203 // given descriptor is ready to be written, or an error has occurred. 00204 template <typename Handler> 00205 void start_write_op(socket_type descriptor, 00206 per_descriptor_data& descriptor_data, Handler handler, 00207 bool allow_speculative_write = true) 00208 { 00209 if (allow_speculative_write && descriptor_data.allow_speculative_write) 00210 { 00211 asio::error_code ec; 00212 std::size_t bytes_transferred = 0; 00213 if (handler.perform(ec, bytes_transferred)) 00214 { 00215 handler.complete(ec, bytes_transferred); 00216 return; 00217 } 00218 00219 // We only get one shot at a speculative write in this function. 00220 allow_speculative_write = false; 00221 } 00222 00223 asio::detail::mutex::scoped_lock lock(mutex_); 00224 00225 if (shutdown_) 00226 return; 00227 00228 if (!allow_speculative_write) 00229 need_kqueue_wait_ = true; 00230 else if (!write_op_queue_.has_operation(descriptor)) 00231 { 00232 // Speculative writes are ok as there are no queued write operations. 00233 descriptor_data.allow_speculative_write = true; 00234 00235 asio::error_code ec; 00236 std::size_t bytes_transferred = 0; 00237 if (handler.perform(ec, bytes_transferred)) 00238 { 00239 handler.complete(ec, bytes_transferred); 00240 return; 00241 } 00242 } 00243 00244 // Speculative writes are not ok as there will be queued write operations. 00245 descriptor_data.allow_speculative_write = false; 00246 00247 if (write_op_queue_.enqueue_operation(descriptor, handler)) 00248 { 00249 struct kevent event; 00250 EV_SET(&event, descriptor, EVFILT_WRITE, EV_ADD, 0, 0, 0); 00251 if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1) 00252 { 00253 asio::error_code ec(errno, 00254 asio::error::get_system_category()); 00255 write_op_queue_.perform_all_operations(descriptor, ec); 00256 } 00257 } 00258 } 00259 00260 // Start a new exception operation. The handler object will be invoked when 00261 // the given descriptor has exception information, or an error has occurred. 00262 template <typename Handler> 00263 void start_except_op(socket_type descriptor, 00264 per_descriptor_data&, Handler handler) 00265 { 00266 asio::detail::mutex::scoped_lock lock(mutex_); 00267 00268 if (shutdown_) 00269 return; 00270 00271 if (except_op_queue_.enqueue_operation(descriptor, handler)) 00272 { 00273 struct kevent event; 00274 if (read_op_queue_.has_operation(descriptor)) 00275 EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, 0, 0, 0); 00276 else 00277 EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, EV_OOBAND, 0, 0); 00278 if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1) 00279 { 00280 asio::error_code ec(errno, 00281 asio::error::get_system_category()); 00282 except_op_queue_.perform_all_operations(descriptor, ec); 00283 } 00284 } 00285 } 00286 00287 // Start a new write operation. The handler object will be invoked when the 00288 // given descriptor is ready to be written, or an error has occurred. 00289 template <typename Handler> 00290 void start_connect_op(socket_type descriptor, 00291 per_descriptor_data& descriptor_data, Handler handler) 00292 { 00293 asio::detail::mutex::scoped_lock lock(mutex_); 00294 00295 if (shutdown_) 00296 return; 00297 00298 // Speculative writes are not ok as there will be queued write operations. 00299 descriptor_data.allow_speculative_write = false; 00300 00301 if (write_op_queue_.enqueue_operation(descriptor, handler)) 00302 { 00303 struct kevent event; 00304 EV_SET(&event, descriptor, EVFILT_WRITE, EV_ADD, 0, 0, 0); 00305 if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1) 00306 { 00307 asio::error_code ec(errno, 00308 asio::error::get_system_category()); 00309 write_op_queue_.perform_all_operations(descriptor, ec); 00310 } 00311 } 00312 } 00313 00314 // Cancel all operations associated with the given descriptor. The 00315 // handlers associated with the descriptor will be invoked with the 00316 // operation_aborted error. 00317 void cancel_ops(socket_type descriptor, per_descriptor_data&) 00318 { 00319 asio::detail::mutex::scoped_lock lock(mutex_); 00320 cancel_ops_unlocked(descriptor); 00321 } 00322 00323 // Cancel any operations that are running against the descriptor and remove 00324 // its registration from the reactor. 00325 void close_descriptor(socket_type descriptor, per_descriptor_data&) 00326 { 00327 asio::detail::mutex::scoped_lock lock(mutex_); 00328 00329 // Remove the descriptor from kqueue. 00330 struct kevent event[2]; 00331 EV_SET(&event[0], descriptor, EVFILT_READ, EV_DELETE, 0, 0, 0); 00332 EV_SET(&event[1], descriptor, EVFILT_WRITE, EV_DELETE, 0, 0, 0); 00333 ::kevent(kqueue_fd_, event, 2, 0, 0, 0); 00334 00335 // Cancel any outstanding operations associated with the descriptor. 00336 cancel_ops_unlocked(descriptor); 00337 } 00338 00339 // Add a new timer queue to the reactor. 00340 template <typename Time_Traits> 00341 void add_timer_queue(timer_queue<Time_Traits>& timer_queue) 00342 { 00343 asio::detail::mutex::scoped_lock lock(mutex_); 00344 timer_queues_.push_back(&timer_queue); 00345 } 00346 00347 // Remove a timer queue from the reactor. 00348 template <typename Time_Traits> 00349 void remove_timer_queue(timer_queue<Time_Traits>& timer_queue) 00350 { 00351 asio::detail::mutex::scoped_lock lock(mutex_); 00352 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00353 { 00354 if (timer_queues_[i] == &timer_queue) 00355 { 00356 timer_queues_.erase(timer_queues_.begin() + i); 00357 return; 00358 } 00359 } 00360 } 00361 00362 // Schedule a timer in the given timer queue to expire at the specified 00363 // absolute time. The handler object will be invoked when the timer expires. 00364 template <typename Time_Traits, typename Handler> 00365 void schedule_timer(timer_queue<Time_Traits>& timer_queue, 00366 const typename Time_Traits::time_type& time, Handler handler, void* token) 00367 { 00368 asio::detail::mutex::scoped_lock lock(mutex_); 00369 if (!shutdown_) 00370 if (timer_queue.enqueue_timer(time, handler, token)) 00371 interrupter_.interrupt(); 00372 } 00373 00374 // Cancel the timer associated with the given token. Returns the number of 00375 // handlers that have been posted or dispatched. 00376 template <typename Time_Traits> 00377 std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token) 00378 { 00379 asio::detail::mutex::scoped_lock lock(mutex_); 00380 std::size_t n = timer_queue.cancel_timer(token); 00381 if (n > 0) 00382 interrupter_.interrupt(); 00383 return n; 00384 } 00385 00386 private: 00387 friend class task_io_service<kqueue_reactor<Own_Thread> >; 00388 00389 // Run the kqueue loop. 00390 void run(bool block) 00391 { 00392 asio::detail::mutex::scoped_lock lock(mutex_); 00393 00394 // Dispatch any operation cancellations that were made while the select 00395 // loop was not running. 00396 read_op_queue_.perform_cancellations(); 00397 write_op_queue_.perform_cancellations(); 00398 except_op_queue_.perform_cancellations(); 00399 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00400 timer_queues_[i]->dispatch_cancellations(); 00401 00402 // Check if the thread is supposed to stop. 00403 if (stop_thread_) 00404 { 00405 complete_operations_and_timers(lock); 00406 return; 00407 } 00408 00409 // We can return immediately if there's no work to do and the reactor is 00410 // not supposed to block. 00411 if (!block && read_op_queue_.empty() && write_op_queue_.empty() 00412 && except_op_queue_.empty() && all_timer_queues_are_empty()) 00413 { 00414 complete_operations_and_timers(lock); 00415 return; 00416 } 00417 00418 // Determine how long to block while waiting for events. 00419 timespec timeout_buf = { 0, 0 }; 00420 timespec* timeout = block ? get_timeout(timeout_buf) : &timeout_buf; 00421 00422 wait_in_progress_ = true; 00423 lock.unlock(); 00424 00425 // Block on the kqueue descriptor. 00426 struct kevent events[128]; 00427 int num_events = (block || need_kqueue_wait_) 00428 ? kevent(kqueue_fd_, 0, 0, events, 128, timeout) 00429 : 0; 00430 00431 lock.lock(); 00432 wait_in_progress_ = false; 00433 00434 // Block signals while performing operations. 00435 asio::detail::signal_blocker sb; 00436 00437 // Dispatch the waiting events. 00438 for (int i = 0; i < num_events; ++i) 00439 { 00440 int descriptor = events[i].ident; 00441 if (descriptor == interrupter_.read_descriptor()) 00442 { 00443 interrupter_.reset(); 00444 } 00445 else if (events[i].filter == EVFILT_READ) 00446 { 00447 // Dispatch operations associated with the descriptor. 00448 bool more_reads = false; 00449 bool more_except = false; 00450 if (events[i].flags & EV_ERROR) 00451 { 00452 asio::error_code error( 00453 events[i].data, asio::error::get_system_category()); 00454 except_op_queue_.perform_all_operations(descriptor, error); 00455 read_op_queue_.perform_all_operations(descriptor, error); 00456 } 00457 else if (events[i].flags & EV_OOBAND) 00458 { 00459 asio::error_code error; 00460 more_except = except_op_queue_.perform_operation(descriptor, error); 00461 if (events[i].data > 0) 00462 more_reads = read_op_queue_.perform_operation(descriptor, error); 00463 else 00464 more_reads = read_op_queue_.has_operation(descriptor); 00465 } 00466 else 00467 { 00468 asio::error_code error; 00469 more_reads = read_op_queue_.perform_operation(descriptor, error); 00470 more_except = except_op_queue_.has_operation(descriptor); 00471 } 00472 00473 // Update the descriptor in the kqueue. 00474 struct kevent event; 00475 if (more_reads) 00476 EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, 0, 0, 0); 00477 else if (more_except) 00478 EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, EV_OOBAND, 0, 0); 00479 else 00480 EV_SET(&event, descriptor, EVFILT_READ, EV_DELETE, 0, 0, 0); 00481 if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1) 00482 { 00483 asio::error_code error(errno, 00484 asio::error::get_system_category()); 00485 except_op_queue_.perform_all_operations(descriptor, error); 00486 read_op_queue_.perform_all_operations(descriptor, error); 00487 } 00488 } 00489 else if (events[i].filter == EVFILT_WRITE) 00490 { 00491 // Dispatch operations associated with the descriptor. 00492 bool more_writes = false; 00493 if (events[i].flags & EV_ERROR) 00494 { 00495 asio::error_code error( 00496 events[i].data, asio::error::get_system_category()); 00497 write_op_queue_.perform_all_operations(descriptor, error); 00498 } 00499 else 00500 { 00501 asio::error_code error; 00502 more_writes = write_op_queue_.perform_operation(descriptor, error); 00503 } 00504 00505 // Update the descriptor in the kqueue. 00506 struct kevent event; 00507 if (more_writes) 00508 EV_SET(&event, descriptor, EVFILT_WRITE, EV_ADD, 0, 0, 0); 00509 else 00510 EV_SET(&event, descriptor, EVFILT_WRITE, EV_DELETE, 0, 0, 0); 00511 if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1) 00512 { 00513 asio::error_code error(errno, 00514 asio::error::get_system_category()); 00515 write_op_queue_.perform_all_operations(descriptor, error); 00516 } 00517 } 00518 } 00519 00520 read_op_queue_.perform_cancellations(); 00521 write_op_queue_.perform_cancellations(); 00522 except_op_queue_.perform_cancellations(); 00523 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00524 { 00525 timer_queues_[i]->dispatch_timers(); 00526 timer_queues_[i]->dispatch_cancellations(); 00527 } 00528 00529 // Issue any pending cancellations. 00530 for (std::size_t i = 0; i < pending_cancellations_.size(); ++i) 00531 cancel_ops_unlocked(pending_cancellations_[i]); 00532 pending_cancellations_.clear(); 00533 00534 // Determine whether kqueue needs to be called next time the reactor is run. 00535 need_kqueue_wait_ = !read_op_queue_.empty() 00536 || !write_op_queue_.empty() || !except_op_queue_.empty(); 00537 00538 complete_operations_and_timers(lock); 00539 } 00540 00541 // Run the select loop in the thread. 00542 void run_thread() 00543 { 00544 asio::detail::mutex::scoped_lock lock(mutex_); 00545 while (!stop_thread_) 00546 { 00547 lock.unlock(); 00548 run(true); 00549 lock.lock(); 00550 } 00551 } 00552 00553 // Entry point for the select loop thread. 00554 static void call_run_thread(kqueue_reactor* reactor) 00555 { 00556 reactor->run_thread(); 00557 } 00558 00559 // Interrupt the select loop. 00560 void interrupt() 00561 { 00562 interrupter_.interrupt(); 00563 } 00564 00565 // Create the kqueue file descriptor. Throws an exception if the descriptor 00566 // cannot be created. 00567 static int do_kqueue_create() 00568 { 00569 int fd = kqueue(); 00570 if (fd == -1) 00571 { 00572 boost::throw_exception( 00573 asio::system_error( 00574 asio::error_code(errno, 00575 asio::error::get_system_category()), 00576 "kqueue")); 00577 } 00578 return fd; 00579 } 00580 00581 // Check if all timer queues are empty. 00582 bool all_timer_queues_are_empty() const 00583 { 00584 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00585 if (!timer_queues_[i]->empty()) 00586 return false; 00587 return true; 00588 } 00589 00590 // Get the timeout value for the kevent call. 00591 timespec* get_timeout(timespec& ts) 00592 { 00593 if (all_timer_queues_are_empty()) 00594 return 0; 00595 00596 // By default we will wait no longer than 5 minutes. This will ensure that 00597 // any changes to the system clock are detected after no longer than this. 00598 boost::posix_time::time_duration minimum_wait_duration 00599 = boost::posix_time::minutes(5); 00600 00601 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00602 { 00603 boost::posix_time::time_duration wait_duration 00604 = timer_queues_[i]->wait_duration(); 00605 if (wait_duration < minimum_wait_duration) 00606 minimum_wait_duration = wait_duration; 00607 } 00608 00609 if (minimum_wait_duration > boost::posix_time::time_duration()) 00610 { 00611 ts.tv_sec = minimum_wait_duration.total_seconds(); 00612 ts.tv_nsec = minimum_wait_duration.total_nanoseconds() % 1000000000; 00613 } 00614 else 00615 { 00616 ts.tv_sec = 0; 00617 ts.tv_nsec = 0; 00618 } 00619 00620 return &ts; 00621 } 00622 00623 // Cancel all operations associated with the given descriptor. The do_cancel 00624 // function of the handler objects will be invoked. This function does not 00625 // acquire the kqueue_reactor's mutex. 00626 void cancel_ops_unlocked(socket_type descriptor) 00627 { 00628 bool interrupt = read_op_queue_.cancel_operations(descriptor); 00629 interrupt = write_op_queue_.cancel_operations(descriptor) || interrupt; 00630 interrupt = except_op_queue_.cancel_operations(descriptor) || interrupt; 00631 if (interrupt) 00632 interrupter_.interrupt(); 00633 } 00634 00635 // Clean up operations and timers. We must not hold the lock since the 00636 // destructors may make calls back into this reactor. We make a copy of the 00637 // vector of timer queues since the original may be modified while the lock 00638 // is not held. 00639 void complete_operations_and_timers( 00640 asio::detail::mutex::scoped_lock& lock) 00641 { 00642 timer_queues_for_cleanup_ = timer_queues_; 00643 lock.unlock(); 00644 read_op_queue_.complete_operations(); 00645 write_op_queue_.complete_operations(); 00646 except_op_queue_.complete_operations(); 00647 for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i) 00648 timer_queues_for_cleanup_[i]->complete_timers(); 00649 } 00650 00651 // Mutex to protect access to internal data. 00652 asio::detail::mutex mutex_; 00653 00654 // The kqueue file descriptor. 00655 int kqueue_fd_; 00656 00657 // Whether the kqueue wait call is currently in progress 00658 bool wait_in_progress_; 00659 00660 // The interrupter is used to break a blocking kevent call. 00661 select_interrupter interrupter_; 00662 00663 // The queue of read operations. 00664 reactor_op_queue<socket_type> read_op_queue_; 00665 00666 // The queue of write operations. 00667 reactor_op_queue<socket_type> write_op_queue_; 00668 00669 // The queue of except operations. 00670 reactor_op_queue<socket_type> except_op_queue_; 00671 00672 // The timer queues. 00673 std::vector<timer_queue_base*> timer_queues_; 00674 00675 // A copy of the timer queues, used when cleaning up timers. The copy is 00676 // stored as a class data member to avoid unnecessary memory allocation. 00677 std::vector<timer_queue_base*> timer_queues_for_cleanup_; 00678 00679 // The descriptors that are pending cancellation. 00680 std::vector<socket_type> pending_cancellations_; 00681 00682 // Does the reactor loop thread need to stop. 00683 bool stop_thread_; 00684 00685 // The thread that is running the reactor loop. 00686 asio::detail::thread* thread_; 00687 00688 // Whether the service has been shut down. 00689 bool shutdown_; 00690 00691 // Whether we need to call kqueue the next time the reactor is run. 00692 bool need_kqueue_wait_; 00693 }; 00694 00695 } // namespace detail 00696 } // namespace asio 00697 00698 #endif // defined(ASIO_HAS_KQUEUE) 00699 00700 #include "asio/detail/pop_options.hpp" 00701 00702 #endif // ASIO_DETAIL_KQUEUE_REACTOR_HPP