$search
00001 // 00002 // reactive_descriptor_service.hpp 00003 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 00004 // 00005 // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com) 00006 // 00007 // Distributed under the Boost Software License, Version 1.0. (See accompanying 00008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 00009 // 00010 00011 #ifndef ASIO_DETAIL_REACTIVE_DESCRIPTOR_SERVICE_HPP 00012 #define ASIO_DETAIL_REACTIVE_DESCRIPTOR_SERVICE_HPP 00013 00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200) 00015 # pragma once 00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) 00017 00018 #include "asio/detail/push_options.hpp" 00019 00020 #include "asio/buffer.hpp" 00021 #include "asio/error.hpp" 00022 #include "asio/io_service.hpp" 00023 #include "asio/detail/bind_handler.hpp" 00024 #include "asio/detail/handler_base_from_member.hpp" 00025 #include "asio/detail/noncopyable.hpp" 00026 #include "asio/detail/service_base.hpp" 00027 #include "asio/detail/descriptor_ops.hpp" 00028 00029 #if !defined(BOOST_WINDOWS) && !defined(__CYGWIN__) 00030 00031 namespace asio { 00032 namespace detail { 00033 00034 template <typename Reactor> 00035 class reactive_descriptor_service 00036 : public asio::detail::service_base< 00037 reactive_descriptor_service<Reactor> > 00038 { 00039 public: 00040 // The native type of a descriptor. 00041 typedef int native_type; 00042 00043 // The implementation type of the descriptor. 00044 class implementation_type 00045 : private asio::detail::noncopyable 00046 { 00047 public: 00048 // Default constructor. 00049 implementation_type() 00050 : descriptor_(-1), 00051 flags_(0) 00052 { 00053 } 00054 00055 private: 00056 // Only this service will have access to the internal values. 00057 friend class reactive_descriptor_service<Reactor>; 00058 00059 // The native descriptor representation. 00060 int descriptor_; 00061 00062 enum 00063 { 00064 user_set_non_blocking = 1, // The user wants a non-blocking descriptor. 00065 internal_non_blocking = 2 // The descriptor has been set non-blocking. 00066 }; 00067 00068 // Flags indicating the current state of the descriptor. 00069 unsigned char flags_; 00070 00071 // Per-descriptor data used by the reactor. 00072 typename Reactor::per_descriptor_data reactor_data_; 00073 }; 00074 00075 // The maximum number of buffers to support in a single operation. 00076 enum { max_buffers = 64 < max_iov_len ? 64 : max_iov_len }; 00077 00078 // Constructor. 00079 reactive_descriptor_service(asio::io_service& io_service) 00080 : asio::detail::service_base< 00081 reactive_descriptor_service<Reactor> >(io_service), 00082 reactor_(asio::use_service<Reactor>(io_service)) 00083 { 00084 } 00085 00086 // Destroy all user-defined handler objects owned by the service. 00087 void shutdown_service() 00088 { 00089 } 00090 00091 // Construct a new descriptor implementation. 00092 void construct(implementation_type& impl) 00093 { 00094 impl.descriptor_ = -1; 00095 impl.flags_ = 0; 00096 } 00097 00098 // Destroy a descriptor implementation. 00099 void destroy(implementation_type& impl) 00100 { 00101 if (impl.descriptor_ != -1) 00102 { 00103 reactor_.close_descriptor(impl.descriptor_, impl.reactor_data_); 00104 00105 if (impl.flags_ & implementation_type::internal_non_blocking) 00106 { 00107 ioctl_arg_type non_blocking = 0; 00108 asio::error_code ignored_ec; 00109 descriptor_ops::ioctl(impl.descriptor_, 00110 FIONBIO, &non_blocking, ignored_ec); 00111 impl.flags_ &= ~implementation_type::internal_non_blocking; 00112 } 00113 00114 asio::error_code ignored_ec; 00115 descriptor_ops::close(impl.descriptor_, ignored_ec); 00116 00117 impl.descriptor_ = -1; 00118 } 00119 } 00120 00121 // Assign a native descriptor to a descriptor implementation. 00122 asio::error_code assign(implementation_type& impl, 00123 const native_type& native_descriptor, asio::error_code& ec) 00124 { 00125 if (is_open(impl)) 00126 { 00127 ec = asio::error::already_open; 00128 return ec; 00129 } 00130 00131 if (int err = reactor_.register_descriptor( 00132 native_descriptor, impl.reactor_data_)) 00133 { 00134 ec = asio::error_code(err, 00135 asio::error::get_system_category()); 00136 return ec; 00137 } 00138 00139 impl.descriptor_ = native_descriptor; 00140 impl.flags_ = 0; 00141 ec = asio::error_code(); 00142 return ec; 00143 } 00144 00145 // Determine whether the descriptor is open. 00146 bool is_open(const implementation_type& impl) const 00147 { 00148 return impl.descriptor_ != -1; 00149 } 00150 00151 // Destroy a descriptor implementation. 00152 asio::error_code close(implementation_type& impl, 00153 asio::error_code& ec) 00154 { 00155 if (is_open(impl)) 00156 { 00157 reactor_.close_descriptor(impl.descriptor_, impl.reactor_data_); 00158 00159 if (impl.flags_ & implementation_type::internal_non_blocking) 00160 { 00161 ioctl_arg_type non_blocking = 0; 00162 asio::error_code ignored_ec; 00163 descriptor_ops::ioctl(impl.descriptor_, 00164 FIONBIO, &non_blocking, ignored_ec); 00165 impl.flags_ &= ~implementation_type::internal_non_blocking; 00166 } 00167 00168 if (descriptor_ops::close(impl.descriptor_, ec) == -1) 00169 return ec; 00170 00171 impl.descriptor_ = -1; 00172 } 00173 00174 ec = asio::error_code(); 00175 return ec; 00176 } 00177 00178 // Get the native descriptor representation. 00179 native_type native(const implementation_type& impl) const 00180 { 00181 return impl.descriptor_; 00182 } 00183 00184 // Cancel all operations associated with the descriptor. 00185 asio::error_code cancel(implementation_type& impl, 00186 asio::error_code& ec) 00187 { 00188 if (!is_open(impl)) 00189 { 00190 ec = asio::error::bad_descriptor; 00191 return ec; 00192 } 00193 00194 reactor_.cancel_ops(impl.descriptor_, impl.reactor_data_); 00195 ec = asio::error_code(); 00196 return ec; 00197 } 00198 00199 // Perform an IO control command on the descriptor. 00200 template <typename IO_Control_Command> 00201 asio::error_code io_control(implementation_type& impl, 00202 IO_Control_Command& command, asio::error_code& ec) 00203 { 00204 if (!is_open(impl)) 00205 { 00206 ec = asio::error::bad_descriptor; 00207 return ec; 00208 } 00209 00210 if (command.name() == static_cast<int>(FIONBIO)) 00211 { 00212 if (command.get()) 00213 impl.flags_ |= implementation_type::user_set_non_blocking; 00214 else 00215 impl.flags_ &= ~implementation_type::user_set_non_blocking; 00216 ec = asio::error_code(); 00217 } 00218 else 00219 { 00220 descriptor_ops::ioctl(impl.descriptor_, command.name(), 00221 static_cast<ioctl_arg_type*>(command.data()), ec); 00222 } 00223 return ec; 00224 } 00225 00226 // Write some data to the descriptor. 00227 template <typename ConstBufferSequence> 00228 size_t write_some(implementation_type& impl, 00229 const ConstBufferSequence& buffers, asio::error_code& ec) 00230 { 00231 if (!is_open(impl)) 00232 { 00233 ec = asio::error::bad_descriptor; 00234 return 0; 00235 } 00236 00237 // Copy buffers into array. 00238 descriptor_ops::buf bufs[max_buffers]; 00239 typename ConstBufferSequence::const_iterator iter = buffers.begin(); 00240 typename ConstBufferSequence::const_iterator end = buffers.end(); 00241 size_t i = 0; 00242 size_t total_buffer_size = 0; 00243 for (; iter != end && i < max_buffers; ++iter, ++i) 00244 { 00245 asio::const_buffer buffer(*iter); 00246 descriptor_ops::init_buf(bufs[i], 00247 asio::buffer_cast<const void*>(buffer), 00248 asio::buffer_size(buffer)); 00249 total_buffer_size += asio::buffer_size(buffer); 00250 } 00251 00252 // A request to read_some 0 bytes on a stream is a no-op. 00253 if (total_buffer_size == 0) 00254 { 00255 ec = asio::error_code(); 00256 return 0; 00257 } 00258 00259 // Make descriptor non-blocking if user wants non-blocking. 00260 if (impl.flags_ & implementation_type::user_set_non_blocking) 00261 { 00262 if (!(impl.flags_ & implementation_type::internal_non_blocking)) 00263 { 00264 ioctl_arg_type non_blocking = 1; 00265 if (descriptor_ops::ioctl(impl.descriptor_, 00266 FIONBIO, &non_blocking, ec)) 00267 return 0; 00268 impl.flags_ |= implementation_type::internal_non_blocking; 00269 } 00270 } 00271 00272 // Send the data. 00273 for (;;) 00274 { 00275 // Try to complete the operation without blocking. 00276 int bytes_sent = descriptor_ops::gather_write( 00277 impl.descriptor_, bufs, i, ec); 00278 00279 // Check if operation succeeded. 00280 if (bytes_sent >= 0) 00281 return bytes_sent; 00282 00283 // Operation failed. 00284 if ((impl.flags_ & implementation_type::user_set_non_blocking) 00285 || (ec != asio::error::would_block 00286 && ec != asio::error::try_again)) 00287 return 0; 00288 00289 // Wait for descriptor to become ready. 00290 if (descriptor_ops::poll_write(impl.descriptor_, ec) < 0) 00291 return 0; 00292 } 00293 } 00294 00295 // Wait until data can be written without blocking. 00296 size_t write_some(implementation_type& impl, 00297 const null_buffers&, asio::error_code& ec) 00298 { 00299 if (!is_open(impl)) 00300 { 00301 ec = asio::error::bad_descriptor; 00302 return 0; 00303 } 00304 00305 // Wait for descriptor to become ready. 00306 descriptor_ops::poll_write(impl.descriptor_, ec); 00307 00308 return 0; 00309 } 00310 00311 template <typename ConstBufferSequence, typename Handler> 00312 class write_operation : 00313 public handler_base_from_member<Handler> 00314 { 00315 public: 00316 write_operation(int descriptor, asio::io_service& io_service, 00317 const ConstBufferSequence& buffers, Handler handler) 00318 : handler_base_from_member<Handler>(handler), 00319 descriptor_(descriptor), 00320 io_service_(io_service), 00321 work_(io_service), 00322 buffers_(buffers) 00323 { 00324 } 00325 00326 bool perform(asio::error_code& ec, 00327 std::size_t& bytes_transferred) 00328 { 00329 // Check whether the operation was successful. 00330 if (ec) 00331 { 00332 bytes_transferred = 0; 00333 return true; 00334 } 00335 00336 // Copy buffers into array. 00337 descriptor_ops::buf bufs[max_buffers]; 00338 typename ConstBufferSequence::const_iterator iter = buffers_.begin(); 00339 typename ConstBufferSequence::const_iterator end = buffers_.end(); 00340 size_t i = 0; 00341 for (; iter != end && i < max_buffers; ++iter, ++i) 00342 { 00343 asio::const_buffer buffer(*iter); 00344 descriptor_ops::init_buf(bufs[i], 00345 asio::buffer_cast<const void*>(buffer), 00346 asio::buffer_size(buffer)); 00347 } 00348 00349 // Write the data. 00350 int bytes = descriptor_ops::gather_write(descriptor_, bufs, i, ec); 00351 00352 // Check if we need to run the operation again. 00353 if (ec == asio::error::would_block 00354 || ec == asio::error::try_again) 00355 return false; 00356 00357 bytes_transferred = (bytes < 0 ? 0 : bytes); 00358 return true; 00359 } 00360 00361 void complete(const asio::error_code& ec, 00362 std::size_t bytes_transferred) 00363 { 00364 io_service_.post(bind_handler(this->handler_, ec, bytes_transferred)); 00365 } 00366 00367 private: 00368 int descriptor_; 00369 asio::io_service& io_service_; 00370 asio::io_service::work work_; 00371 ConstBufferSequence buffers_; 00372 }; 00373 00374 // Start an asynchronous write. The data being sent must be valid for the 00375 // lifetime of the asynchronous operation. 00376 template <typename ConstBufferSequence, typename Handler> 00377 void async_write_some(implementation_type& impl, 00378 const ConstBufferSequence& buffers, Handler handler) 00379 { 00380 if (!is_open(impl)) 00381 { 00382 this->get_io_service().post(bind_handler(handler, 00383 asio::error::bad_descriptor, 0)); 00384 } 00385 else 00386 { 00387 // Determine total size of buffers. 00388 typename ConstBufferSequence::const_iterator iter = buffers.begin(); 00389 typename ConstBufferSequence::const_iterator end = buffers.end(); 00390 size_t i = 0; 00391 size_t total_buffer_size = 0; 00392 for (; iter != end && i < max_buffers; ++iter, ++i) 00393 { 00394 asio::const_buffer buffer(*iter); 00395 total_buffer_size += asio::buffer_size(buffer); 00396 } 00397 00398 // A request to read_some 0 bytes on a stream is a no-op. 00399 if (total_buffer_size == 0) 00400 { 00401 this->get_io_service().post(bind_handler(handler, 00402 asio::error_code(), 0)); 00403 return; 00404 } 00405 00406 // Make descriptor non-blocking. 00407 if (!(impl.flags_ & implementation_type::internal_non_blocking)) 00408 { 00409 ioctl_arg_type non_blocking = 1; 00410 asio::error_code ec; 00411 if (descriptor_ops::ioctl(impl.descriptor_, FIONBIO, &non_blocking, ec)) 00412 { 00413 this->get_io_service().post(bind_handler(handler, ec, 0)); 00414 return; 00415 } 00416 impl.flags_ |= implementation_type::internal_non_blocking; 00417 } 00418 00419 reactor_.start_write_op(impl.descriptor_, impl.reactor_data_, 00420 write_operation<ConstBufferSequence, Handler>( 00421 impl.descriptor_, this->get_io_service(), buffers, handler)); 00422 } 00423 } 00424 00425 template <typename Handler> 00426 class null_buffers_operation : 00427 public handler_base_from_member<Handler> 00428 { 00429 public: 00430 null_buffers_operation(asio::io_service& io_service, Handler handler) 00431 : handler_base_from_member<Handler>(handler), 00432 work_(io_service) 00433 { 00434 } 00435 00436 bool perform(asio::error_code&, 00437 std::size_t& bytes_transferred) 00438 { 00439 bytes_transferred = 0; 00440 return true; 00441 } 00442 00443 void complete(const asio::error_code& ec, 00444 std::size_t bytes_transferred) 00445 { 00446 work_.get_io_service().post(bind_handler( 00447 this->handler_, ec, bytes_transferred)); 00448 } 00449 00450 private: 00451 asio::io_service::work work_; 00452 }; 00453 00454 // Start an asynchronous wait until data can be written without blocking. 00455 template <typename Handler> 00456 void async_write_some(implementation_type& impl, 00457 const null_buffers&, Handler handler) 00458 { 00459 if (!is_open(impl)) 00460 { 00461 this->get_io_service().post(bind_handler(handler, 00462 asio::error::bad_descriptor, 0)); 00463 } 00464 else 00465 { 00466 reactor_.start_write_op(impl.descriptor_, impl.reactor_data_, 00467 null_buffers_operation<Handler>(this->get_io_service(), handler), 00468 false); 00469 } 00470 } 00471 00472 // Read some data from the stream. Returns the number of bytes read. 00473 template <typename MutableBufferSequence> 00474 size_t read_some(implementation_type& impl, 00475 const MutableBufferSequence& buffers, asio::error_code& ec) 00476 { 00477 if (!is_open(impl)) 00478 { 00479 ec = asio::error::bad_descriptor; 00480 return 0; 00481 } 00482 00483 // Copy buffers into array. 00484 descriptor_ops::buf bufs[max_buffers]; 00485 typename MutableBufferSequence::const_iterator iter = buffers.begin(); 00486 typename MutableBufferSequence::const_iterator end = buffers.end(); 00487 size_t i = 0; 00488 size_t total_buffer_size = 0; 00489 for (; iter != end && i < max_buffers; ++iter, ++i) 00490 { 00491 asio::mutable_buffer buffer(*iter); 00492 descriptor_ops::init_buf(bufs[i], 00493 asio::buffer_cast<void*>(buffer), 00494 asio::buffer_size(buffer)); 00495 total_buffer_size += asio::buffer_size(buffer); 00496 } 00497 00498 // A request to read_some 0 bytes on a stream is a no-op. 00499 if (total_buffer_size == 0) 00500 { 00501 ec = asio::error_code(); 00502 return 0; 00503 } 00504 00505 // Make descriptor non-blocking if user wants non-blocking. 00506 if (impl.flags_ & implementation_type::user_set_non_blocking) 00507 { 00508 if (!(impl.flags_ & implementation_type::internal_non_blocking)) 00509 { 00510 ioctl_arg_type non_blocking = 1; 00511 if (descriptor_ops::ioctl(impl.descriptor_, FIONBIO, &non_blocking, ec)) 00512 return 0; 00513 impl.flags_ |= implementation_type::internal_non_blocking; 00514 } 00515 } 00516 00517 // Read some data. 00518 for (;;) 00519 { 00520 // Try to complete the operation without blocking. 00521 int bytes_read = descriptor_ops::scatter_read( 00522 impl.descriptor_, bufs, i, ec); 00523 00524 // Check if operation succeeded. 00525 if (bytes_read > 0) 00526 return bytes_read; 00527 00528 // Check for EOF. 00529 if (bytes_read == 0) 00530 { 00531 ec = asio::error::eof; 00532 return 0; 00533 } 00534 00535 // Operation failed. 00536 if ((impl.flags_ & implementation_type::user_set_non_blocking) 00537 || (ec != asio::error::would_block 00538 && ec != asio::error::try_again)) 00539 return 0; 00540 00541 // Wait for descriptor to become ready. 00542 if (descriptor_ops::poll_read(impl.descriptor_, ec) < 0) 00543 return 0; 00544 } 00545 } 00546 00547 // Wait until data can be read without blocking. 00548 size_t read_some(implementation_type& impl, 00549 const null_buffers&, asio::error_code& ec) 00550 { 00551 if (!is_open(impl)) 00552 { 00553 ec = asio::error::bad_descriptor; 00554 return 0; 00555 } 00556 00557 // Wait for descriptor to become ready. 00558 descriptor_ops::poll_read(impl.descriptor_, ec); 00559 00560 return 0; 00561 } 00562 00563 template <typename MutableBufferSequence, typename Handler> 00564 class read_operation : 00565 public handler_base_from_member<Handler> 00566 { 00567 public: 00568 read_operation(int descriptor, asio::io_service& io_service, 00569 const MutableBufferSequence& buffers, Handler handler) 00570 : handler_base_from_member<Handler>(handler), 00571 descriptor_(descriptor), 00572 io_service_(io_service), 00573 work_(io_service), 00574 buffers_(buffers) 00575 { 00576 } 00577 00578 bool perform(asio::error_code& ec, 00579 std::size_t& bytes_transferred) 00580 { 00581 // Check whether the operation was successful. 00582 if (ec) 00583 { 00584 bytes_transferred = 0; 00585 return true; 00586 } 00587 00588 // Copy buffers into array. 00589 descriptor_ops::buf bufs[max_buffers]; 00590 typename MutableBufferSequence::const_iterator iter = buffers_.begin(); 00591 typename MutableBufferSequence::const_iterator end = buffers_.end(); 00592 size_t i = 0; 00593 for (; iter != end && i < max_buffers; ++iter, ++i) 00594 { 00595 asio::mutable_buffer buffer(*iter); 00596 descriptor_ops::init_buf(bufs[i], 00597 asio::buffer_cast<void*>(buffer), 00598 asio::buffer_size(buffer)); 00599 } 00600 00601 // Read some data. 00602 int bytes = descriptor_ops::scatter_read(descriptor_, bufs, i, ec); 00603 if (bytes == 0) 00604 ec = asio::error::eof; 00605 00606 // Check if we need to run the operation again. 00607 if (ec == asio::error::would_block 00608 || ec == asio::error::try_again) 00609 return false; 00610 00611 bytes_transferred = (bytes < 0 ? 0 : bytes); 00612 return true; 00613 } 00614 00615 void complete(const asio::error_code& ec, 00616 std::size_t bytes_transferred) 00617 { 00618 io_service_.post(bind_handler(this->handler_, ec, bytes_transferred)); 00619 } 00620 00621 private: 00622 int descriptor_; 00623 asio::io_service& io_service_; 00624 asio::io_service::work work_; 00625 MutableBufferSequence buffers_; 00626 }; 00627 00628 // Start an asynchronous read. The buffer for the data being read must be 00629 // valid for the lifetime of the asynchronous operation. 00630 template <typename MutableBufferSequence, typename Handler> 00631 void async_read_some(implementation_type& impl, 00632 const MutableBufferSequence& buffers, Handler handler) 00633 { 00634 if (!is_open(impl)) 00635 { 00636 this->get_io_service().post(bind_handler(handler, 00637 asio::error::bad_descriptor, 0)); 00638 } 00639 else 00640 { 00641 // Determine total size of buffers. 00642 typename MutableBufferSequence::const_iterator iter = buffers.begin(); 00643 typename MutableBufferSequence::const_iterator end = buffers.end(); 00644 size_t i = 0; 00645 size_t total_buffer_size = 0; 00646 for (; iter != end && i < max_buffers; ++iter, ++i) 00647 { 00648 asio::mutable_buffer buffer(*iter); 00649 total_buffer_size += asio::buffer_size(buffer); 00650 } 00651 00652 // A request to read_some 0 bytes on a stream is a no-op. 00653 if (total_buffer_size == 0) 00654 { 00655 this->get_io_service().post(bind_handler(handler, 00656 asio::error_code(), 0)); 00657 return; 00658 } 00659 00660 // Make descriptor non-blocking. 00661 if (!(impl.flags_ & implementation_type::internal_non_blocking)) 00662 { 00663 ioctl_arg_type non_blocking = 1; 00664 asio::error_code ec; 00665 if (descriptor_ops::ioctl(impl.descriptor_, FIONBIO, &non_blocking, ec)) 00666 { 00667 this->get_io_service().post(bind_handler(handler, ec, 0)); 00668 return; 00669 } 00670 impl.flags_ |= implementation_type::internal_non_blocking; 00671 } 00672 00673 reactor_.start_read_op(impl.descriptor_, impl.reactor_data_, 00674 read_operation<MutableBufferSequence, Handler>( 00675 impl.descriptor_, this->get_io_service(), buffers, handler)); 00676 } 00677 } 00678 00679 // Wait until data can be read without blocking. 00680 template <typename Handler> 00681 void async_read_some(implementation_type& impl, 00682 const null_buffers&, Handler handler) 00683 { 00684 if (!is_open(impl)) 00685 { 00686 this->get_io_service().post(bind_handler(handler, 00687 asio::error::bad_descriptor, 0)); 00688 } 00689 else 00690 { 00691 reactor_.start_read_op(impl.descriptor_, impl.reactor_data_, 00692 null_buffers_operation<Handler>(this->get_io_service(), handler), 00693 false); 00694 } 00695 } 00696 00697 private: 00698 // The selector that performs event demultiplexing for the service. 00699 Reactor& reactor_; 00700 }; 00701 00702 } // namespace detail 00703 } // namespace asio 00704 00705 #endif // !defined(BOOST_WINDOWS) && !defined(__CYGWIN__) 00706 00707 #include "asio/detail/pop_options.hpp" 00708 00709 #endif // ASIO_DETAIL_REACTIVE_DESCRIPTOR_SERVICE_HPP