epoll_reactor.hpp
Go to the documentation of this file.
00001 //
00002 // epoll_reactor.hpp
00003 // ~~~~~~~~~~~~~~~~~
00004 //
00005 // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
00006 //
00007 // Distributed under the Boost Software License, Version 1.0. (See accompanying
00008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
00009 //
00010 
00011 #ifndef ASIO_DETAIL_EPOLL_REACTOR_HPP
00012 #define ASIO_DETAIL_EPOLL_REACTOR_HPP
00013 
00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00015 # pragma once
00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00017 
00018 #include "asio/detail/push_options.hpp"
00019 
00020 #include "asio/detail/epoll_reactor_fwd.hpp"
00021 
00022 #if defined(ASIO_HAS_EPOLL)
00023 
00024 #include "asio/detail/push_options.hpp"
00025 #include <cstddef>
00026 #include <vector>
00027 #include <sys/epoll.h>
00028 #include <boost/config.hpp>
00029 #include <boost/date_time/posix_time/posix_time_types.hpp>
00030 #include <boost/throw_exception.hpp>
00031 #include "asio/detail/pop_options.hpp"
00032 
00033 #include "asio/error.hpp"
00034 #include "asio/io_service.hpp"
00035 #include "asio/system_error.hpp"
00036 #include "asio/detail/bind_handler.hpp"
00037 #include "asio/detail/hash_map.hpp"
00038 #include "asio/detail/mutex.hpp"
00039 #include "asio/detail/task_io_service.hpp"
00040 #include "asio/detail/thread.hpp"
00041 #include "asio/detail/reactor_op_queue.hpp"
00042 #include "asio/detail/select_interrupter.hpp"
00043 #include "asio/detail/service_base.hpp"
00044 #include "asio/detail/signal_blocker.hpp"
00045 #include "asio/detail/socket_types.hpp"
00046 #include "asio/detail/timer_queue.hpp"
00047 
00048 namespace asio {
00049 namespace detail {
00050 
00051 template <bool Own_Thread>
00052 class epoll_reactor
00053   : public asio::detail::service_base<epoll_reactor<Own_Thread> >
00054 {
00055 public:
00056   // Per-descriptor data.
00057   struct per_descriptor_data
00058   {
00059     bool allow_speculative_read;
00060     bool allow_speculative_write;
00061   };
00062 
00063   // Constructor.
00064   epoll_reactor(asio::io_service& io_service)
00065     : asio::detail::service_base<epoll_reactor<Own_Thread> >(io_service),
00066       mutex_(),
00067       epoll_fd_(do_epoll_create()),
00068       wait_in_progress_(false),
00069       interrupter_(),
00070       read_op_queue_(),
00071       write_op_queue_(),
00072       except_op_queue_(),
00073       pending_cancellations_(),
00074       stop_thread_(false),
00075       thread_(0),
00076       shutdown_(false),
00077       need_epoll_wait_(true)
00078   {
00079     // Start the reactor's internal thread only if needed.
00080     if (Own_Thread)
00081     {
00082       asio::detail::signal_blocker sb;
00083       thread_ = new asio::detail::thread(
00084           bind_handler(&epoll_reactor::call_run_thread, this));
00085     }
00086 
00087     // Add the interrupter's descriptor to epoll.
00088     epoll_event ev = { 0, { 0 } };
00089     ev.events = EPOLLIN | EPOLLERR;
00090     ev.data.fd = interrupter_.read_descriptor();
00091     epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);
00092   }
00093 
00094   // Destructor.
00095   ~epoll_reactor()
00096   {
00097     shutdown_service();
00098     close(epoll_fd_);
00099   }
00100 
00101   // Destroy all user-defined handler objects owned by the service.
00102   void shutdown_service()
00103   {
00104     asio::detail::mutex::scoped_lock lock(mutex_);
00105     shutdown_ = true;
00106     stop_thread_ = true;
00107     lock.unlock();
00108 
00109     if (thread_)
00110     {
00111       interrupter_.interrupt();
00112       thread_->join();
00113       delete thread_;
00114       thread_ = 0;
00115     }
00116 
00117     read_op_queue_.destroy_operations();
00118     write_op_queue_.destroy_operations();
00119     except_op_queue_.destroy_operations();
00120 
00121     for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00122       timer_queues_[i]->destroy_timers();
00123     timer_queues_.clear();
00124   }
00125 
00126   // Register a socket with the reactor. Returns 0 on success, system error
00127   // code on failure.
00128   int register_descriptor(socket_type descriptor,
00129       per_descriptor_data& descriptor_data)
00130   {
00131     // No need to lock according to epoll documentation.
00132 
00133     descriptor_data.allow_speculative_read = true;
00134     descriptor_data.allow_speculative_write = true;
00135 
00136     epoll_event ev = { 0, { 0 } };
00137     ev.events = 0;
00138     ev.data.fd = descriptor;
00139     int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
00140     if (result != 0)
00141       return errno;
00142     return 0;
00143   }
00144 
00145   // Start a new read operation. The handler object will be invoked when the
00146   // given descriptor is ready to be read, or an error has occurred.
00147   template <typename Handler>
00148   void start_read_op(socket_type descriptor,
00149       per_descriptor_data& descriptor_data,
00150       Handler handler, bool allow_speculative_read = true)
00151   {
00152     if (allow_speculative_read && descriptor_data.allow_speculative_read)
00153     {
00154       asio::error_code ec;
00155       std::size_t bytes_transferred = 0;
00156       if (handler.perform(ec, bytes_transferred))
00157       {
00158         handler.complete(ec, bytes_transferred);
00159         return;
00160       }
00161 
00162       // We only get one shot at a speculative read in this function.
00163       allow_speculative_read = false;
00164     }
00165 
00166     asio::detail::mutex::scoped_lock lock(mutex_);
00167 
00168     if (shutdown_)
00169       return;
00170 
00171     if (!allow_speculative_read)
00172       need_epoll_wait_ = true;
00173     else if (!read_op_queue_.has_operation(descriptor))
00174     {
00175       // Speculative reads are ok as there are no queued read operations.
00176       descriptor_data.allow_speculative_read = true;
00177 
00178       asio::error_code ec;
00179       std::size_t bytes_transferred = 0;
00180       if (handler.perform(ec, bytes_transferred))
00181       {
00182         handler.complete(ec, bytes_transferred);
00183         return;
00184       }
00185     }
00186 
00187     // Speculative reads are not ok as there will be queued read operations.
00188     descriptor_data.allow_speculative_read = false;
00189 
00190     if (read_op_queue_.enqueue_operation(descriptor, handler))
00191     {
00192       epoll_event ev = { 0, { 0 } };
00193       ev.events = EPOLLIN | EPOLLERR | EPOLLHUP;
00194       if (write_op_queue_.has_operation(descriptor))
00195         ev.events |= EPOLLOUT;
00196       if (except_op_queue_.has_operation(descriptor))
00197         ev.events |= EPOLLPRI;
00198       ev.data.fd = descriptor;
00199 
00200       int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
00201       if (result != 0 && errno == ENOENT)
00202         result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
00203       if (result != 0)
00204       {
00205         asio::error_code ec(errno,
00206             asio::error::get_system_category());
00207         read_op_queue_.perform_all_operations(descriptor, ec);
00208       }
00209     }
00210   }
00211 
00212   // Start a new write operation. The handler object will be invoked when the
00213   // given descriptor is ready to be written, or an error has occurred.
00214   template <typename Handler>
00215   void start_write_op(socket_type descriptor,
00216       per_descriptor_data& descriptor_data,
00217       Handler handler, bool allow_speculative_write = true)
00218   {
00219     if (allow_speculative_write && descriptor_data.allow_speculative_write)
00220     {
00221       asio::error_code ec;
00222       std::size_t bytes_transferred = 0;
00223       if (handler.perform(ec, bytes_transferred))
00224       {
00225         handler.complete(ec, bytes_transferred);
00226         return;
00227       }
00228 
00229       // We only get one shot at a speculative write in this function.
00230       allow_speculative_write = false;
00231     }
00232 
00233     asio::detail::mutex::scoped_lock lock(mutex_);
00234 
00235     if (shutdown_)
00236       return;
00237 
00238     if (!allow_speculative_write)
00239       need_epoll_wait_ = true;
00240     else if (!write_op_queue_.has_operation(descriptor))
00241     {
00242       // Speculative writes are ok as there are no queued write operations.
00243       descriptor_data.allow_speculative_write = true;
00244 
00245       asio::error_code ec;
00246       std::size_t bytes_transferred = 0;
00247       if (handler.perform(ec, bytes_transferred))
00248       {
00249         handler.complete(ec, bytes_transferred);
00250         return;
00251       }
00252     }
00253 
00254     // Speculative writes are not ok as there will be queued write operations.
00255     descriptor_data.allow_speculative_write = false;
00256 
00257     if (write_op_queue_.enqueue_operation(descriptor, handler))
00258     {
00259       epoll_event ev = { 0, { 0 } };
00260       ev.events = EPOLLOUT | EPOLLERR | EPOLLHUP;
00261       if (read_op_queue_.has_operation(descriptor))
00262         ev.events |= EPOLLIN;
00263       if (except_op_queue_.has_operation(descriptor))
00264         ev.events |= EPOLLPRI;
00265       ev.data.fd = descriptor;
00266 
00267       int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
00268       if (result != 0 && errno == ENOENT)
00269         result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
00270       if (result != 0)
00271       {
00272         asio::error_code ec(errno,
00273             asio::error::get_system_category());
00274         write_op_queue_.perform_all_operations(descriptor, ec);
00275       }
00276     }
00277   }
00278 
00279   // Start a new exception operation. The handler object will be invoked when
00280   // the given descriptor has exception information, or an error has occurred.
00281   template <typename Handler>
00282   void start_except_op(socket_type descriptor,
00283       per_descriptor_data&, Handler handler)
00284   {
00285     asio::detail::mutex::scoped_lock lock(mutex_);
00286 
00287     if (shutdown_)
00288       return;
00289 
00290     if (except_op_queue_.enqueue_operation(descriptor, handler))
00291     {
00292       epoll_event ev = { 0, { 0 } };
00293       ev.events = EPOLLPRI | EPOLLERR | EPOLLHUP;
00294       if (read_op_queue_.has_operation(descriptor))
00295         ev.events |= EPOLLIN;
00296       if (write_op_queue_.has_operation(descriptor))
00297         ev.events |= EPOLLOUT;
00298       ev.data.fd = descriptor;
00299 
00300       int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
00301       if (result != 0 && errno == ENOENT)
00302         result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
00303       if (result != 0)
00304       {
00305         asio::error_code ec(errno,
00306             asio::error::get_system_category());
00307         except_op_queue_.perform_all_operations(descriptor, ec);
00308       }
00309     }
00310   }
00311 
00312   // Start a new write operation. The handler object will be invoked when the
00313   // given descriptor is ready for writing or an error has occurred. Speculative
00314   // writes are not allowed.
00315   template <typename Handler>
00316   void start_connect_op(socket_type descriptor,
00317       per_descriptor_data& descriptor_data, Handler handler)
00318   {
00319     asio::detail::mutex::scoped_lock lock(mutex_);
00320 
00321     if (shutdown_)
00322       return;
00323 
00324     // Speculative writes are not ok as there will be queued write operations.
00325     descriptor_data.allow_speculative_write = false;
00326 
00327     if (write_op_queue_.enqueue_operation(descriptor, handler))
00328     {
00329       epoll_event ev = { 0, { 0 } };
00330       ev.events = EPOLLOUT | EPOLLERR | EPOLLHUP;
00331       if (read_op_queue_.has_operation(descriptor))
00332         ev.events |= EPOLLIN;
00333       if (except_op_queue_.has_operation(descriptor))
00334         ev.events |= EPOLLPRI;
00335       ev.data.fd = descriptor;
00336 
00337       int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
00338       if (result != 0 && errno == ENOENT)
00339         result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
00340       if (result != 0)
00341       {
00342         asio::error_code ec(errno,
00343             asio::error::get_system_category());
00344         write_op_queue_.perform_all_operations(descriptor, ec);
00345       }
00346     }
00347   }
00348 
00349   // Cancel all operations associated with the given descriptor. The
00350   // handlers associated with the descriptor will be invoked with the
00351   // operation_aborted error.
00352   void cancel_ops(socket_type descriptor, per_descriptor_data&)
00353   {
00354     asio::detail::mutex::scoped_lock lock(mutex_);
00355     cancel_ops_unlocked(descriptor);
00356   }
00357 
00358   // Cancel any operations that are running against the descriptor and remove
00359   // its registration from the reactor.
00360   void close_descriptor(socket_type descriptor, per_descriptor_data&)
00361   {
00362     asio::detail::mutex::scoped_lock lock(mutex_);
00363 
00364     // Remove the descriptor from epoll.
00365     epoll_event ev = { 0, { 0 } };
00366     epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev);
00367 
00368     // Cancel any outstanding operations associated with the descriptor.
00369     cancel_ops_unlocked(descriptor);
00370   }
00371 
00372   // Add a new timer queue to the reactor.
00373   template <typename Time_Traits>
00374   void add_timer_queue(timer_queue<Time_Traits>& timer_queue)
00375   {
00376     asio::detail::mutex::scoped_lock lock(mutex_);
00377     timer_queues_.push_back(&timer_queue);
00378   }
00379 
00380   // Remove a timer queue from the reactor.
00381   template <typename Time_Traits>
00382   void remove_timer_queue(timer_queue<Time_Traits>& timer_queue)
00383   {
00384     asio::detail::mutex::scoped_lock lock(mutex_);
00385     for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00386     {
00387       if (timer_queues_[i] == &timer_queue)
00388       {
00389         timer_queues_.erase(timer_queues_.begin() + i);
00390         return;
00391       }
00392     }
00393   }
00394 
00395   // Schedule a timer in the given timer queue to expire at the specified
00396   // absolute time. The handler object will be invoked when the timer expires.
00397   template <typename Time_Traits, typename Handler>
00398   void schedule_timer(timer_queue<Time_Traits>& timer_queue,
00399       const typename Time_Traits::time_type& time, Handler handler, void* token)
00400   {
00401     asio::detail::mutex::scoped_lock lock(mutex_);
00402     if (!shutdown_)
00403       if (timer_queue.enqueue_timer(time, handler, token))
00404         interrupter_.interrupt();
00405   }
00406 
00407   // Cancel the timer associated with the given token. Returns the number of
00408   // handlers that have been posted or dispatched.
00409   template <typename Time_Traits>
00410   std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
00411   {
00412     asio::detail::mutex::scoped_lock lock(mutex_);
00413     std::size_t n = timer_queue.cancel_timer(token);
00414     if (n > 0)
00415       interrupter_.interrupt();
00416     return n;
00417   }
00418 
00419 private:
00420   friend class task_io_service<epoll_reactor<Own_Thread> >;
00421 
00422   // Run epoll once until interrupted or events are ready to be dispatched.
00423   void run(bool block)
00424   {
00425     asio::detail::mutex::scoped_lock lock(mutex_);
00426 
00427     // Dispatch any operation cancellations that were made while the select
00428     // loop was not running.
00429     read_op_queue_.perform_cancellations();
00430     write_op_queue_.perform_cancellations();
00431     except_op_queue_.perform_cancellations();
00432     for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00433       timer_queues_[i]->dispatch_cancellations();
00434 
00435     // Check if the thread is supposed to stop.
00436     if (stop_thread_)
00437     {
00438       complete_operations_and_timers(lock);
00439       return;
00440     }
00441 
00442     // We can return immediately if there's no work to do and the reactor is
00443     // not supposed to block.
00444     if (!block && read_op_queue_.empty() && write_op_queue_.empty()
00445         && except_op_queue_.empty() && all_timer_queues_are_empty())
00446     {
00447       complete_operations_and_timers(lock);
00448       return;
00449     }
00450 
00451     int timeout = block ? get_timeout() : 0;
00452     wait_in_progress_ = true;
00453     lock.unlock();
00454 
00455     // Block on the epoll descriptor.
00456     epoll_event events[128];
00457     int num_events = (block || need_epoll_wait_)
00458       ? epoll_wait(epoll_fd_, events, 128, timeout)
00459       : 0;
00460 
00461     lock.lock();
00462     wait_in_progress_ = false;
00463 
00464     // Block signals while performing operations.
00465     asio::detail::signal_blocker sb;
00466 
00467     // Dispatch the waiting events.
00468     for (int i = 0; i < num_events; ++i)
00469     {
00470       int descriptor = events[i].data.fd;
00471       if (descriptor == interrupter_.read_descriptor())
00472       {
00473         interrupter_.reset();
00474       }
00475       else
00476       {
00477         bool more_reads = false;
00478         bool more_writes = false;
00479         bool more_except = false;
00480         asio::error_code ec;
00481 
00482         // Exception operations must be processed first to ensure that any
00483         // out-of-band data is read before normal data.
00484         if (events[i].events & (EPOLLPRI | EPOLLERR | EPOLLHUP))
00485           more_except = except_op_queue_.perform_operation(descriptor, ec);
00486         else
00487           more_except = except_op_queue_.has_operation(descriptor);
00488 
00489         if (events[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP))
00490           more_reads = read_op_queue_.perform_operation(descriptor, ec);
00491         else
00492           more_reads = read_op_queue_.has_operation(descriptor);
00493 
00494         if (events[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP))
00495           more_writes = write_op_queue_.perform_operation(descriptor, ec);
00496         else
00497           more_writes = write_op_queue_.has_operation(descriptor);
00498 
00499         if ((events[i].events & (EPOLLERR | EPOLLHUP)) != 0
00500               && (events[i].events & ~(EPOLLERR | EPOLLHUP)) == 0
00501               && !more_except && !more_reads && !more_writes)
00502         {
00503           // If we have an event and no operations associated with the
00504           // descriptor then we need to delete the descriptor from epoll. The
00505           // epoll_wait system call can produce EPOLLHUP or EPOLLERR events
00506           // when there is no operation pending, so if we do not remove the
00507           // descriptor we can end up in a tight loop of repeated
00508           // calls to epoll_wait.
00509           epoll_event ev = { 0, { 0 } };
00510           epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev);
00511         }
00512         else
00513         {
00514           epoll_event ev = { 0, { 0 } };
00515           ev.events = EPOLLERR | EPOLLHUP;
00516           if (more_reads)
00517             ev.events |= EPOLLIN;
00518           if (more_writes)
00519             ev.events |= EPOLLOUT;
00520           if (more_except)
00521             ev.events |= EPOLLPRI;
00522           ev.data.fd = descriptor;
00523           int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
00524           if (result != 0 && errno == ENOENT)
00525             result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
00526           if (result != 0)
00527           {
00528             ec = asio::error_code(errno,
00529                 asio::error::get_system_category());
00530             read_op_queue_.perform_all_operations(descriptor, ec);
00531             write_op_queue_.perform_all_operations(descriptor, ec);
00532             except_op_queue_.perform_all_operations(descriptor, ec);
00533           }
00534         }
00535       }
00536     }
00537     read_op_queue_.perform_cancellations();
00538     write_op_queue_.perform_cancellations();
00539     except_op_queue_.perform_cancellations();
00540     for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00541     {
00542       timer_queues_[i]->dispatch_timers();
00543       timer_queues_[i]->dispatch_cancellations();
00544     }
00545 
00546     // Issue any pending cancellations.
00547     for (size_t i = 0; i < pending_cancellations_.size(); ++i)
00548       cancel_ops_unlocked(pending_cancellations_[i]);
00549     pending_cancellations_.clear();
00550 
00551     // Determine whether epoll_wait should be called when the reactor next runs.
00552     need_epoll_wait_ = !read_op_queue_.empty()
00553       || !write_op_queue_.empty() || !except_op_queue_.empty();
00554 
00555     complete_operations_and_timers(lock);
00556   }
00557 
00558   // Run the select loop in the thread.
00559   void run_thread()
00560   {
00561     asio::detail::mutex::scoped_lock lock(mutex_);
00562     while (!stop_thread_)
00563     {
00564       lock.unlock();
00565       run(true);
00566       lock.lock();
00567     }
00568   }
00569 
00570   // Entry point for the select loop thread.
00571   static void call_run_thread(epoll_reactor* reactor)
00572   {
00573     reactor->run_thread();
00574   }
00575 
00576   // Interrupt the select loop.
00577   void interrupt()
00578   {
00579     interrupter_.interrupt();
00580   }
00581 
00582   // The hint to pass to epoll_create to size its data structures.
00583   enum { epoll_size = 20000 };
00584 
00585   // Create the epoll file descriptor. Throws an exception if the descriptor
00586   // cannot be created.
00587   static int do_epoll_create()
00588   {
00589     int fd = epoll_create(epoll_size);
00590     if (fd == -1)
00591     {
00592       boost::throw_exception(
00593           asio::system_error(
00594             asio::error_code(errno,
00595               asio::error::get_system_category()),
00596             "epoll"));
00597     }
00598     return fd;
00599   }
00600 
00601   // Check if all timer queues are empty.
00602   bool all_timer_queues_are_empty() const
00603   {
00604     for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00605       if (!timer_queues_[i]->empty())
00606         return false;
00607     return true;
00608   }
00609 
00610   // Get the timeout value for the epoll_wait call. The timeout value is
00611   // returned as a number of milliseconds. A return value of -1 indicates
00612   // that epoll_wait should block indefinitely.
00613   int get_timeout()
00614   {
00615     if (all_timer_queues_are_empty())
00616       return -1;
00617 
00618     // By default we will wait no longer than 5 minutes. This will ensure that
00619     // any changes to the system clock are detected after no longer than this.
00620     boost::posix_time::time_duration minimum_wait_duration
00621       = boost::posix_time::minutes(5);
00622 
00623     for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00624     {
00625       boost::posix_time::time_duration wait_duration
00626         = timer_queues_[i]->wait_duration();
00627       if (wait_duration < minimum_wait_duration)
00628         minimum_wait_duration = wait_duration;
00629     }
00630 
00631     if (minimum_wait_duration > boost::posix_time::time_duration())
00632     {
00633       int milliseconds = minimum_wait_duration.total_milliseconds();
00634       return milliseconds > 0 ? milliseconds : 1;
00635     }
00636     else
00637     {
00638       return 0;
00639     }
00640   }
00641 
00642   // Cancel all operations associated with the given descriptor. The do_cancel
00643   // function of the handler objects will be invoked. This function does not
00644   // acquire the epoll_reactor's mutex.
00645   void cancel_ops_unlocked(socket_type descriptor)
00646   {
00647     bool interrupt = read_op_queue_.cancel_operations(descriptor);
00648     interrupt = write_op_queue_.cancel_operations(descriptor) || interrupt;
00649     interrupt = except_op_queue_.cancel_operations(descriptor) || interrupt;
00650     if (interrupt)
00651       interrupter_.interrupt();
00652   }
00653 
00654   // Clean up operations and timers. We must not hold the lock since the
00655   // destructors may make calls back into this reactor. We make a copy of the
00656   // vector of timer queues since the original may be modified while the lock
00657   // is not held.
00658   void complete_operations_and_timers(
00659       asio::detail::mutex::scoped_lock& lock)
00660   {
00661     timer_queues_for_cleanup_ = timer_queues_;
00662     lock.unlock();
00663     read_op_queue_.complete_operations();
00664     write_op_queue_.complete_operations();
00665     except_op_queue_.complete_operations();
00666     for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i)
00667       timer_queues_for_cleanup_[i]->complete_timers();
00668   }
00669 
00670   // Mutex to protect access to internal data.
00671   asio::detail::mutex mutex_;
00672 
00673   // The epoll file descriptor.
00674   int epoll_fd_;
00675 
00676   // Whether the epoll_wait call is currently in progress
00677   bool wait_in_progress_;
00678 
00679   // The interrupter is used to break a blocking epoll_wait call.
00680   select_interrupter interrupter_;
00681 
00682   // The queue of read operations.
00683   reactor_op_queue<socket_type> read_op_queue_;
00684 
00685   // The queue of write operations.
00686   reactor_op_queue<socket_type> write_op_queue_;
00687 
00688   // The queue of except operations.
00689   reactor_op_queue<socket_type> except_op_queue_;
00690 
00691   // The timer queues.
00692   std::vector<timer_queue_base*> timer_queues_;
00693 
00694   // A copy of the timer queues, used when cleaning up timers. The copy is
00695   // stored as a class data member to avoid unnecessary memory allocation.
00696   std::vector<timer_queue_base*> timer_queues_for_cleanup_;
00697 
00698   // The descriptors that are pending cancellation.
00699   std::vector<socket_type> pending_cancellations_;
00700 
00701   // Does the reactor loop thread need to stop.
00702   bool stop_thread_;
00703 
00704   // The thread that is running the reactor loop.
00705   asio::detail::thread* thread_;
00706 
00707   // Whether the service has been shut down.
00708   bool shutdown_;
00709 
00710   // Whether we need to call epoll_wait the next time the reactor is run.
00711   bool need_epoll_wait_;
00712 };
00713 
00714 } // namespace detail
00715 } // namespace asio
00716 
00717 #endif // defined(ASIO_HAS_EPOLL)
00718 
00719 #include "asio/detail/pop_options.hpp"
00720 
00721 #endif // ASIO_DETAIL_EPOLL_REACTOR_HPP
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines


Castor
Author(s): Carpe Noctem
autogenerated on Fri Nov 8 2013 11:05:39