//
// kqueue_reactor.hpp
// ~~~~~~~~~~~~~~~~~~
//
#ifndef ASIO_DETAIL_KQUEUE_REACTOR_HPP
#define ASIO_DETAIL_KQUEUE_REACTOR_HPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include "asio/detail/push_options.hpp"

#include "asio/detail/kqueue_reactor_fwd.hpp"

#if defined(ASIO_HAS_KQUEUE)

#include "asio/detail/push_options.hpp"
#include <cstddef>
#include <vector>
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <boost/config.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/throw_exception.hpp>
#include "asio/detail/pop_options.hpp"

#include "asio/error.hpp"
#include "asio/io_service.hpp"
#include "asio/system_error.hpp"
#include "asio/detail/bind_handler.hpp"
#include "asio/detail/mutex.hpp"
#include "asio/detail/task_io_service.hpp"
#include "asio/detail/thread.hpp"
#include "asio/detail/reactor_op_queue.hpp"
#include "asio/detail/select_interrupter.hpp"
#include "asio/detail/service_base.hpp"
#include "asio/detail/signal_blocker.hpp"
#include "asio/detail/socket_types.hpp"
#include "asio/detail/timer_queue.hpp"

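// Not all kqueue implementations define EV_OOBAND, so fall back to EV_FLAG1,
// which this reactor uses to mark interest in out-of-band data.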
#if !defined(EV_OOBAND)
# define EV_OOBAND EV_FLAG1
#endif // !defined(EV_OOBAND)

namespace asio {
namespace detail {

template <bool Own_Thread>
class kqueue_reactor
  : public asio::detail::service_base<kqueue_reactor<Own_Thread> >
{
public:
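  // Per-descriptor data.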
  struct per_descriptor_data
  {
    bool allow_speculative_read;
    bool allow_speculative_write;
  };

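  // Constructor.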
  kqueue_reactor(asio::io_service& io_service)
    : asio::detail::service_base<
        kqueue_reactor<Own_Thread> >(io_service),
      mutex_(),
      kqueue_fd_(do_kqueue_create()),
      wait_in_progress_(false),
      interrupter_(),
      read_op_queue_(),
      write_op_queue_(),
      except_op_queue_(),
      pending_cancellations_(),
      stop_thread_(false),
      thread_(0),
      shutdown_(false),
      need_kqueue_wait_(true)
  {
    // Start the reactor's internal thread only when the reactor is configured
    // to own one. Signals are blocked while the thread is created so that the
    // new thread inherits a blocked signal mask.
    if (Own_Thread)
    {
      asio::detail::signal_blocker sb;
      thread_ = new asio::detail::thread(
          bind_handler(&kqueue_reactor::call_run_thread, this));
    }

    // Register the interrupter's read descriptor with the kqueue so that a
    // call to interrupt() can wake up a blocked kevent() call.
    struct kevent event;
    EV_SET(&event, interrupter_.read_descriptor(),
        EVFILT_READ, EV_ADD, 0, 0, 0);
    ::kevent(kqueue_fd_, &event, 1, 0, 0, 0);
  }

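  // Destructor.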
  ~kqueue_reactor()
  {
    shutdown_service();
    close(kqueue_fd_);
  }

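  // Destroy all user-defined handler objects owned by the service.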
  void shutdown_service()
  {
    asio::detail::mutex::scoped_lock lock(mutex_);
    shutdown_ = true;
    stop_thread_ = true;
    lock.unlock();

    if (thread_)
    {
      interrupter_.interrupt();
      thread_->join();
      delete thread_;
      thread_ = 0;
    }

    read_op_queue_.destroy_operations();
    write_op_queue_.destroy_operations();
    except_op_queue_.destroy_operations();

    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
      timer_queues_[i]->destroy_timers();
    timer_queues_.clear();
  }

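  // Register a socket with the reactor, enabling speculative reads and
  // writes for it. Returns 0 on success.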
  int register_descriptor(socket_type, per_descriptor_data& descriptor_data)
  {
    descriptor_data.allow_speculative_read = true;
    descriptor_data.allow_speculative_write = true;

    return 0;
  }

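  // Start a new read operation. The handler will be invoked when the given
  // descriptor is ready to be read, or when an error occurs.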
  template <typename Handler>
  void start_read_op(socket_type descriptor,
      per_descriptor_data& descriptor_data, Handler handler,
      bool allow_speculative_read = true)
  {
    if (allow_speculative_read && descriptor_data.allow_speculative_read)
    {
      asio::error_code ec;
      std::size_t bytes_transferred = 0;
      if (handler.perform(ec, bytes_transferred))
      {
        handler.complete(ec, bytes_transferred);
        return;
      }

      // The speculative read was attempted but the operation was not ready.
      allow_speculative_read = false;
    }

    asio::detail::mutex::scoped_lock lock(mutex_);

    if (shutdown_)
      return;

    if (!allow_speculative_read)
      need_kqueue_wait_ = true;
    else if (!read_op_queue_.has_operation(descriptor))
    {
      // Speculative reads are ok as there are no queued read operations.
      descriptor_data.allow_speculative_read = true;

      asio::error_code ec;
      std::size_t bytes_transferred = 0;
      if (handler.perform(ec, bytes_transferred))
      {
        handler.complete(ec, bytes_transferred);
        return;
      }
    }

    // Speculative reads are not ok as there will be queued read operations.
    descriptor_data.allow_speculative_read = false;

    if (read_op_queue_.enqueue_operation(descriptor, handler))
    {
      struct kevent event;
      EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, 0, 0, 0);
      if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
      {
        asio::error_code ec(errno,
            asio::error::get_system_category());
        read_op_queue_.perform_all_operations(descriptor, ec);
      }
    }
  }

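  // Start a new write operation. The handler will be invoked when the given
  // descriptor is ready to be written, or when an error occurs.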
  template <typename Handler>
  void start_write_op(socket_type descriptor,
      per_descriptor_data& descriptor_data, Handler handler,
      bool allow_speculative_write = true)
  {
    if (allow_speculative_write && descriptor_data.allow_speculative_write)
    {
      asio::error_code ec;
      std::size_t bytes_transferred = 0;
      if (handler.perform(ec, bytes_transferred))
      {
        handler.complete(ec, bytes_transferred);
        return;
      }

      // The speculative write was attempted but the operation was not ready.
      allow_speculative_write = false;
    }

    asio::detail::mutex::scoped_lock lock(mutex_);

    if (shutdown_)
      return;

    if (!allow_speculative_write)
      need_kqueue_wait_ = true;
    else if (!write_op_queue_.has_operation(descriptor))
    {
      // Speculative writes are ok as there are no queued write operations.
      descriptor_data.allow_speculative_write = true;

      asio::error_code ec;
      std::size_t bytes_transferred = 0;
      if (handler.perform(ec, bytes_transferred))
      {
        handler.complete(ec, bytes_transferred);
        return;
      }
    }

    // Speculative writes are not ok as there will be queued write operations.
    descriptor_data.allow_speculative_write = false;

    if (write_op_queue_.enqueue_operation(descriptor, handler))
    {
      struct kevent event;
      EV_SET(&event, descriptor, EVFILT_WRITE, EV_ADD, 0, 0, 0);
      if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
      {
        asio::error_code ec(errno,
            asio::error::get_system_category());
        write_op_queue_.perform_all_operations(descriptor, ec);
      }
    }
  }

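  // Start a new exception operation. The handler will be invoked when the
  // given descriptor has out-of-band data available, or when an error occurs.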
  template <typename Handler>
  void start_except_op(socket_type descriptor,
      per_descriptor_data&, Handler handler)
  {
    asio::detail::mutex::scoped_lock lock(mutex_);

    if (shutdown_)
      return;

    if (except_op_queue_.enqueue_operation(descriptor, handler))
    {
      // If read operations are already queued the descriptor is registered
      // for all read events; otherwise ask only for out-of-band data.
      struct kevent event;
      if (read_op_queue_.has_operation(descriptor))
        EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, 0, 0, 0);
      else
        EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, EV_OOBAND, 0, 0);
      if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
      {
        asio::error_code ec(errno,
            asio::error::get_system_category());
        except_op_queue_.perform_all_operations(descriptor, ec);
      }
    }
  }

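  // Start a new write operation for a connect. The handler will be invoked
  // when the given descriptor becomes writable (i.e. the connect has
  // completed), or when an error occurs.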
  template <typename Handler>
  void start_connect_op(socket_type descriptor,
      per_descriptor_data& descriptor_data, Handler handler)
  {
    asio::detail::mutex::scoped_lock lock(mutex_);

    if (shutdown_)
      return;

    // Write operations must not be speculatively performed until the connect
    // has finished.
    descriptor_data.allow_speculative_write = false;

    if (write_op_queue_.enqueue_operation(descriptor, handler))
    {
      struct kevent event;
      EV_SET(&event, descriptor, EVFILT_WRITE, EV_ADD, 0, 0, 0);
      if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
      {
        asio::error_code ec(errno,
            asio::error::get_system_category());
        write_op_queue_.perform_all_operations(descriptor, ec);
      }
    }
  }

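  // Cancel all operations associated with the given descriptor. The queued
  // handlers will be invoked with a cancellation error.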
  void cancel_ops(socket_type descriptor, per_descriptor_data&)
  {
    asio::detail::mutex::scoped_lock lock(mutex_);
    cancel_ops_unlocked(descriptor);
  }

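  // Cancel any operations that are running against the descriptor and remove
  // its registration from the reactor.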
  void close_descriptor(socket_type descriptor, per_descriptor_data&)
  {
    asio::detail::mutex::scoped_lock lock(mutex_);

    // Remove the descriptor from the kqueue.
    struct kevent event[2];
    EV_SET(&event[0], descriptor, EVFILT_READ, EV_DELETE, 0, 0, 0);
    EV_SET(&event[1], descriptor, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
    ::kevent(kqueue_fd_, event, 2, 0, 0, 0);

    // Cancel any outstanding operations associated with the descriptor.
    cancel_ops_unlocked(descriptor);
  }

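  // Add a new timer queue to the reactor.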
  template <typename Time_Traits>
  void add_timer_queue(timer_queue<Time_Traits>& timer_queue)
  {
    asio::detail::mutex::scoped_lock lock(mutex_);
    timer_queues_.push_back(&timer_queue);
  }

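  // Remove a timer queue from the reactor.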
  template <typename Time_Traits>
  void remove_timer_queue(timer_queue<Time_Traits>& timer_queue)
  {
    asio::detail::mutex::scoped_lock lock(mutex_);
    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
    {
      if (timer_queues_[i] == &timer_queue)
      {
        timer_queues_.erase(timer_queues_.begin() + i);
        return;
      }
    }
  }

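  // Schedule a timer in the given timer queue to expire at the specified
  // absolute time. The handler will be invoked when the timer expires.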
  template <typename Time_Traits, typename Handler>
  void schedule_timer(timer_queue<Time_Traits>& timer_queue,
      const typename Time_Traits::time_type& time, Handler handler, void* token)
  {
    asio::detail::mutex::scoped_lock lock(mutex_);
    if (!shutdown_)
      if (timer_queue.enqueue_timer(time, handler, token))
        interrupter_.interrupt();
  }

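  // Cancel the timer operations associated with the given token. Returns the
  // number of operations that have been cancelled.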
  template <typename Time_Traits>
  std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
  {
    asio::detail::mutex::scoped_lock lock(mutex_);
    std::size_t n = timer_queue.cancel_timer(token);
    if (n > 0)
      interrupter_.interrupt();
    return n;
  }

private:
  friend class task_io_service<kqueue_reactor<Own_Thread> >;

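  // Run the kqueue loop once. If block is true the call may wait for events
  // or timers; otherwise it only polls.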
  void run(bool block)
  {
    asio::detail::mutex::scoped_lock lock(mutex_);

    // Dispatch any operation cancellations that were made while the reactor
    // was not running.
    read_op_queue_.perform_cancellations();
    write_op_queue_.perform_cancellations();
    except_op_queue_.perform_cancellations();
    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
      timer_queues_[i]->dispatch_cancellations();

    // Check if the reactor is supposed to stop.
    if (stop_thread_)
    {
      complete_operations_and_timers(lock);
      return;
    }

    // We can return immediately if there's no work to do and the reactor is
    // not supposed to block.
    if (!block && read_op_queue_.empty() && write_op_queue_.empty()
        && except_op_queue_.empty() && all_timer_queues_are_empty())
    {
      complete_operations_and_timers(lock);
      return;
    }

    // Determine how long to block while waiting for events.
    timespec timeout_buf = { 0, 0 };
    timespec* timeout = block ? get_timeout(timeout_buf) : &timeout_buf;

    wait_in_progress_ = true;
    lock.unlock();

    // Block on the kqueue descriptor.
    struct kevent events[128];
    int num_events = (block || need_kqueue_wait_)
      ? kevent(kqueue_fd_, 0, 0, events, 128, timeout)
      : 0;

    lock.lock();
    wait_in_progress_ = false;

    // Block signals while performing operations.
    asio::detail::signal_blocker sb;

    // Dispatch the waiting events.
    for (int i = 0; i < num_events; ++i)
    {
      int descriptor = events[i].ident;
      if (descriptor == interrupter_.read_descriptor())
      {
        interrupter_.reset();
      }
      else if (events[i].filter == EVFILT_READ)
      {
        // Perform any queued read or exception operations for the descriptor.
        bool more_reads = false;
        bool more_except = false;
        if (events[i].flags & EV_ERROR)
        {
          asio::error_code error(
              events[i].data, asio::error::get_system_category());
          except_op_queue_.perform_all_operations(descriptor, error);
          read_op_queue_.perform_all_operations(descriptor, error);
        }
        else if (events[i].flags & EV_OOBAND)
        {
          asio::error_code error;
          more_except = except_op_queue_.perform_operation(descriptor, error);
          if (events[i].data > 0)
            more_reads = read_op_queue_.perform_operation(descriptor, error);
          else
            more_reads = read_op_queue_.has_operation(descriptor);
        }
        else
        {
          asio::error_code error;
          more_reads = read_op_queue_.perform_operation(descriptor, error);
          more_except = except_op_queue_.has_operation(descriptor);
        }

        // Update the descriptor's registration in the kqueue to reflect the
        // operations that remain queued.
        struct kevent event;
        if (more_reads)
          EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, 0, 0, 0);
        else if (more_except)
          EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, EV_OOBAND, 0, 0);
        else
          EV_SET(&event, descriptor, EVFILT_READ, EV_DELETE, 0, 0, 0);
        if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
        {
          asio::error_code error(errno,
              asio::error::get_system_category());
          except_op_queue_.perform_all_operations(descriptor, error);
          read_op_queue_.perform_all_operations(descriptor, error);
        }
      }
      else if (events[i].filter == EVFILT_WRITE)
      {
        // Perform any queued write operations for the descriptor.
        bool more_writes = false;
        if (events[i].flags & EV_ERROR)
        {
          asio::error_code error(
              events[i].data, asio::error::get_system_category());
          write_op_queue_.perform_all_operations(descriptor, error);
        }
        else
        {
          asio::error_code error;
          more_writes = write_op_queue_.perform_operation(descriptor, error);
        }

        // Update the descriptor's registration in the kqueue to reflect the
        // operations that remain queued.
        struct kevent event;
        if (more_writes)
          EV_SET(&event, descriptor, EVFILT_WRITE, EV_ADD, 0, 0, 0);
        else
          EV_SET(&event, descriptor, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
        if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
        {
          asio::error_code error(errno,
              asio::error::get_system_category());
          write_op_queue_.perform_all_operations(descriptor, error);
        }
      }
    }

    // Dispatch any expired timers and any cancellations that were made while
    // performing the operations above.
    read_op_queue_.perform_cancellations();
    write_op_queue_.perform_cancellations();
    except_op_queue_.perform_cancellations();
    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
    {
      timer_queues_[i]->dispatch_timers();
      timer_queues_[i]->dispatch_cancellations();
    }

    // Issue any pending cancellations.
    for (std::size_t i = 0; i < pending_cancellations_.size(); ++i)
      cancel_ops_unlocked(pending_cancellations_[i]);
    pending_cancellations_.clear();

    // Determine whether the kqueue needs to be polled the next time the
    // reactor is run without blocking.
    need_kqueue_wait_ = !read_op_queue_.empty()
      || !write_op_queue_.empty() || !except_op_queue_.empty();

    complete_operations_and_timers(lock);
  }

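  // Run the reactor loop in its own thread until told to stop.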
  void run_thread()
  {
    asio::detail::mutex::scoped_lock lock(mutex_);
    while (!stop_thread_)
    {
      lock.unlock();
      run(true);
      lock.lock();
    }
  }

  // Entry point for the reactor's background thread.
  static void call_run_thread(kqueue_reactor* reactor)
  {
    reactor->run_thread();
  }

  // Interrupt the kqueue wait so that the reactor loop wakes up.
  void interrupt()
  {
    interrupter_.interrupt();
  }

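  // Create the kqueue file descriptor. Throws an exception if the call fails.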
  static int do_kqueue_create()
  {
    int fd = kqueue();
    if (fd == -1)
    {
      boost::throw_exception(
          asio::system_error(
            asio::error_code(errno,
              asio::error::get_system_category()),
            "kqueue"));
    }
    return fd;
  }

  // Check if all timer queues are empty.
  bool all_timer_queues_are_empty() const
  {
    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
      if (!timer_queues_[i]->empty())
        return false;
    return true;
  }

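  // Get the timeout value for the kevent call, or a null pointer if the call
  // may block indefinitely.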
  timespec* get_timeout(timespec& ts)
  {
    if (all_timer_queues_are_empty())
      return 0;

    // By default we will wait no longer than 5 minutes. This will ensure that
    // any changes to the system clock are detected after no longer than this.
    boost::posix_time::time_duration minimum_wait_duration
      = boost::posix_time::minutes(5);

    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
    {
      boost::posix_time::time_duration wait_duration
        = timer_queues_[i]->wait_duration();
      if (wait_duration < minimum_wait_duration)
        minimum_wait_duration = wait_duration;
    }

    if (minimum_wait_duration > boost::posix_time::time_duration())
    {
      ts.tv_sec = minimum_wait_duration.total_seconds();
      ts.tv_nsec = minimum_wait_duration.total_nanoseconds() % 1000000000;
    }
    else
    {
      ts.tv_sec = 0;
      ts.tv_nsec = 0;
    }

    return &ts;
  }

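  // Cancel all operations associated with the given descriptor. This function
  // does not acquire the reactor's mutex, so it must be called with the mutex
  // already held.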
  void cancel_ops_unlocked(socket_type descriptor)
  {
    bool interrupt = read_op_queue_.cancel_operations(descriptor);
    interrupt = write_op_queue_.cancel_operations(descriptor) || interrupt;
    interrupt = except_op_queue_.cancel_operations(descriptor) || interrupt;
    if (interrupt)
      interrupter_.interrupt();
  }

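  // Clean up operations and timers. The lock must not be held while the
  // handlers are completed, since they may make calls back into this reactor.
  // The timer queues are copied first because the original vector may be
  // modified while the lock is not held.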
  void complete_operations_and_timers(
      asio::detail::mutex::scoped_lock& lock)
  {
    timer_queues_for_cleanup_ = timer_queues_;
    lock.unlock();
    read_op_queue_.complete_operations();
    write_op_queue_.complete_operations();
    except_op_queue_.complete_operations();
    for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i)
      timer_queues_for_cleanup_[i]->complete_timers();
  }

  // Mutex to protect access to internal data.
  asio::detail::mutex mutex_;

  // The kqueue file descriptor.
  int kqueue_fd_;

  // Whether the kqueue wait call is currently in progress.
  bool wait_in_progress_;

  // The interrupter is used to break a blocking kevent call.
  select_interrupter interrupter_;

  // The queue of read operations.
  reactor_op_queue<socket_type> read_op_queue_;

  // The queue of write operations.
  reactor_op_queue<socket_type> write_op_queue_;

  // The queue of except operations.
  reactor_op_queue<socket_type> except_op_queue_;

  // The timer queues.
  std::vector<timer_queue_base*> timer_queues_;

  // A copy of the timer queues, used while completing timers with the lock
  // released. Stored as a data member to avoid unnecessary allocation.
  std::vector<timer_queue_base*> timer_queues_for_cleanup_;

  // The descriptors that are pending cancellation.
  std::vector<socket_type> pending_cancellations_;

  // Whether the reactor's background thread (if any) should stop.
  bool stop_thread_;

  // The background thread that runs the reactor loop, if the reactor owns one.
  asio::detail::thread* thread_;

  // Whether the service has been shut down.
  bool shutdown_;

  // Whether kevent must be called the next time the reactor runs without
  // blocking.
  bool need_kqueue_wait_;
};

} // namespace detail
} // namespace asio

#endif // defined(ASIO_HAS_KQUEUE)

#include "asio/detail/pop_options.hpp"

#endif // ASIO_DETAIL_KQUEUE_REACTOR_HPP