$search
00001 // 00002 // epoll_reactor.hpp 00003 // ~~~~~~~~~~~~~~~~~ 00004 // 00005 // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com) 00006 // 00007 // Distributed under the Boost Software License, Version 1.0. (See accompanying 00008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 00009 // 00010 00011 #ifndef ASIO_DETAIL_EPOLL_REACTOR_HPP 00012 #define ASIO_DETAIL_EPOLL_REACTOR_HPP 00013 00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200) 00015 # pragma once 00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) 00017 00018 #include "asio/detail/push_options.hpp" 00019 00020 #include "asio/detail/epoll_reactor_fwd.hpp" 00021 00022 #if defined(ASIO_HAS_EPOLL) 00023 00024 #include "asio/detail/push_options.hpp" 00025 #include <cstddef> 00026 #include <vector> 00027 #include <sys/epoll.h> 00028 #include <boost/config.hpp> 00029 #include <boost/date_time/posix_time/posix_time_types.hpp> 00030 #include <boost/throw_exception.hpp> 00031 #include "asio/detail/pop_options.hpp" 00032 00033 #include "asio/error.hpp" 00034 #include "asio/io_service.hpp" 00035 #include "asio/system_error.hpp" 00036 #include "asio/detail/bind_handler.hpp" 00037 #include "asio/detail/hash_map.hpp" 00038 #include "asio/detail/mutex.hpp" 00039 #include "asio/detail/task_io_service.hpp" 00040 #include "asio/detail/thread.hpp" 00041 #include "asio/detail/reactor_op_queue.hpp" 00042 #include "asio/detail/select_interrupter.hpp" 00043 #include "asio/detail/service_base.hpp" 00044 #include "asio/detail/signal_blocker.hpp" 00045 #include "asio/detail/socket_types.hpp" 00046 #include "asio/detail/timer_queue.hpp" 00047 00048 namespace asio { 00049 namespace detail { 00050 00051 template <bool Own_Thread> 00052 class epoll_reactor 00053 : public asio::detail::service_base<epoll_reactor<Own_Thread> > 00054 { 00055 public: 00056 // Per-descriptor data. 00057 struct per_descriptor_data 00058 { 00059 bool allow_speculative_read; 00060 bool allow_speculative_write; 00061 }; 00062 00063 // Constructor. 00064 epoll_reactor(asio::io_service& io_service) 00065 : asio::detail::service_base<epoll_reactor<Own_Thread> >(io_service), 00066 mutex_(), 00067 epoll_fd_(do_epoll_create()), 00068 wait_in_progress_(false), 00069 interrupter_(), 00070 read_op_queue_(), 00071 write_op_queue_(), 00072 except_op_queue_(), 00073 pending_cancellations_(), 00074 stop_thread_(false), 00075 thread_(0), 00076 shutdown_(false), 00077 need_epoll_wait_(true) 00078 { 00079 // Start the reactor's internal thread only if needed. 00080 if (Own_Thread) 00081 { 00082 asio::detail::signal_blocker sb; 00083 thread_ = new asio::detail::thread( 00084 bind_handler(&epoll_reactor::call_run_thread, this)); 00085 } 00086 00087 // Add the interrupter's descriptor to epoll. 00088 epoll_event ev = { 0, { 0 } }; 00089 ev.events = EPOLLIN | EPOLLERR; 00090 ev.data.fd = interrupter_.read_descriptor(); 00091 epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev); 00092 } 00093 00094 // Destructor. 00095 ~epoll_reactor() 00096 { 00097 shutdown_service(); 00098 close(epoll_fd_); 00099 } 00100 00101 // Destroy all user-defined handler objects owned by the service. 00102 void shutdown_service() 00103 { 00104 asio::detail::mutex::scoped_lock lock(mutex_); 00105 shutdown_ = true; 00106 stop_thread_ = true; 00107 lock.unlock(); 00108 00109 if (thread_) 00110 { 00111 interrupter_.interrupt(); 00112 thread_->join(); 00113 delete thread_; 00114 thread_ = 0; 00115 } 00116 00117 read_op_queue_.destroy_operations(); 00118 write_op_queue_.destroy_operations(); 00119 except_op_queue_.destroy_operations(); 00120 00121 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00122 timer_queues_[i]->destroy_timers(); 00123 timer_queues_.clear(); 00124 } 00125 00126 // Register a socket with the reactor. Returns 0 on success, system error 00127 // code on failure. 00128 int register_descriptor(socket_type descriptor, 00129 per_descriptor_data& descriptor_data) 00130 { 00131 // No need to lock according to epoll documentation. 00132 00133 descriptor_data.allow_speculative_read = true; 00134 descriptor_data.allow_speculative_write = true; 00135 00136 epoll_event ev = { 0, { 0 } }; 00137 ev.events = 0; 00138 ev.data.fd = descriptor; 00139 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev); 00140 if (result != 0) 00141 return errno; 00142 return 0; 00143 } 00144 00145 // Start a new read operation. The handler object will be invoked when the 00146 // given descriptor is ready to be read, or an error has occurred. 00147 template <typename Handler> 00148 void start_read_op(socket_type descriptor, 00149 per_descriptor_data& descriptor_data, 00150 Handler handler, bool allow_speculative_read = true) 00151 { 00152 if (allow_speculative_read && descriptor_data.allow_speculative_read) 00153 { 00154 asio::error_code ec; 00155 std::size_t bytes_transferred = 0; 00156 if (handler.perform(ec, bytes_transferred)) 00157 { 00158 handler.complete(ec, bytes_transferred); 00159 return; 00160 } 00161 00162 // We only get one shot at a speculative read in this function. 00163 allow_speculative_read = false; 00164 } 00165 00166 asio::detail::mutex::scoped_lock lock(mutex_); 00167 00168 if (shutdown_) 00169 return; 00170 00171 if (!allow_speculative_read) 00172 need_epoll_wait_ = true; 00173 else if (!read_op_queue_.has_operation(descriptor)) 00174 { 00175 // Speculative reads are ok as there are no queued read operations. 00176 descriptor_data.allow_speculative_read = true; 00177 00178 asio::error_code ec; 00179 std::size_t bytes_transferred = 0; 00180 if (handler.perform(ec, bytes_transferred)) 00181 { 00182 handler.complete(ec, bytes_transferred); 00183 return; 00184 } 00185 } 00186 00187 // Speculative reads are not ok as there will be queued read operations. 00188 descriptor_data.allow_speculative_read = false; 00189 00190 if (read_op_queue_.enqueue_operation(descriptor, handler)) 00191 { 00192 epoll_event ev = { 0, { 0 } }; 00193 ev.events = EPOLLIN | EPOLLERR | EPOLLHUP; 00194 if (write_op_queue_.has_operation(descriptor)) 00195 ev.events |= EPOLLOUT; 00196 if (except_op_queue_.has_operation(descriptor)) 00197 ev.events |= EPOLLPRI; 00198 ev.data.fd = descriptor; 00199 00200 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev); 00201 if (result != 0 && errno == ENOENT) 00202 result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev); 00203 if (result != 0) 00204 { 00205 asio::error_code ec(errno, 00206 asio::error::get_system_category()); 00207 read_op_queue_.perform_all_operations(descriptor, ec); 00208 } 00209 } 00210 } 00211 00212 // Start a new write operation. The handler object will be invoked when the 00213 // given descriptor is ready to be written, or an error has occurred. 00214 template <typename Handler> 00215 void start_write_op(socket_type descriptor, 00216 per_descriptor_data& descriptor_data, 00217 Handler handler, bool allow_speculative_write = true) 00218 { 00219 if (allow_speculative_write && descriptor_data.allow_speculative_write) 00220 { 00221 asio::error_code ec; 00222 std::size_t bytes_transferred = 0; 00223 if (handler.perform(ec, bytes_transferred)) 00224 { 00225 handler.complete(ec, bytes_transferred); 00226 return; 00227 } 00228 00229 // We only get one shot at a speculative write in this function. 00230 allow_speculative_write = false; 00231 } 00232 00233 asio::detail::mutex::scoped_lock lock(mutex_); 00234 00235 if (shutdown_) 00236 return; 00237 00238 if (!allow_speculative_write) 00239 need_epoll_wait_ = true; 00240 else if (!write_op_queue_.has_operation(descriptor)) 00241 { 00242 // Speculative writes are ok as there are no queued write operations. 00243 descriptor_data.allow_speculative_write = true; 00244 00245 asio::error_code ec; 00246 std::size_t bytes_transferred = 0; 00247 if (handler.perform(ec, bytes_transferred)) 00248 { 00249 handler.complete(ec, bytes_transferred); 00250 return; 00251 } 00252 } 00253 00254 // Speculative writes are not ok as there will be queued write operations. 00255 descriptor_data.allow_speculative_write = false; 00256 00257 if (write_op_queue_.enqueue_operation(descriptor, handler)) 00258 { 00259 epoll_event ev = { 0, { 0 } }; 00260 ev.events = EPOLLOUT | EPOLLERR | EPOLLHUP; 00261 if (read_op_queue_.has_operation(descriptor)) 00262 ev.events |= EPOLLIN; 00263 if (except_op_queue_.has_operation(descriptor)) 00264 ev.events |= EPOLLPRI; 00265 ev.data.fd = descriptor; 00266 00267 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev); 00268 if (result != 0 && errno == ENOENT) 00269 result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev); 00270 if (result != 0) 00271 { 00272 asio::error_code ec(errno, 00273 asio::error::get_system_category()); 00274 write_op_queue_.perform_all_operations(descriptor, ec); 00275 } 00276 } 00277 } 00278 00279 // Start a new exception operation. The handler object will be invoked when 00280 // the given descriptor has exception information, or an error has occurred. 00281 template <typename Handler> 00282 void start_except_op(socket_type descriptor, 00283 per_descriptor_data&, Handler handler) 00284 { 00285 asio::detail::mutex::scoped_lock lock(mutex_); 00286 00287 if (shutdown_) 00288 return; 00289 00290 if (except_op_queue_.enqueue_operation(descriptor, handler)) 00291 { 00292 epoll_event ev = { 0, { 0 } }; 00293 ev.events = EPOLLPRI | EPOLLERR | EPOLLHUP; 00294 if (read_op_queue_.has_operation(descriptor)) 00295 ev.events |= EPOLLIN; 00296 if (write_op_queue_.has_operation(descriptor)) 00297 ev.events |= EPOLLOUT; 00298 ev.data.fd = descriptor; 00299 00300 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev); 00301 if (result != 0 && errno == ENOENT) 00302 result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev); 00303 if (result != 0) 00304 { 00305 asio::error_code ec(errno, 00306 asio::error::get_system_category()); 00307 except_op_queue_.perform_all_operations(descriptor, ec); 00308 } 00309 } 00310 } 00311 00312 // Start a new write operation. The handler object will be invoked when the 00313 // given descriptor is ready for writing or an error has occurred. Speculative 00314 // writes are not allowed. 00315 template <typename Handler> 00316 void start_connect_op(socket_type descriptor, 00317 per_descriptor_data& descriptor_data, Handler handler) 00318 { 00319 asio::detail::mutex::scoped_lock lock(mutex_); 00320 00321 if (shutdown_) 00322 return; 00323 00324 // Speculative writes are not ok as there will be queued write operations. 00325 descriptor_data.allow_speculative_write = false; 00326 00327 if (write_op_queue_.enqueue_operation(descriptor, handler)) 00328 { 00329 epoll_event ev = { 0, { 0 } }; 00330 ev.events = EPOLLOUT | EPOLLERR | EPOLLHUP; 00331 if (read_op_queue_.has_operation(descriptor)) 00332 ev.events |= EPOLLIN; 00333 if (except_op_queue_.has_operation(descriptor)) 00334 ev.events |= EPOLLPRI; 00335 ev.data.fd = descriptor; 00336 00337 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev); 00338 if (result != 0 && errno == ENOENT) 00339 result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev); 00340 if (result != 0) 00341 { 00342 asio::error_code ec(errno, 00343 asio::error::get_system_category()); 00344 write_op_queue_.perform_all_operations(descriptor, ec); 00345 } 00346 } 00347 } 00348 00349 // Cancel all operations associated with the given descriptor. The 00350 // handlers associated with the descriptor will be invoked with the 00351 // operation_aborted error. 00352 void cancel_ops(socket_type descriptor, per_descriptor_data&) 00353 { 00354 asio::detail::mutex::scoped_lock lock(mutex_); 00355 cancel_ops_unlocked(descriptor); 00356 } 00357 00358 // Cancel any operations that are running against the descriptor and remove 00359 // its registration from the reactor. 00360 void close_descriptor(socket_type descriptor, per_descriptor_data&) 00361 { 00362 asio::detail::mutex::scoped_lock lock(mutex_); 00363 00364 // Remove the descriptor from epoll. 00365 epoll_event ev = { 0, { 0 } }; 00366 epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev); 00367 00368 // Cancel any outstanding operations associated with the descriptor. 00369 cancel_ops_unlocked(descriptor); 00370 } 00371 00372 // Add a new timer queue to the reactor. 00373 template <typename Time_Traits> 00374 void add_timer_queue(timer_queue<Time_Traits>& timer_queue) 00375 { 00376 asio::detail::mutex::scoped_lock lock(mutex_); 00377 timer_queues_.push_back(&timer_queue); 00378 } 00379 00380 // Remove a timer queue from the reactor. 00381 template <typename Time_Traits> 00382 void remove_timer_queue(timer_queue<Time_Traits>& timer_queue) 00383 { 00384 asio::detail::mutex::scoped_lock lock(mutex_); 00385 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00386 { 00387 if (timer_queues_[i] == &timer_queue) 00388 { 00389 timer_queues_.erase(timer_queues_.begin() + i); 00390 return; 00391 } 00392 } 00393 } 00394 00395 // Schedule a timer in the given timer queue to expire at the specified 00396 // absolute time. The handler object will be invoked when the timer expires. 00397 template <typename Time_Traits, typename Handler> 00398 void schedule_timer(timer_queue<Time_Traits>& timer_queue, 00399 const typename Time_Traits::time_type& time, Handler handler, void* token) 00400 { 00401 asio::detail::mutex::scoped_lock lock(mutex_); 00402 if (!shutdown_) 00403 if (timer_queue.enqueue_timer(time, handler, token)) 00404 interrupter_.interrupt(); 00405 } 00406 00407 // Cancel the timer associated with the given token. Returns the number of 00408 // handlers that have been posted or dispatched. 00409 template <typename Time_Traits> 00410 std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token) 00411 { 00412 asio::detail::mutex::scoped_lock lock(mutex_); 00413 std::size_t n = timer_queue.cancel_timer(token); 00414 if (n > 0) 00415 interrupter_.interrupt(); 00416 return n; 00417 } 00418 00419 private: 00420 friend class task_io_service<epoll_reactor<Own_Thread> >; 00421 00422 // Run epoll once until interrupted or events are ready to be dispatched. 00423 void run(bool block) 00424 { 00425 asio::detail::mutex::scoped_lock lock(mutex_); 00426 00427 // Dispatch any operation cancellations that were made while the select 00428 // loop was not running. 00429 read_op_queue_.perform_cancellations(); 00430 write_op_queue_.perform_cancellations(); 00431 except_op_queue_.perform_cancellations(); 00432 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00433 timer_queues_[i]->dispatch_cancellations(); 00434 00435 // Check if the thread is supposed to stop. 00436 if (stop_thread_) 00437 { 00438 complete_operations_and_timers(lock); 00439 return; 00440 } 00441 00442 // We can return immediately if there's no work to do and the reactor is 00443 // not supposed to block. 00444 if (!block && read_op_queue_.empty() && write_op_queue_.empty() 00445 && except_op_queue_.empty() && all_timer_queues_are_empty()) 00446 { 00447 complete_operations_and_timers(lock); 00448 return; 00449 } 00450 00451 int timeout = block ? get_timeout() : 0; 00452 wait_in_progress_ = true; 00453 lock.unlock(); 00454 00455 // Block on the epoll descriptor. 00456 epoll_event events[128]; 00457 int num_events = (block || need_epoll_wait_) 00458 ? epoll_wait(epoll_fd_, events, 128, timeout) 00459 : 0; 00460 00461 lock.lock(); 00462 wait_in_progress_ = false; 00463 00464 // Block signals while performing operations. 00465 asio::detail::signal_blocker sb; 00466 00467 // Dispatch the waiting events. 00468 for (int i = 0; i < num_events; ++i) 00469 { 00470 int descriptor = events[i].data.fd; 00471 if (descriptor == interrupter_.read_descriptor()) 00472 { 00473 interrupter_.reset(); 00474 } 00475 else 00476 { 00477 bool more_reads = false; 00478 bool more_writes = false; 00479 bool more_except = false; 00480 asio::error_code ec; 00481 00482 // Exception operations must be processed first to ensure that any 00483 // out-of-band data is read before normal data. 00484 if (events[i].events & (EPOLLPRI | EPOLLERR | EPOLLHUP)) 00485 more_except = except_op_queue_.perform_operation(descriptor, ec); 00486 else 00487 more_except = except_op_queue_.has_operation(descriptor); 00488 00489 if (events[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)) 00490 more_reads = read_op_queue_.perform_operation(descriptor, ec); 00491 else 00492 more_reads = read_op_queue_.has_operation(descriptor); 00493 00494 if (events[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) 00495 more_writes = write_op_queue_.perform_operation(descriptor, ec); 00496 else 00497 more_writes = write_op_queue_.has_operation(descriptor); 00498 00499 if ((events[i].events & (EPOLLERR | EPOLLHUP)) != 0 00500 && (events[i].events & ~(EPOLLERR | EPOLLHUP)) == 0 00501 && !more_except && !more_reads && !more_writes) 00502 { 00503 // If we have an event and no operations associated with the 00504 // descriptor then we need to delete the descriptor from epoll. The 00505 // epoll_wait system call can produce EPOLLHUP or EPOLLERR events 00506 // when there is no operation pending, so if we do not remove the 00507 // descriptor we can end up in a tight loop of repeated 00508 // calls to epoll_wait. 00509 epoll_event ev = { 0, { 0 } }; 00510 epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev); 00511 } 00512 else 00513 { 00514 epoll_event ev = { 0, { 0 } }; 00515 ev.events = EPOLLERR | EPOLLHUP; 00516 if (more_reads) 00517 ev.events |= EPOLLIN; 00518 if (more_writes) 00519 ev.events |= EPOLLOUT; 00520 if (more_except) 00521 ev.events |= EPOLLPRI; 00522 ev.data.fd = descriptor; 00523 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev); 00524 if (result != 0 && errno == ENOENT) 00525 result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev); 00526 if (result != 0) 00527 { 00528 ec = asio::error_code(errno, 00529 asio::error::get_system_category()); 00530 read_op_queue_.perform_all_operations(descriptor, ec); 00531 write_op_queue_.perform_all_operations(descriptor, ec); 00532 except_op_queue_.perform_all_operations(descriptor, ec); 00533 } 00534 } 00535 } 00536 } 00537 read_op_queue_.perform_cancellations(); 00538 write_op_queue_.perform_cancellations(); 00539 except_op_queue_.perform_cancellations(); 00540 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00541 { 00542 timer_queues_[i]->dispatch_timers(); 00543 timer_queues_[i]->dispatch_cancellations(); 00544 } 00545 00546 // Issue any pending cancellations. 00547 for (size_t i = 0; i < pending_cancellations_.size(); ++i) 00548 cancel_ops_unlocked(pending_cancellations_[i]); 00549 pending_cancellations_.clear(); 00550 00551 // Determine whether epoll_wait should be called when the reactor next runs. 00552 need_epoll_wait_ = !read_op_queue_.empty() 00553 || !write_op_queue_.empty() || !except_op_queue_.empty(); 00554 00555 complete_operations_and_timers(lock); 00556 } 00557 00558 // Run the select loop in the thread. 00559 void run_thread() 00560 { 00561 asio::detail::mutex::scoped_lock lock(mutex_); 00562 while (!stop_thread_) 00563 { 00564 lock.unlock(); 00565 run(true); 00566 lock.lock(); 00567 } 00568 } 00569 00570 // Entry point for the select loop thread. 00571 static void call_run_thread(epoll_reactor* reactor) 00572 { 00573 reactor->run_thread(); 00574 } 00575 00576 // Interrupt the select loop. 00577 void interrupt() 00578 { 00579 interrupter_.interrupt(); 00580 } 00581 00582 // The hint to pass to epoll_create to size its data structures. 00583 enum { epoll_size = 20000 }; 00584 00585 // Create the epoll file descriptor. Throws an exception if the descriptor 00586 // cannot be created. 00587 static int do_epoll_create() 00588 { 00589 int fd = epoll_create(epoll_size); 00590 if (fd == -1) 00591 { 00592 boost::throw_exception( 00593 asio::system_error( 00594 asio::error_code(errno, 00595 asio::error::get_system_category()), 00596 "epoll")); 00597 } 00598 return fd; 00599 } 00600 00601 // Check if all timer queues are empty. 00602 bool all_timer_queues_are_empty() const 00603 { 00604 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00605 if (!timer_queues_[i]->empty()) 00606 return false; 00607 return true; 00608 } 00609 00610 // Get the timeout value for the epoll_wait call. The timeout value is 00611 // returned as a number of milliseconds. A return value of -1 indicates 00612 // that epoll_wait should block indefinitely. 00613 int get_timeout() 00614 { 00615 if (all_timer_queues_are_empty()) 00616 return -1; 00617 00618 // By default we will wait no longer than 5 minutes. This will ensure that 00619 // any changes to the system clock are detected after no longer than this. 00620 boost::posix_time::time_duration minimum_wait_duration 00621 = boost::posix_time::minutes(5); 00622 00623 for (std::size_t i = 0; i < timer_queues_.size(); ++i) 00624 { 00625 boost::posix_time::time_duration wait_duration 00626 = timer_queues_[i]->wait_duration(); 00627 if (wait_duration < minimum_wait_duration) 00628 minimum_wait_duration = wait_duration; 00629 } 00630 00631 if (minimum_wait_duration > boost::posix_time::time_duration()) 00632 { 00633 int milliseconds = minimum_wait_duration.total_milliseconds(); 00634 return milliseconds > 0 ? milliseconds : 1; 00635 } 00636 else 00637 { 00638 return 0; 00639 } 00640 } 00641 00642 // Cancel all operations associated with the given descriptor. The do_cancel 00643 // function of the handler objects will be invoked. This function does not 00644 // acquire the epoll_reactor's mutex. 00645 void cancel_ops_unlocked(socket_type descriptor) 00646 { 00647 bool interrupt = read_op_queue_.cancel_operations(descriptor); 00648 interrupt = write_op_queue_.cancel_operations(descriptor) || interrupt; 00649 interrupt = except_op_queue_.cancel_operations(descriptor) || interrupt; 00650 if (interrupt) 00651 interrupter_.interrupt(); 00652 } 00653 00654 // Clean up operations and timers. We must not hold the lock since the 00655 // destructors may make calls back into this reactor. We make a copy of the 00656 // vector of timer queues since the original may be modified while the lock 00657 // is not held. 00658 void complete_operations_and_timers( 00659 asio::detail::mutex::scoped_lock& lock) 00660 { 00661 timer_queues_for_cleanup_ = timer_queues_; 00662 lock.unlock(); 00663 read_op_queue_.complete_operations(); 00664 write_op_queue_.complete_operations(); 00665 except_op_queue_.complete_operations(); 00666 for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i) 00667 timer_queues_for_cleanup_[i]->complete_timers(); 00668 } 00669 00670 // Mutex to protect access to internal data. 00671 asio::detail::mutex mutex_; 00672 00673 // The epoll file descriptor. 00674 int epoll_fd_; 00675 00676 // Whether the epoll_wait call is currently in progress 00677 bool wait_in_progress_; 00678 00679 // The interrupter is used to break a blocking epoll_wait call. 00680 select_interrupter interrupter_; 00681 00682 // The queue of read operations. 00683 reactor_op_queue<socket_type> read_op_queue_; 00684 00685 // The queue of write operations. 00686 reactor_op_queue<socket_type> write_op_queue_; 00687 00688 // The queue of except operations. 00689 reactor_op_queue<socket_type> except_op_queue_; 00690 00691 // The timer queues. 00692 std::vector<timer_queue_base*> timer_queues_; 00693 00694 // A copy of the timer queues, used when cleaning up timers. The copy is 00695 // stored as a class data member to avoid unnecessary memory allocation. 00696 std::vector<timer_queue_base*> timer_queues_for_cleanup_; 00697 00698 // The descriptors that are pending cancellation. 00699 std::vector<socket_type> pending_cancellations_; 00700 00701 // Does the reactor loop thread need to stop. 00702 bool stop_thread_; 00703 00704 // The thread that is running the reactor loop. 00705 asio::detail::thread* thread_; 00706 00707 // Whether the service has been shut down. 00708 bool shutdown_; 00709 00710 // Whether we need to call epoll_wait the next time the reactor is run. 00711 bool need_epoll_wait_; 00712 }; 00713 00714 } // namespace detail 00715 } // namespace asio 00716 00717 #endif // defined(ASIO_HAS_EPOLL) 00718 00719 #include "asio/detail/pop_options.hpp" 00720 00721 #endif // ASIO_DETAIL_EPOLL_REACTOR_HPP