00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #ifndef ASIO_DETAIL_SELECT_REACTOR_HPP
00012 #define ASIO_DETAIL_SELECT_REACTOR_HPP
00013
00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00015 # pragma once
00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00017
00018 #include "asio/detail/push_options.hpp"
00019
00020 #include "asio/detail/socket_types.hpp"
00021
00022 #include "asio/detail/push_options.hpp"
00023 #include <cstddef>
00024 #include <boost/config.hpp>
00025 #include <boost/date_time/posix_time/posix_time_types.hpp>
00026 #include <boost/shared_ptr.hpp>
00027 #include <vector>
00028 #include "asio/detail/pop_options.hpp"
00029
00030 #include "asio/io_service.hpp"
00031 #include "asio/detail/bind_handler.hpp"
00032 #include "asio/detail/fd_set_adapter.hpp"
00033 #include "asio/detail/mutex.hpp"
00034 #include "asio/detail/noncopyable.hpp"
00035 #include "asio/detail/reactor_op_queue.hpp"
00036 #include "asio/detail/select_interrupter.hpp"
00037 #include "asio/detail/select_reactor_fwd.hpp"
00038 #include "asio/detail/service_base.hpp"
00039 #include "asio/detail/signal_blocker.hpp"
00040 #include "asio/detail/socket_ops.hpp"
00041 #include "asio/detail/socket_types.hpp"
00042 #include "asio/detail/task_io_service.hpp"
00043 #include "asio/detail/thread.hpp"
00044 #include "asio/detail/timer_queue.hpp"
00045
00046 namespace asio {
00047 namespace detail {
00048
00049 template <bool Own_Thread>
00050 class select_reactor
00051 : public asio::detail::service_base<select_reactor<Own_Thread> >
00052 {
00053 public:
00054
00055 struct per_descriptor_data
00056 {
00057 };
00058
00059
00060 select_reactor(asio::io_service& io_service)
00061 : asio::detail::service_base<
00062 select_reactor<Own_Thread> >(io_service),
00063 mutex_(),
00064 select_in_progress_(false),
00065 interrupter_(),
00066 read_op_queue_(),
00067 write_op_queue_(),
00068 except_op_queue_(),
00069 pending_cancellations_(),
00070 stop_thread_(false),
00071 thread_(0),
00072 shutdown_(false)
00073 {
00074 if (Own_Thread)
00075 {
00076 asio::detail::signal_blocker sb;
00077 thread_ = new asio::detail::thread(
00078 bind_handler(&select_reactor::call_run_thread, this));
00079 }
00080 }
00081
00082
00083 ~select_reactor()
00084 {
00085 shutdown_service();
00086 }
00087
00088
00089 void shutdown_service()
00090 {
00091 asio::detail::mutex::scoped_lock lock(mutex_);
00092 shutdown_ = true;
00093 stop_thread_ = true;
00094 lock.unlock();
00095
00096 if (thread_)
00097 {
00098 interrupter_.interrupt();
00099 thread_->join();
00100 delete thread_;
00101 thread_ = 0;
00102 }
00103
00104 read_op_queue_.destroy_operations();
00105 write_op_queue_.destroy_operations();
00106 except_op_queue_.destroy_operations();
00107
00108 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00109 timer_queues_[i]->destroy_timers();
00110 timer_queues_.clear();
00111 }
00112
00113
00114
00115 int register_descriptor(socket_type, per_descriptor_data&)
00116 {
00117 return 0;
00118 }
00119
00120
00121
00122 template <typename Handler>
00123 void start_read_op(socket_type descriptor, per_descriptor_data&,
00124 Handler handler, bool = true)
00125 {
00126 asio::detail::mutex::scoped_lock lock(mutex_);
00127 if (!shutdown_)
00128 if (read_op_queue_.enqueue_operation(descriptor, handler))
00129 interrupter_.interrupt();
00130 }
00131
00132
00133
00134 template <typename Handler>
00135 void start_write_op(socket_type descriptor, per_descriptor_data&,
00136 Handler handler, bool = true)
00137 {
00138 asio::detail::mutex::scoped_lock lock(mutex_);
00139 if (!shutdown_)
00140 if (write_op_queue_.enqueue_operation(descriptor, handler))
00141 interrupter_.interrupt();
00142 }
00143
00144
00145
00146 template <typename Handler>
00147 void start_except_op(socket_type descriptor,
00148 per_descriptor_data&, Handler handler)
00149 {
00150 asio::detail::mutex::scoped_lock lock(mutex_);
00151 if (!shutdown_)
00152 if (except_op_queue_.enqueue_operation(descriptor, handler))
00153 interrupter_.interrupt();
00154 }
00155
00156
00157
00158
00159 template <typename Handler>
00160 class connect_handler_wrapper
00161 {
00162 public:
00163 connect_handler_wrapper(socket_type descriptor,
00164 boost::shared_ptr<bool> completed,
00165 select_reactor<Own_Thread>& reactor, Handler handler)
00166 : descriptor_(descriptor),
00167 completed_(completed),
00168 reactor_(reactor),
00169 handler_(handler)
00170 {
00171 }
00172
00173 bool perform(asio::error_code& ec,
00174 std::size_t& bytes_transferred)
00175 {
00176
00177
00178 if (*completed_)
00179 {
00180 completed_.reset();
00181 return true;
00182 }
00183
00184
00185 *completed_ = true;
00186 reactor_.enqueue_cancel_ops_unlocked(descriptor_);
00187
00188
00189 return handler_.perform(ec, bytes_transferred);
00190 }
00191
00192 void complete(const asio::error_code& ec,
00193 std::size_t bytes_transferred)
00194 {
00195 if (completed_.get())
00196 handler_.complete(ec, bytes_transferred);
00197 }
00198
00199 private:
00200 socket_type descriptor_;
00201 boost::shared_ptr<bool> completed_;
00202 select_reactor<Own_Thread>& reactor_;
00203 Handler handler_;
00204 };
00205
00206
00207
00208
00209
00210 template <typename Handler>
00211 void start_connect_op(socket_type descriptor,
00212 per_descriptor_data&, Handler handler)
00213 {
00214 asio::detail::mutex::scoped_lock lock(mutex_);
00215 if (!shutdown_)
00216 {
00217 boost::shared_ptr<bool> completed(new bool(false));
00218 connect_handler_wrapper<Handler> wrapped_handler(
00219 descriptor, completed, *this, handler);
00220 bool interrupt = write_op_queue_.enqueue_operation(
00221 descriptor, wrapped_handler);
00222 interrupt = except_op_queue_.enqueue_operation(
00223 descriptor, wrapped_handler) || interrupt;
00224 if (interrupt)
00225 interrupter_.interrupt();
00226 }
00227 }
00228
00229
00230
00231
00232 void cancel_ops(socket_type descriptor, per_descriptor_data&)
00233 {
00234 asio::detail::mutex::scoped_lock lock(mutex_);
00235 cancel_ops_unlocked(descriptor);
00236 }
00237
00238
00239
00240
00241
00242
00243 void enqueue_cancel_ops_unlocked(socket_type descriptor)
00244 {
00245 pending_cancellations_.push_back(descriptor);
00246 }
00247
00248
00249
00250 void close_descriptor(socket_type descriptor, per_descriptor_data&)
00251 {
00252 asio::detail::mutex::scoped_lock lock(mutex_);
00253 cancel_ops_unlocked(descriptor);
00254 }
00255
00256
00257 template <typename Time_Traits>
00258 void add_timer_queue(timer_queue<Time_Traits>& timer_queue)
00259 {
00260 asio::detail::mutex::scoped_lock lock(mutex_);
00261 timer_queues_.push_back(&timer_queue);
00262 }
00263
00264
00265 template <typename Time_Traits>
00266 void remove_timer_queue(timer_queue<Time_Traits>& timer_queue)
00267 {
00268 asio::detail::mutex::scoped_lock lock(mutex_);
00269 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00270 {
00271 if (timer_queues_[i] == &timer_queue)
00272 {
00273 timer_queues_.erase(timer_queues_.begin() + i);
00274 return;
00275 }
00276 }
00277 }
00278
00279
00280
00281 template <typename Time_Traits, typename Handler>
00282 void schedule_timer(timer_queue<Time_Traits>& timer_queue,
00283 const typename Time_Traits::time_type& time, Handler handler, void* token)
00284 {
00285 asio::detail::mutex::scoped_lock lock(mutex_);
00286 if (!shutdown_)
00287 if (timer_queue.enqueue_timer(time, handler, token))
00288 interrupter_.interrupt();
00289 }
00290
00291
00292
00293 template <typename Time_Traits>
00294 std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
00295 {
00296 asio::detail::mutex::scoped_lock lock(mutex_);
00297 std::size_t n = timer_queue.cancel_timer(token);
00298 if (n > 0)
00299 interrupter_.interrupt();
00300 return n;
00301 }
00302
00303 private:
00304 friend class task_io_service<select_reactor<Own_Thread> >;
00305
00306
00307 void run(bool block)
00308 {
00309 asio::detail::mutex::scoped_lock lock(mutex_);
00310
00311
00312
00313 read_op_queue_.perform_cancellations();
00314 write_op_queue_.perform_cancellations();
00315 except_op_queue_.perform_cancellations();
00316 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00317 timer_queues_[i]->dispatch_cancellations();
00318
00319
00320 if (stop_thread_)
00321 {
00322 complete_operations_and_timers(lock);
00323 return;
00324 }
00325
00326
00327
00328 if (!block && read_op_queue_.empty() && write_op_queue_.empty()
00329 && except_op_queue_.empty() && all_timer_queues_are_empty())
00330 {
00331 complete_operations_and_timers(lock);
00332 return;
00333 }
00334
00335
00336 fd_set_adapter read_fds;
00337 read_fds.set(interrupter_.read_descriptor());
00338 read_op_queue_.get_descriptors(read_fds);
00339 fd_set_adapter write_fds;
00340 write_op_queue_.get_descriptors(write_fds);
00341 fd_set_adapter except_fds;
00342 except_op_queue_.get_descriptors(except_fds);
00343 socket_type max_fd = read_fds.max_descriptor();
00344 if (write_fds.max_descriptor() > max_fd)
00345 max_fd = write_fds.max_descriptor();
00346 if (except_fds.max_descriptor() > max_fd)
00347 max_fd = except_fds.max_descriptor();
00348
00349
00350
00351 timeval tv_buf = { 0, 0 };
00352 timeval* tv = block ? get_timeout(tv_buf) : &tv_buf;
00353 select_in_progress_ = true;
00354 lock.unlock();
00355 asio::error_code ec;
00356 int retval = socket_ops::select(static_cast<int>(max_fd + 1),
00357 read_fds, write_fds, except_fds, tv, ec);
00358 lock.lock();
00359 select_in_progress_ = false;
00360
00361
00362 asio::detail::signal_blocker sb;
00363
00364
00365 if (retval > 0 && read_fds.is_set(interrupter_.read_descriptor()))
00366 interrupter_.reset();
00367
00368
00369 if (retval > 0)
00370 {
00371
00372
00373 except_op_queue_.perform_operations_for_descriptors(
00374 except_fds, asio::error_code());
00375 read_op_queue_.perform_operations_for_descriptors(
00376 read_fds, asio::error_code());
00377 write_op_queue_.perform_operations_for_descriptors(
00378 write_fds, asio::error_code());
00379 except_op_queue_.perform_cancellations();
00380 read_op_queue_.perform_cancellations();
00381 write_op_queue_.perform_cancellations();
00382 }
00383 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00384 {
00385 timer_queues_[i]->dispatch_timers();
00386 timer_queues_[i]->dispatch_cancellations();
00387 }
00388
00389
00390 for (size_t i = 0; i < pending_cancellations_.size(); ++i)
00391 cancel_ops_unlocked(pending_cancellations_[i]);
00392 pending_cancellations_.clear();
00393
00394 complete_operations_and_timers(lock);
00395 }
00396
00397
00398 void run_thread()
00399 {
00400 asio::detail::mutex::scoped_lock lock(mutex_);
00401 while (!stop_thread_)
00402 {
00403 lock.unlock();
00404 run(true);
00405 lock.lock();
00406 }
00407 }
00408
00409
00410 static void call_run_thread(select_reactor* reactor)
00411 {
00412 reactor->run_thread();
00413 }
00414
00415
00416 void interrupt()
00417 {
00418 interrupter_.interrupt();
00419 }
00420
00421
00422 bool all_timer_queues_are_empty() const
00423 {
00424 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00425 if (!timer_queues_[i]->empty())
00426 return false;
00427 return true;
00428 }
00429
00430
00431 timeval* get_timeout(timeval& tv)
00432 {
00433 if (all_timer_queues_are_empty())
00434 return 0;
00435
00436
00437
00438 boost::posix_time::time_duration minimum_wait_duration
00439 = boost::posix_time::minutes(5);
00440
00441 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00442 {
00443 boost::posix_time::time_duration wait_duration
00444 = timer_queues_[i]->wait_duration();
00445 if (wait_duration < minimum_wait_duration)
00446 minimum_wait_duration = wait_duration;
00447 }
00448
00449 if (minimum_wait_duration > boost::posix_time::time_duration())
00450 {
00451 tv.tv_sec = minimum_wait_duration.total_seconds();
00452 tv.tv_usec = minimum_wait_duration.total_microseconds() % 1000000;
00453 }
00454 else
00455 {
00456 tv.tv_sec = 0;
00457 tv.tv_usec = 0;
00458 }
00459
00460 return &tv;
00461 }
00462
00463
00464
00465
00466 void cancel_ops_unlocked(socket_type descriptor)
00467 {
00468 bool interrupt = read_op_queue_.cancel_operations(descriptor);
00469 interrupt = write_op_queue_.cancel_operations(descriptor) || interrupt;
00470 interrupt = except_op_queue_.cancel_operations(descriptor) || interrupt;
00471 if (interrupt)
00472 interrupter_.interrupt();
00473 }
00474
00475
00476
00477
00478
00479 void complete_operations_and_timers(
00480 asio::detail::mutex::scoped_lock& lock)
00481 {
00482 timer_queues_for_cleanup_ = timer_queues_;
00483 lock.unlock();
00484 read_op_queue_.complete_operations();
00485 write_op_queue_.complete_operations();
00486 except_op_queue_.complete_operations();
00487 for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i)
00488 timer_queues_for_cleanup_[i]->complete_timers();
00489 }
00490
00491
00492 asio::detail::mutex mutex_;
00493
00494
00495 bool select_in_progress_;
00496
00497
00498 select_interrupter interrupter_;
00499
00500
00501 reactor_op_queue<socket_type> read_op_queue_;
00502
00503
00504 reactor_op_queue<socket_type> write_op_queue_;
00505
00506
00507 reactor_op_queue<socket_type> except_op_queue_;
00508
00509
00510 std::vector<timer_queue_base*> timer_queues_;
00511
00512
00513
00514 std::vector<timer_queue_base*> timer_queues_for_cleanup_;
00515
00516
00517 std::vector<socket_type> pending_cancellations_;
00518
00519
00520 bool stop_thread_;
00521
00522
00523 asio::detail::thread* thread_;
00524
00525
00526 bool shutdown_;
00527 };
00528
00529 }
00530 }
00531
00532 #include "asio/detail/pop_options.hpp"
00533
00534 #endif // ASIO_DETAIL_SELECT_REACTOR_HPP