00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #ifndef ASIO_DETAIL_EPOLL_REACTOR_HPP
00012 #define ASIO_DETAIL_EPOLL_REACTOR_HPP
00013
00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00015 # pragma once
00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00017
00018 #include "asio/detail/push_options.hpp"
00019
00020 #include "asio/detail/epoll_reactor_fwd.hpp"
00021
00022 #if defined(ASIO_HAS_EPOLL)
00023
00024 #include "asio/detail/push_options.hpp"
00025 #include <cstddef>
00026 #include <vector>
00027 #include <sys/epoll.h>
00028 #include <boost/config.hpp>
00029 #include <boost/date_time/posix_time/posix_time_types.hpp>
00030 #include <boost/throw_exception.hpp>
00031 #include "asio/detail/pop_options.hpp"
00032
00033 #include "asio/error.hpp"
00034 #include "asio/io_service.hpp"
00035 #include "asio/system_error.hpp"
00036 #include "asio/detail/bind_handler.hpp"
00037 #include "asio/detail/hash_map.hpp"
00038 #include "asio/detail/mutex.hpp"
00039 #include "asio/detail/task_io_service.hpp"
00040 #include "asio/detail/thread.hpp"
00041 #include "asio/detail/reactor_op_queue.hpp"
00042 #include "asio/detail/select_interrupter.hpp"
00043 #include "asio/detail/service_base.hpp"
00044 #include "asio/detail/signal_blocker.hpp"
00045 #include "asio/detail/socket_types.hpp"
00046 #include "asio/detail/timer_queue.hpp"
00047
00048 namespace asio {
00049 namespace detail {
00050
00051 template <bool Own_Thread>
00052 class epoll_reactor
00053 : public asio::detail::service_base<epoll_reactor<Own_Thread> >
00054 {
00055 public:
00056
00057 struct per_descriptor_data
00058 {
00059 bool allow_speculative_read;
00060 bool allow_speculative_write;
00061 };
00062
00063
00064 epoll_reactor(asio::io_service& io_service)
00065 : asio::detail::service_base<epoll_reactor<Own_Thread> >(io_service),
00066 mutex_(),
00067 epoll_fd_(do_epoll_create()),
00068 wait_in_progress_(false),
00069 interrupter_(),
00070 read_op_queue_(),
00071 write_op_queue_(),
00072 except_op_queue_(),
00073 pending_cancellations_(),
00074 stop_thread_(false),
00075 thread_(0),
00076 shutdown_(false),
00077 need_epoll_wait_(true)
00078 {
00079
00080 if (Own_Thread)
00081 {
00082 asio::detail::signal_blocker sb;
00083 thread_ = new asio::detail::thread(
00084 bind_handler(&epoll_reactor::call_run_thread, this));
00085 }
00086
00087
00088 epoll_event ev = { 0, { 0 } };
00089 ev.events = EPOLLIN | EPOLLERR;
00090 ev.data.fd = interrupter_.read_descriptor();
00091 epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);
00092 }
00093
00094
00095 ~epoll_reactor()
00096 {
00097 shutdown_service();
00098 close(epoll_fd_);
00099 }
00100
00101
00102 void shutdown_service()
00103 {
00104 asio::detail::mutex::scoped_lock lock(mutex_);
00105 shutdown_ = true;
00106 stop_thread_ = true;
00107 lock.unlock();
00108
00109 if (thread_)
00110 {
00111 interrupter_.interrupt();
00112 thread_->join();
00113 delete thread_;
00114 thread_ = 0;
00115 }
00116
00117 read_op_queue_.destroy_operations();
00118 write_op_queue_.destroy_operations();
00119 except_op_queue_.destroy_operations();
00120
00121 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00122 timer_queues_[i]->destroy_timers();
00123 timer_queues_.clear();
00124 }
00125
00126
00127
00128 int register_descriptor(socket_type descriptor,
00129 per_descriptor_data& descriptor_data)
00130 {
00131
00132
00133 descriptor_data.allow_speculative_read = true;
00134 descriptor_data.allow_speculative_write = true;
00135
00136 epoll_event ev = { 0, { 0 } };
00137 ev.events = 0;
00138 ev.data.fd = descriptor;
00139 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
00140 if (result != 0)
00141 return errno;
00142 return 0;
00143 }
00144
00145
00146
00147 template <typename Handler>
00148 void start_read_op(socket_type descriptor,
00149 per_descriptor_data& descriptor_data,
00150 Handler handler, bool allow_speculative_read = true)
00151 {
00152 if (allow_speculative_read && descriptor_data.allow_speculative_read)
00153 {
00154 asio::error_code ec;
00155 std::size_t bytes_transferred = 0;
00156 if (handler.perform(ec, bytes_transferred))
00157 {
00158 handler.complete(ec, bytes_transferred);
00159 return;
00160 }
00161
00162
00163 allow_speculative_read = false;
00164 }
00165
00166 asio::detail::mutex::scoped_lock lock(mutex_);
00167
00168 if (shutdown_)
00169 return;
00170
00171 if (!allow_speculative_read)
00172 need_epoll_wait_ = true;
00173 else if (!read_op_queue_.has_operation(descriptor))
00174 {
00175
00176 descriptor_data.allow_speculative_read = true;
00177
00178 asio::error_code ec;
00179 std::size_t bytes_transferred = 0;
00180 if (handler.perform(ec, bytes_transferred))
00181 {
00182 handler.complete(ec, bytes_transferred);
00183 return;
00184 }
00185 }
00186
00187
00188 descriptor_data.allow_speculative_read = false;
00189
00190 if (read_op_queue_.enqueue_operation(descriptor, handler))
00191 {
00192 epoll_event ev = { 0, { 0 } };
00193 ev.events = EPOLLIN | EPOLLERR | EPOLLHUP;
00194 if (write_op_queue_.has_operation(descriptor))
00195 ev.events |= EPOLLOUT;
00196 if (except_op_queue_.has_operation(descriptor))
00197 ev.events |= EPOLLPRI;
00198 ev.data.fd = descriptor;
00199
00200 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
00201 if (result != 0 && errno == ENOENT)
00202 result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
00203 if (result != 0)
00204 {
00205 asio::error_code ec(errno,
00206 asio::error::get_system_category());
00207 read_op_queue_.perform_all_operations(descriptor, ec);
00208 }
00209 }
00210 }
00211
00212
00213
00214 template <typename Handler>
00215 void start_write_op(socket_type descriptor,
00216 per_descriptor_data& descriptor_data,
00217 Handler handler, bool allow_speculative_write = true)
00218 {
00219 if (allow_speculative_write && descriptor_data.allow_speculative_write)
00220 {
00221 asio::error_code ec;
00222 std::size_t bytes_transferred = 0;
00223 if (handler.perform(ec, bytes_transferred))
00224 {
00225 handler.complete(ec, bytes_transferred);
00226 return;
00227 }
00228
00229
00230 allow_speculative_write = false;
00231 }
00232
00233 asio::detail::mutex::scoped_lock lock(mutex_);
00234
00235 if (shutdown_)
00236 return;
00237
00238 if (!allow_speculative_write)
00239 need_epoll_wait_ = true;
00240 else if (!write_op_queue_.has_operation(descriptor))
00241 {
00242
00243 descriptor_data.allow_speculative_write = true;
00244
00245 asio::error_code ec;
00246 std::size_t bytes_transferred = 0;
00247 if (handler.perform(ec, bytes_transferred))
00248 {
00249 handler.complete(ec, bytes_transferred);
00250 return;
00251 }
00252 }
00253
00254
00255 descriptor_data.allow_speculative_write = false;
00256
00257 if (write_op_queue_.enqueue_operation(descriptor, handler))
00258 {
00259 epoll_event ev = { 0, { 0 } };
00260 ev.events = EPOLLOUT | EPOLLERR | EPOLLHUP;
00261 if (read_op_queue_.has_operation(descriptor))
00262 ev.events |= EPOLLIN;
00263 if (except_op_queue_.has_operation(descriptor))
00264 ev.events |= EPOLLPRI;
00265 ev.data.fd = descriptor;
00266
00267 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
00268 if (result != 0 && errno == ENOENT)
00269 result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
00270 if (result != 0)
00271 {
00272 asio::error_code ec(errno,
00273 asio::error::get_system_category());
00274 write_op_queue_.perform_all_operations(descriptor, ec);
00275 }
00276 }
00277 }
00278
00279
00280
00281 template <typename Handler>
00282 void start_except_op(socket_type descriptor,
00283 per_descriptor_data&, Handler handler)
00284 {
00285 asio::detail::mutex::scoped_lock lock(mutex_);
00286
00287 if (shutdown_)
00288 return;
00289
00290 if (except_op_queue_.enqueue_operation(descriptor, handler))
00291 {
00292 epoll_event ev = { 0, { 0 } };
00293 ev.events = EPOLLPRI | EPOLLERR | EPOLLHUP;
00294 if (read_op_queue_.has_operation(descriptor))
00295 ev.events |= EPOLLIN;
00296 if (write_op_queue_.has_operation(descriptor))
00297 ev.events |= EPOLLOUT;
00298 ev.data.fd = descriptor;
00299
00300 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
00301 if (result != 0 && errno == ENOENT)
00302 result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
00303 if (result != 0)
00304 {
00305 asio::error_code ec(errno,
00306 asio::error::get_system_category());
00307 except_op_queue_.perform_all_operations(descriptor, ec);
00308 }
00309 }
00310 }
00311
00312
00313
00314
00315 template <typename Handler>
00316 void start_connect_op(socket_type descriptor,
00317 per_descriptor_data& descriptor_data, Handler handler)
00318 {
00319 asio::detail::mutex::scoped_lock lock(mutex_);
00320
00321 if (shutdown_)
00322 return;
00323
00324
00325 descriptor_data.allow_speculative_write = false;
00326
00327 if (write_op_queue_.enqueue_operation(descriptor, handler))
00328 {
00329 epoll_event ev = { 0, { 0 } };
00330 ev.events = EPOLLOUT | EPOLLERR | EPOLLHUP;
00331 if (read_op_queue_.has_operation(descriptor))
00332 ev.events |= EPOLLIN;
00333 if (except_op_queue_.has_operation(descriptor))
00334 ev.events |= EPOLLPRI;
00335 ev.data.fd = descriptor;
00336
00337 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
00338 if (result != 0 && errno == ENOENT)
00339 result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
00340 if (result != 0)
00341 {
00342 asio::error_code ec(errno,
00343 asio::error::get_system_category());
00344 write_op_queue_.perform_all_operations(descriptor, ec);
00345 }
00346 }
00347 }
00348
00349
00350
00351
00352 void cancel_ops(socket_type descriptor, per_descriptor_data&)
00353 {
00354 asio::detail::mutex::scoped_lock lock(mutex_);
00355 cancel_ops_unlocked(descriptor);
00356 }
00357
00358
00359
00360 void close_descriptor(socket_type descriptor, per_descriptor_data&)
00361 {
00362 asio::detail::mutex::scoped_lock lock(mutex_);
00363
00364
00365 epoll_event ev = { 0, { 0 } };
00366 epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev);
00367
00368
00369 cancel_ops_unlocked(descriptor);
00370 }
00371
00372
00373 template <typename Time_Traits>
00374 void add_timer_queue(timer_queue<Time_Traits>& timer_queue)
00375 {
00376 asio::detail::mutex::scoped_lock lock(mutex_);
00377 timer_queues_.push_back(&timer_queue);
00378 }
00379
00380
00381 template <typename Time_Traits>
00382 void remove_timer_queue(timer_queue<Time_Traits>& timer_queue)
00383 {
00384 asio::detail::mutex::scoped_lock lock(mutex_);
00385 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00386 {
00387 if (timer_queues_[i] == &timer_queue)
00388 {
00389 timer_queues_.erase(timer_queues_.begin() + i);
00390 return;
00391 }
00392 }
00393 }
00394
00395
00396
00397 template <typename Time_Traits, typename Handler>
00398 void schedule_timer(timer_queue<Time_Traits>& timer_queue,
00399 const typename Time_Traits::time_type& time, Handler handler, void* token)
00400 {
00401 asio::detail::mutex::scoped_lock lock(mutex_);
00402 if (!shutdown_)
00403 if (timer_queue.enqueue_timer(time, handler, token))
00404 interrupter_.interrupt();
00405 }
00406
00407
00408
00409 template <typename Time_Traits>
00410 std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
00411 {
00412 asio::detail::mutex::scoped_lock lock(mutex_);
00413 std::size_t n = timer_queue.cancel_timer(token);
00414 if (n > 0)
00415 interrupter_.interrupt();
00416 return n;
00417 }
00418
00419 private:
00420 friend class task_io_service<epoll_reactor<Own_Thread> >;
00421
00422
00423 void run(bool block)
00424 {
00425 asio::detail::mutex::scoped_lock lock(mutex_);
00426
00427
00428
00429 read_op_queue_.perform_cancellations();
00430 write_op_queue_.perform_cancellations();
00431 except_op_queue_.perform_cancellations();
00432 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00433 timer_queues_[i]->dispatch_cancellations();
00434
00435
00436 if (stop_thread_)
00437 {
00438 complete_operations_and_timers(lock);
00439 return;
00440 }
00441
00442
00443
00444 if (!block && read_op_queue_.empty() && write_op_queue_.empty()
00445 && except_op_queue_.empty() && all_timer_queues_are_empty())
00446 {
00447 complete_operations_and_timers(lock);
00448 return;
00449 }
00450
00451 int timeout = block ? get_timeout() : 0;
00452 wait_in_progress_ = true;
00453 lock.unlock();
00454
00455
00456 epoll_event events[128];
00457 int num_events = (block || need_epoll_wait_)
00458 ? epoll_wait(epoll_fd_, events, 128, timeout)
00459 : 0;
00460
00461 lock.lock();
00462 wait_in_progress_ = false;
00463
00464
00465 asio::detail::signal_blocker sb;
00466
00467
00468 for (int i = 0; i < num_events; ++i)
00469 {
00470 int descriptor = events[i].data.fd;
00471 if (descriptor == interrupter_.read_descriptor())
00472 {
00473 interrupter_.reset();
00474 }
00475 else
00476 {
00477 bool more_reads = false;
00478 bool more_writes = false;
00479 bool more_except = false;
00480 asio::error_code ec;
00481
00482
00483
00484 if (events[i].events & (EPOLLPRI | EPOLLERR | EPOLLHUP))
00485 more_except = except_op_queue_.perform_operation(descriptor, ec);
00486 else
00487 more_except = except_op_queue_.has_operation(descriptor);
00488
00489 if (events[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP))
00490 more_reads = read_op_queue_.perform_operation(descriptor, ec);
00491 else
00492 more_reads = read_op_queue_.has_operation(descriptor);
00493
00494 if (events[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP))
00495 more_writes = write_op_queue_.perform_operation(descriptor, ec);
00496 else
00497 more_writes = write_op_queue_.has_operation(descriptor);
00498
00499 if ((events[i].events & (EPOLLERR | EPOLLHUP)) != 0
00500 && (events[i].events & ~(EPOLLERR | EPOLLHUP)) == 0
00501 && !more_except && !more_reads && !more_writes)
00502 {
00503
00504
00505
00506
00507
00508
00509 epoll_event ev = { 0, { 0 } };
00510 epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev);
00511 }
00512 else
00513 {
00514 epoll_event ev = { 0, { 0 } };
00515 ev.events = EPOLLERR | EPOLLHUP;
00516 if (more_reads)
00517 ev.events |= EPOLLIN;
00518 if (more_writes)
00519 ev.events |= EPOLLOUT;
00520 if (more_except)
00521 ev.events |= EPOLLPRI;
00522 ev.data.fd = descriptor;
00523 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
00524 if (result != 0 && errno == ENOENT)
00525 result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
00526 if (result != 0)
00527 {
00528 ec = asio::error_code(errno,
00529 asio::error::get_system_category());
00530 read_op_queue_.perform_all_operations(descriptor, ec);
00531 write_op_queue_.perform_all_operations(descriptor, ec);
00532 except_op_queue_.perform_all_operations(descriptor, ec);
00533 }
00534 }
00535 }
00536 }
00537 read_op_queue_.perform_cancellations();
00538 write_op_queue_.perform_cancellations();
00539 except_op_queue_.perform_cancellations();
00540 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00541 {
00542 timer_queues_[i]->dispatch_timers();
00543 timer_queues_[i]->dispatch_cancellations();
00544 }
00545
00546
00547 for (size_t i = 0; i < pending_cancellations_.size(); ++i)
00548 cancel_ops_unlocked(pending_cancellations_[i]);
00549 pending_cancellations_.clear();
00550
00551
00552 need_epoll_wait_ = !read_op_queue_.empty()
00553 || !write_op_queue_.empty() || !except_op_queue_.empty();
00554
00555 complete_operations_and_timers(lock);
00556 }
00557
00558
00559 void run_thread()
00560 {
00561 asio::detail::mutex::scoped_lock lock(mutex_);
00562 while (!stop_thread_)
00563 {
00564 lock.unlock();
00565 run(true);
00566 lock.lock();
00567 }
00568 }
00569
00570
00571 static void call_run_thread(epoll_reactor* reactor)
00572 {
00573 reactor->run_thread();
00574 }
00575
00576
00577 void interrupt()
00578 {
00579 interrupter_.interrupt();
00580 }
00581
00582
00583 enum { epoll_size = 20000 };
00584
00585
00586
00587 static int do_epoll_create()
00588 {
00589 int fd = epoll_create(epoll_size);
00590 if (fd == -1)
00591 {
00592 boost::throw_exception(
00593 asio::system_error(
00594 asio::error_code(errno,
00595 asio::error::get_system_category()),
00596 "epoll"));
00597 }
00598 return fd;
00599 }
00600
00601
00602 bool all_timer_queues_are_empty() const
00603 {
00604 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00605 if (!timer_queues_[i]->empty())
00606 return false;
00607 return true;
00608 }
00609
00610
00611
00612
00613 int get_timeout()
00614 {
00615 if (all_timer_queues_are_empty())
00616 return -1;
00617
00618
00619
00620 boost::posix_time::time_duration minimum_wait_duration
00621 = boost::posix_time::minutes(5);
00622
00623 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00624 {
00625 boost::posix_time::time_duration wait_duration
00626 = timer_queues_[i]->wait_duration();
00627 if (wait_duration < minimum_wait_duration)
00628 minimum_wait_duration = wait_duration;
00629 }
00630
00631 if (minimum_wait_duration > boost::posix_time::time_duration())
00632 {
00633 int milliseconds = minimum_wait_duration.total_milliseconds();
00634 return milliseconds > 0 ? milliseconds : 1;
00635 }
00636 else
00637 {
00638 return 0;
00639 }
00640 }
00641
00642
00643
00644
00645 void cancel_ops_unlocked(socket_type descriptor)
00646 {
00647 bool interrupt = read_op_queue_.cancel_operations(descriptor);
00648 interrupt = write_op_queue_.cancel_operations(descriptor) || interrupt;
00649 interrupt = except_op_queue_.cancel_operations(descriptor) || interrupt;
00650 if (interrupt)
00651 interrupter_.interrupt();
00652 }
00653
00654
00655
00656
00657
00658 void complete_operations_and_timers(
00659 asio::detail::mutex::scoped_lock& lock)
00660 {
00661 timer_queues_for_cleanup_ = timer_queues_;
00662 lock.unlock();
00663 read_op_queue_.complete_operations();
00664 write_op_queue_.complete_operations();
00665 except_op_queue_.complete_operations();
00666 for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i)
00667 timer_queues_for_cleanup_[i]->complete_timers();
00668 }
00669
00670
00671 asio::detail::mutex mutex_;
00672
00673
00674 int epoll_fd_;
00675
00676
00677 bool wait_in_progress_;
00678
00679
00680 select_interrupter interrupter_;
00681
00682
00683 reactor_op_queue<socket_type> read_op_queue_;
00684
00685
00686 reactor_op_queue<socket_type> write_op_queue_;
00687
00688
00689 reactor_op_queue<socket_type> except_op_queue_;
00690
00691
00692 std::vector<timer_queue_base*> timer_queues_;
00693
00694
00695
00696 std::vector<timer_queue_base*> timer_queues_for_cleanup_;
00697
00698
00699 std::vector<socket_type> pending_cancellations_;
00700
00701
00702 bool stop_thread_;
00703
00704
00705 asio::detail::thread* thread_;
00706
00707
00708 bool shutdown_;
00709
00710
00711 bool need_epoll_wait_;
00712 };
00713
00714 }
00715 }
00716
00717 #endif // defined(ASIO_HAS_EPOLL)
00718
00719 #include "asio/detail/pop_options.hpp"
00720
00721 #endif // ASIO_DETAIL_EPOLL_REACTOR_HPP