00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #ifndef ASIO_DETAIL_DEV_POLL_REACTOR_HPP
00012 #define ASIO_DETAIL_DEV_POLL_REACTOR_HPP
00013
00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00015 # pragma once
00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00017
00018 #include "asio/detail/push_options.hpp"
00019
00020 #include "asio/detail/dev_poll_reactor_fwd.hpp"
00021
00022 #if defined(ASIO_HAS_DEV_POLL)
00023
00024 #include "asio/detail/push_options.hpp"
00025 #include <cstddef>
00026 #include <vector>
00027 #include <boost/config.hpp>
00028 #include <boost/date_time/posix_time/posix_time_types.hpp>
00029 #include <boost/throw_exception.hpp>
00030 #include <sys/devpoll.h>
00031 #include "asio/detail/pop_options.hpp"
00032
00033 #include "asio/error.hpp"
00034 #include "asio/io_service.hpp"
00035 #include "asio/system_error.hpp"
00036 #include "asio/detail/bind_handler.hpp"
00037 #include "asio/detail/hash_map.hpp"
00038 #include "asio/detail/mutex.hpp"
00039 #include "asio/detail/task_io_service.hpp"
00040 #include "asio/detail/thread.hpp"
00041 #include "asio/detail/reactor_op_queue.hpp"
00042 #include "asio/detail/select_interrupter.hpp"
00043 #include "asio/detail/service_base.hpp"
00044 #include "asio/detail/signal_blocker.hpp"
00045 #include "asio/detail/socket_types.hpp"
00046 #include "asio/detail/timer_queue.hpp"
00047
00048 namespace asio {
00049 namespace detail {
00050
00051 template <bool Own_Thread>
00052 class dev_poll_reactor
00053 : public asio::detail::service_base<dev_poll_reactor<Own_Thread> >
00054 {
00055 public:
00056
00057 struct per_descriptor_data
00058 {
00059 };
00060
00061
00062 dev_poll_reactor(asio::io_service& io_service)
00063 : asio::detail::service_base<
00064 dev_poll_reactor<Own_Thread> >(io_service),
00065 mutex_(),
00066 dev_poll_fd_(do_dev_poll_create()),
00067 wait_in_progress_(false),
00068 interrupter_(),
00069 read_op_queue_(),
00070 write_op_queue_(),
00071 except_op_queue_(),
00072 pending_cancellations_(),
00073 stop_thread_(false),
00074 thread_(0),
00075 shutdown_(false)
00076 {
00077
00078 if (Own_Thread)
00079 {
00080 asio::detail::signal_blocker sb;
00081 thread_ = new asio::detail::thread(
00082 bind_handler(&dev_poll_reactor::call_run_thread, this));
00083 }
00084
00085
00086 ::pollfd ev = { 0 };
00087 ev.fd = interrupter_.read_descriptor();
00088 ev.events = POLLIN | POLLERR;
00089 ev.revents = 0;
00090 ::write(dev_poll_fd_, &ev, sizeof(ev));
00091 }
00092
00093
00094 ~dev_poll_reactor()
00095 {
00096 shutdown_service();
00097 ::close(dev_poll_fd_);
00098 }
00099
00100
00101 void shutdown_service()
00102 {
00103 asio::detail::mutex::scoped_lock lock(mutex_);
00104 shutdown_ = true;
00105 stop_thread_ = true;
00106 lock.unlock();
00107
00108 if (thread_)
00109 {
00110 interrupter_.interrupt();
00111 thread_->join();
00112 delete thread_;
00113 thread_ = 0;
00114 }
00115
00116 read_op_queue_.destroy_operations();
00117 write_op_queue_.destroy_operations();
00118 except_op_queue_.destroy_operations();
00119
00120 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00121 timer_queues_[i]->destroy_timers();
00122 timer_queues_.clear();
00123 }
00124
00125
00126
00127 int register_descriptor(socket_type, per_descriptor_data&)
00128 {
00129 return 0;
00130 }
00131
00132
00133
00134 template <typename Handler>
00135 void start_read_op(socket_type descriptor, per_descriptor_data&,
00136 Handler handler, bool allow_speculative_read = true)
00137 {
00138 asio::detail::mutex::scoped_lock lock(mutex_);
00139
00140 if (shutdown_)
00141 return;
00142
00143 if (allow_speculative_read)
00144 {
00145 if (!read_op_queue_.has_operation(descriptor))
00146 {
00147 asio::error_code ec;
00148 std::size_t bytes_transferred = 0;
00149 if (handler.perform(ec, bytes_transferred))
00150 {
00151 handler.complete(ec, bytes_transferred);
00152 return;
00153 }
00154 }
00155 }
00156
00157 if (read_op_queue_.enqueue_operation(descriptor, handler))
00158 {
00159 ::pollfd& ev = add_pending_event_change(descriptor);
00160 ev.events = POLLIN | POLLERR | POLLHUP;
00161 if (write_op_queue_.has_operation(descriptor))
00162 ev.events |= POLLOUT;
00163 if (except_op_queue_.has_operation(descriptor))
00164 ev.events |= POLLPRI;
00165 interrupter_.interrupt();
00166 }
00167 }
00168
00169
00170
00171 template <typename Handler>
00172 void start_write_op(socket_type descriptor, per_descriptor_data&,
00173 Handler handler, bool allow_speculative_write = true)
00174 {
00175 asio::detail::mutex::scoped_lock lock(mutex_);
00176
00177 if (shutdown_)
00178 return;
00179
00180 if (allow_speculative_write)
00181 {
00182 if (!write_op_queue_.has_operation(descriptor))
00183 {
00184 asio::error_code ec;
00185 std::size_t bytes_transferred = 0;
00186 if (handler.perform(ec, bytes_transferred))
00187 {
00188 handler.complete(ec, bytes_transferred);
00189 return;
00190 }
00191 }
00192 }
00193
00194 if (write_op_queue_.enqueue_operation(descriptor, handler))
00195 {
00196 ::pollfd& ev = add_pending_event_change(descriptor);
00197 ev.events = POLLOUT | POLLERR | POLLHUP;
00198 if (read_op_queue_.has_operation(descriptor))
00199 ev.events |= POLLIN;
00200 if (except_op_queue_.has_operation(descriptor))
00201 ev.events |= POLLPRI;
00202 interrupter_.interrupt();
00203 }
00204 }
00205
00206
00207
00208 template <typename Handler>
00209 void start_except_op(socket_type descriptor,
00210 per_descriptor_data&, Handler handler)
00211 {
00212 asio::detail::mutex::scoped_lock lock(mutex_);
00213
00214 if (shutdown_)
00215 return;
00216
00217 if (except_op_queue_.enqueue_operation(descriptor, handler))
00218 {
00219 ::pollfd& ev = add_pending_event_change(descriptor);
00220 ev.events = POLLPRI | POLLERR | POLLHUP;
00221 if (read_op_queue_.has_operation(descriptor))
00222 ev.events |= POLLIN;
00223 if (write_op_queue_.has_operation(descriptor))
00224 ev.events |= POLLOUT;
00225 interrupter_.interrupt();
00226 }
00227 }
00228
00229
00230
00231 template <typename Handler>
00232 void start_connect_op(socket_type descriptor,
00233 per_descriptor_data&, Handler handler)
00234 {
00235 asio::detail::mutex::scoped_lock lock(mutex_);
00236
00237 if (shutdown_)
00238 return;
00239
00240 if (write_op_queue_.enqueue_operation(descriptor, handler))
00241 {
00242 ::pollfd& ev = add_pending_event_change(descriptor);
00243 ev.events = POLLOUT | POLLERR | POLLHUP;
00244 if (read_op_queue_.has_operation(descriptor))
00245 ev.events |= POLLIN;
00246 if (except_op_queue_.has_operation(descriptor))
00247 ev.events |= POLLPRI;
00248 interrupter_.interrupt();
00249 }
00250 }
00251
00252
00253
00254
00255 void cancel_ops(socket_type descriptor, per_descriptor_data&)
00256 {
00257 asio::detail::mutex::scoped_lock lock(mutex_);
00258 cancel_ops_unlocked(descriptor);
00259 }
00260
00261
00262
00263 void close_descriptor(socket_type descriptor, per_descriptor_data&)
00264 {
00265 asio::detail::mutex::scoped_lock lock(mutex_);
00266
00267
00268 ::pollfd& ev = add_pending_event_change(descriptor);
00269 ev.events = POLLREMOVE;
00270 interrupter_.interrupt();
00271
00272
00273 cancel_ops_unlocked(descriptor);
00274 }
00275
00276
00277 template <typename Time_Traits>
00278 void add_timer_queue(timer_queue<Time_Traits>& timer_queue)
00279 {
00280 asio::detail::mutex::scoped_lock lock(mutex_);
00281 timer_queues_.push_back(&timer_queue);
00282 }
00283
00284
00285 template <typename Time_Traits>
00286 void remove_timer_queue(timer_queue<Time_Traits>& timer_queue)
00287 {
00288 asio::detail::mutex::scoped_lock lock(mutex_);
00289 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00290 {
00291 if (timer_queues_[i] == &timer_queue)
00292 {
00293 timer_queues_.erase(timer_queues_.begin() + i);
00294 return;
00295 }
00296 }
00297 }
00298
00299
00300
00301 template <typename Time_Traits, typename Handler>
00302 void schedule_timer(timer_queue<Time_Traits>& timer_queue,
00303 const typename Time_Traits::time_type& time, Handler handler, void* token)
00304 {
00305 asio::detail::mutex::scoped_lock lock(mutex_);
00306 if (!shutdown_)
00307 if (timer_queue.enqueue_timer(time, handler, token))
00308 interrupter_.interrupt();
00309 }
00310
00311
00312
00313 template <typename Time_Traits>
00314 std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
00315 {
00316 asio::detail::mutex::scoped_lock lock(mutex_);
00317 std::size_t n = timer_queue.cancel_timer(token);
00318 if (n > 0)
00319 interrupter_.interrupt();
00320 return n;
00321 }
00322
00323 private:
00324 friend class task_io_service<dev_poll_reactor<Own_Thread> >;
00325
00326
00327 void run(bool block)
00328 {
00329 asio::detail::mutex::scoped_lock lock(mutex_);
00330
00331
00332
00333 read_op_queue_.perform_cancellations();
00334 write_op_queue_.perform_cancellations();
00335 except_op_queue_.perform_cancellations();
00336 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00337 timer_queues_[i]->dispatch_cancellations();
00338
00339
00340 if (stop_thread_)
00341 {
00342 complete_operations_and_timers(lock);
00343 return;
00344 }
00345
00346
00347
00348 if (!block && read_op_queue_.empty() && write_op_queue_.empty()
00349 && except_op_queue_.empty() && all_timer_queues_are_empty())
00350 {
00351 complete_operations_and_timers(lock);
00352 return;
00353 }
00354
00355
00356 std::size_t events_size = sizeof(::pollfd) * pending_event_changes_.size();
00357 errno = 0;
00358 int result = ::write(dev_poll_fd_,
00359 &pending_event_changes_[0], events_size);
00360 if (result != static_cast<int>(events_size))
00361 {
00362 for (std::size_t i = 0; i < pending_event_changes_.size(); ++i)
00363 {
00364 int descriptor = pending_event_changes_[i].fd;
00365 asio::error_code ec = asio::error_code(
00366 errno, asio::error::get_system_category());
00367 read_op_queue_.perform_all_operations(descriptor, ec);
00368 write_op_queue_.perform_all_operations(descriptor, ec);
00369 except_op_queue_.perform_all_operations(descriptor, ec);
00370 }
00371 }
00372 pending_event_changes_.clear();
00373 pending_event_change_index_.clear();
00374
00375 int timeout = block ? get_timeout() : 0;
00376 wait_in_progress_ = true;
00377 lock.unlock();
00378
00379
00380 ::pollfd events[128] = { { 0 } };
00381 ::dvpoll dp = { 0 };
00382 dp.dp_fds = events;
00383 dp.dp_nfds = 128;
00384 dp.dp_timeout = timeout;
00385 int num_events = ::ioctl(dev_poll_fd_, DP_POLL, &dp);
00386
00387 lock.lock();
00388 wait_in_progress_ = false;
00389
00390
00391 asio::detail::signal_blocker sb;
00392
00393
00394 for (int i = 0; i < num_events; ++i)
00395 {
00396 int descriptor = events[i].fd;
00397 if (descriptor == interrupter_.read_descriptor())
00398 {
00399 interrupter_.reset();
00400 }
00401 else
00402 {
00403 bool more_reads = false;
00404 bool more_writes = false;
00405 bool more_except = false;
00406 asio::error_code ec;
00407
00408
00409
00410 if (events[i].events & (POLLPRI | POLLERR | POLLHUP))
00411 more_except = except_op_queue_.perform_operation(descriptor, ec);
00412 else
00413 more_except = except_op_queue_.has_operation(descriptor);
00414
00415 if (events[i].events & (POLLIN | POLLERR | POLLHUP))
00416 more_reads = read_op_queue_.perform_operation(descriptor, ec);
00417 else
00418 more_reads = read_op_queue_.has_operation(descriptor);
00419
00420 if (events[i].events & (POLLOUT | POLLERR | POLLHUP))
00421 more_writes = write_op_queue_.perform_operation(descriptor, ec);
00422 else
00423 more_writes = write_op_queue_.has_operation(descriptor);
00424
00425 if ((events[i].events & (POLLERR | POLLHUP)) != 0
00426 && (events[i].events & ~(POLLERR | POLLHUP)) == 0
00427 && !more_except && !more_reads && !more_writes)
00428 {
00429
00430
00431
00432
00433
00434 ::pollfd ev = { 0 };
00435 ev.fd = descriptor;
00436 ev.events = POLLREMOVE;
00437 ev.revents = 0;
00438 ::write(dev_poll_fd_, &ev, sizeof(ev));
00439 }
00440 else
00441 {
00442 ::pollfd ev = { 0 };
00443 ev.fd = descriptor;
00444 ev.events = POLLERR | POLLHUP;
00445 if (more_reads)
00446 ev.events |= POLLIN;
00447 if (more_writes)
00448 ev.events |= POLLOUT;
00449 if (more_except)
00450 ev.events |= POLLPRI;
00451 ev.revents = 0;
00452 int result = ::write(dev_poll_fd_, &ev, sizeof(ev));
00453 if (result != sizeof(ev))
00454 {
00455 ec = asio::error_code(errno,
00456 asio::error::get_system_category());
00457 read_op_queue_.perform_all_operations(descriptor, ec);
00458 write_op_queue_.perform_all_operations(descriptor, ec);
00459 except_op_queue_.perform_all_operations(descriptor, ec);
00460 }
00461 }
00462 }
00463 }
00464 read_op_queue_.perform_cancellations();
00465 write_op_queue_.perform_cancellations();
00466 except_op_queue_.perform_cancellations();
00467 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00468 {
00469 timer_queues_[i]->dispatch_timers();
00470 timer_queues_[i]->dispatch_cancellations();
00471 }
00472
00473
00474 for (size_t i = 0; i < pending_cancellations_.size(); ++i)
00475 cancel_ops_unlocked(pending_cancellations_[i]);
00476 pending_cancellations_.clear();
00477
00478 complete_operations_and_timers(lock);
00479 }
00480
00481
00482 void run_thread()
00483 {
00484 asio::detail::mutex::scoped_lock lock(mutex_);
00485 while (!stop_thread_)
00486 {
00487 lock.unlock();
00488 run(true);
00489 lock.lock();
00490 }
00491 }
00492
00493
00494 static void call_run_thread(dev_poll_reactor* reactor)
00495 {
00496 reactor->run_thread();
00497 }
00498
00499
00500 void interrupt()
00501 {
00502 interrupter_.interrupt();
00503 }
00504
00505
00506
00507 static int do_dev_poll_create()
00508 {
00509 int fd = ::open("/dev/poll", O_RDWR);
00510 if (fd == -1)
00511 {
00512 boost::throw_exception(
00513 asio::system_error(
00514 asio::error_code(errno,
00515 asio::error::get_system_category()),
00516 "/dev/poll"));
00517 }
00518 return fd;
00519 }
00520
00521
00522 bool all_timer_queues_are_empty() const
00523 {
00524 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00525 if (!timer_queues_[i]->empty())
00526 return false;
00527 return true;
00528 }
00529
00530
00531
00532
00533 int get_timeout()
00534 {
00535 if (all_timer_queues_are_empty())
00536 return -1;
00537
00538
00539
00540 boost::posix_time::time_duration minimum_wait_duration
00541 = boost::posix_time::minutes(5);
00542
00543 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00544 {
00545 boost::posix_time::time_duration wait_duration
00546 = timer_queues_[i]->wait_duration();
00547 if (wait_duration < minimum_wait_duration)
00548 minimum_wait_duration = wait_duration;
00549 }
00550
00551 if (minimum_wait_duration > boost::posix_time::time_duration())
00552 {
00553 int milliseconds = minimum_wait_duration.total_milliseconds();
00554 return milliseconds > 0 ? milliseconds : 1;
00555 }
00556 else
00557 {
00558 return 0;
00559 }
00560 }
00561
00562
00563
00564
00565 void cancel_ops_unlocked(socket_type descriptor)
00566 {
00567 bool interrupt = read_op_queue_.cancel_operations(descriptor);
00568 interrupt = write_op_queue_.cancel_operations(descriptor) || interrupt;
00569 interrupt = except_op_queue_.cancel_operations(descriptor) || interrupt;
00570 if (interrupt)
00571 interrupter_.interrupt();
00572 }
00573
00574
00575
00576
00577
00578 void complete_operations_and_timers(
00579 asio::detail::mutex::scoped_lock& lock)
00580 {
00581 timer_queues_for_cleanup_ = timer_queues_;
00582 lock.unlock();
00583 read_op_queue_.complete_operations();
00584 write_op_queue_.complete_operations();
00585 except_op_queue_.complete_operations();
00586 for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i)
00587 timer_queues_for_cleanup_[i]->complete_timers();
00588 }
00589
00590
00591 ::pollfd& add_pending_event_change(int descriptor)
00592 {
00593 hash_map<int, std::size_t>::iterator iter
00594 = pending_event_change_index_.find(descriptor);
00595 if (iter == pending_event_change_index_.end())
00596 {
00597 std::size_t index = pending_event_changes_.size();
00598 pending_event_changes_.reserve(pending_event_changes_.size() + 1);
00599 pending_event_change_index_.insert(std::make_pair(descriptor, index));
00600 pending_event_changes_.push_back(::pollfd());
00601 pending_event_changes_[index].fd = descriptor;
00602 pending_event_changes_[index].revents = 0;
00603 return pending_event_changes_[index];
00604 }
00605 else
00606 {
00607 return pending_event_changes_[iter->second];
00608 }
00609 }
00610
00611
00612 asio::detail::mutex mutex_;
00613
00614
00615 int dev_poll_fd_;
00616
00617
00618 std::vector< ::pollfd> pending_event_changes_;
00619
00620
00621 hash_map<int, std::size_t> pending_event_change_index_;
00622
00623
00624 bool wait_in_progress_;
00625
00626
00627 select_interrupter interrupter_;
00628
00629
00630 reactor_op_queue<socket_type> read_op_queue_;
00631
00632
00633 reactor_op_queue<socket_type> write_op_queue_;
00634
00635
00636 reactor_op_queue<socket_type> except_op_queue_;
00637
00638
00639 std::vector<timer_queue_base*> timer_queues_;
00640
00641
00642
00643 std::vector<timer_queue_base*> timer_queues_for_cleanup_;
00644
00645
00646 std::vector<socket_type> pending_cancellations_;
00647
00648
00649 bool stop_thread_;
00650
00651
00652 asio::detail::thread* thread_;
00653
00654
00655 bool shutdown_;
00656 };
00657
00658 }
00659 }
00660
00661 #endif // defined(ASIO_HAS_DEV_POLL)
00662
00663 #include "asio/detail/pop_options.hpp"
00664
00665 #endif // ASIO_DETAIL_DEV_POLL_REACTOR_HPP