$search
00001 // 00002 // reactive_socket_service.hpp 00003 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~ 00004 // 00005 // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com) 00006 // 00007 // Distributed under the Boost Software License, Version 1.0. (See accompanying 00008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 00009 // 00010 00011 #ifndef ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP 00012 #define ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP 00013 00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200) 00015 # pragma once 00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) 00017 00018 #include "asio/detail/push_options.hpp" 00019 00020 #include "asio/detail/push_options.hpp" 00021 #include <boost/shared_ptr.hpp> 00022 #include "asio/detail/pop_options.hpp" 00023 00024 #include "asio/buffer.hpp" 00025 #include "asio/error.hpp" 00026 #include "asio/io_service.hpp" 00027 #include "asio/socket_base.hpp" 00028 #include "asio/detail/bind_handler.hpp" 00029 #include "asio/detail/handler_base_from_member.hpp" 00030 #include "asio/detail/noncopyable.hpp" 00031 #include "asio/detail/service_base.hpp" 00032 #include "asio/detail/socket_holder.hpp" 00033 #include "asio/detail/socket_ops.hpp" 00034 #include "asio/detail/socket_types.hpp" 00035 00036 namespace asio { 00037 namespace detail { 00038 00039 template <typename Protocol, typename Reactor> 00040 class reactive_socket_service 00041 : public asio::detail::service_base< 00042 reactive_socket_service<Protocol, Reactor> > 00043 { 00044 public: 00045 // The protocol type. 00046 typedef Protocol protocol_type; 00047 00048 // The endpoint type. 00049 typedef typename Protocol::endpoint endpoint_type; 00050 00051 // The native type of a socket. 00052 typedef socket_type native_type; 00053 00054 // The implementation type of the socket. 00055 class implementation_type 00056 : private asio::detail::noncopyable 00057 { 00058 public: 00059 // Default constructor. 00060 implementation_type() 00061 : socket_(invalid_socket), 00062 flags_(0), 00063 protocol_(endpoint_type().protocol()) 00064 { 00065 } 00066 00067 private: 00068 // Only this service will have access to the internal values. 00069 friend class reactive_socket_service<Protocol, Reactor>; 00070 00071 // The native socket representation. 00072 socket_type socket_; 00073 00074 enum 00075 { 00076 user_set_non_blocking = 1, // The user wants a non-blocking socket. 00077 internal_non_blocking = 2, // The socket has been set non-blocking. 00078 enable_connection_aborted = 4, // User wants connection_aborted errors. 00079 user_set_linger = 8 // The user set the linger option. 00080 }; 00081 00082 // Flags indicating the current state of the socket. 00083 unsigned char flags_; 00084 00085 // The protocol associated with the socket. 00086 protocol_type protocol_; 00087 00088 // Per-descriptor data used by the reactor. 00089 typename Reactor::per_descriptor_data reactor_data_; 00090 }; 00091 00092 // The maximum number of buffers to support in a single operation. 00093 enum { max_buffers = 64 < max_iov_len ? 64 : max_iov_len }; 00094 00095 // Constructor. 00096 reactive_socket_service(asio::io_service& io_service) 00097 : asio::detail::service_base< 00098 reactive_socket_service<Protocol, Reactor> >(io_service), 00099 reactor_(asio::use_service<Reactor>(io_service)) 00100 { 00101 } 00102 00103 // Destroy all user-defined handler objects owned by the service. 00104 void shutdown_service() 00105 { 00106 } 00107 00108 // Construct a new socket implementation. 00109 void construct(implementation_type& impl) 00110 { 00111 impl.socket_ = invalid_socket; 00112 impl.flags_ = 0; 00113 } 00114 00115 // Destroy a socket implementation. 00116 void destroy(implementation_type& impl) 00117 { 00118 if (impl.socket_ != invalid_socket) 00119 { 00120 reactor_.close_descriptor(impl.socket_, impl.reactor_data_); 00121 00122 if (impl.flags_ & implementation_type::internal_non_blocking) 00123 { 00124 ioctl_arg_type non_blocking = 0; 00125 asio::error_code ignored_ec; 00126 socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ignored_ec); 00127 impl.flags_ &= ~implementation_type::internal_non_blocking; 00128 } 00129 00130 if (impl.flags_ & implementation_type::user_set_linger) 00131 { 00132 ::linger opt; 00133 opt.l_onoff = 0; 00134 opt.l_linger = 0; 00135 asio::error_code ignored_ec; 00136 socket_ops::setsockopt(impl.socket_, 00137 SOL_SOCKET, SO_LINGER, &opt, sizeof(opt), ignored_ec); 00138 } 00139 00140 asio::error_code ignored_ec; 00141 socket_ops::close(impl.socket_, ignored_ec); 00142 00143 impl.socket_ = invalid_socket; 00144 } 00145 } 00146 00147 // Open a new socket implementation. 00148 asio::error_code open(implementation_type& impl, 00149 const protocol_type& protocol, asio::error_code& ec) 00150 { 00151 if (is_open(impl)) 00152 { 00153 ec = asio::error::already_open; 00154 return ec; 00155 } 00156 00157 socket_holder sock(socket_ops::socket(protocol.family(), 00158 protocol.type(), protocol.protocol(), ec)); 00159 if (sock.get() == invalid_socket) 00160 return ec; 00161 00162 if (int err = reactor_.register_descriptor(sock.get(), impl.reactor_data_)) 00163 { 00164 ec = asio::error_code(err, 00165 asio::error::get_system_category()); 00166 return ec; 00167 } 00168 00169 impl.socket_ = sock.release(); 00170 impl.flags_ = 0; 00171 impl.protocol_ = protocol; 00172 ec = asio::error_code(); 00173 return ec; 00174 } 00175 00176 // Assign a native socket to a socket implementation. 00177 asio::error_code assign(implementation_type& impl, 00178 const protocol_type& protocol, const native_type& native_socket, 00179 asio::error_code& ec) 00180 { 00181 if (is_open(impl)) 00182 { 00183 ec = asio::error::already_open; 00184 return ec; 00185 } 00186 00187 if (int err = reactor_.register_descriptor( 00188 native_socket, impl.reactor_data_)) 00189 { 00190 ec = asio::error_code(err, 00191 asio::error::get_system_category()); 00192 return ec; 00193 } 00194 00195 impl.socket_ = native_socket; 00196 impl.flags_ = 0; 00197 impl.protocol_ = protocol; 00198 ec = asio::error_code(); 00199 return ec; 00200 } 00201 00202 // Determine whether the socket is open. 00203 bool is_open(const implementation_type& impl) const 00204 { 00205 return impl.socket_ != invalid_socket; 00206 } 00207 00208 // Destroy a socket implementation. 00209 asio::error_code close(implementation_type& impl, 00210 asio::error_code& ec) 00211 { 00212 if (is_open(impl)) 00213 { 00214 reactor_.close_descriptor(impl.socket_, impl.reactor_data_); 00215 00216 if (impl.flags_ & implementation_type::internal_non_blocking) 00217 { 00218 ioctl_arg_type non_blocking = 0; 00219 asio::error_code ignored_ec; 00220 socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ignored_ec); 00221 impl.flags_ &= ~implementation_type::internal_non_blocking; 00222 } 00223 00224 if (socket_ops::close(impl.socket_, ec) == socket_error_retval) 00225 return ec; 00226 00227 impl.socket_ = invalid_socket; 00228 } 00229 00230 ec = asio::error_code(); 00231 return ec; 00232 } 00233 00234 // Get the native socket representation. 00235 native_type native(implementation_type& impl) 00236 { 00237 return impl.socket_; 00238 } 00239 00240 // Cancel all operations associated with the socket. 00241 asio::error_code cancel(implementation_type& impl, 00242 asio::error_code& ec) 00243 { 00244 if (!is_open(impl)) 00245 { 00246 ec = asio::error::bad_descriptor; 00247 return ec; 00248 } 00249 00250 reactor_.cancel_ops(impl.socket_, impl.reactor_data_); 00251 ec = asio::error_code(); 00252 return ec; 00253 } 00254 00255 // Determine whether the socket is at the out-of-band data mark. 00256 bool at_mark(const implementation_type& impl, 00257 asio::error_code& ec) const 00258 { 00259 if (!is_open(impl)) 00260 { 00261 ec = asio::error::bad_descriptor; 00262 return false; 00263 } 00264 00265 asio::detail::ioctl_arg_type value = 0; 00266 socket_ops::ioctl(impl.socket_, SIOCATMARK, &value, ec); 00267 #if defined(ENOTTY) 00268 if (ec.value() == ENOTTY) 00269 ec = asio::error::not_socket; 00270 #endif // defined(ENOTTY) 00271 return ec ? false : value != 0; 00272 } 00273 00274 // Determine the number of bytes available for reading. 00275 std::size_t available(const implementation_type& impl, 00276 asio::error_code& ec) const 00277 { 00278 if (!is_open(impl)) 00279 { 00280 ec = asio::error::bad_descriptor; 00281 return 0; 00282 } 00283 00284 asio::detail::ioctl_arg_type value = 0; 00285 socket_ops::ioctl(impl.socket_, FIONREAD, &value, ec); 00286 #if defined(ENOTTY) 00287 if (ec.value() == ENOTTY) 00288 ec = asio::error::not_socket; 00289 #endif // defined(ENOTTY) 00290 return ec ? static_cast<std::size_t>(0) : static_cast<std::size_t>(value); 00291 } 00292 00293 // Bind the socket to the specified local endpoint. 00294 asio::error_code bind(implementation_type& impl, 00295 const endpoint_type& endpoint, asio::error_code& ec) 00296 { 00297 if (!is_open(impl)) 00298 { 00299 ec = asio::error::bad_descriptor; 00300 return ec; 00301 } 00302 00303 socket_ops::bind(impl.socket_, endpoint.data(), endpoint.size(), ec); 00304 return ec; 00305 } 00306 00307 // Place the socket into the state where it will listen for new connections. 00308 asio::error_code listen(implementation_type& impl, int backlog, 00309 asio::error_code& ec) 00310 { 00311 if (!is_open(impl)) 00312 { 00313 ec = asio::error::bad_descriptor; 00314 return ec; 00315 } 00316 00317 socket_ops::listen(impl.socket_, backlog, ec); 00318 return ec; 00319 } 00320 00321 // Set a socket option. 00322 template <typename Option> 00323 asio::error_code set_option(implementation_type& impl, 00324 const Option& option, asio::error_code& ec) 00325 { 00326 if (!is_open(impl)) 00327 { 00328 ec = asio::error::bad_descriptor; 00329 return ec; 00330 } 00331 00332 if (option.level(impl.protocol_) == custom_socket_option_level 00333 && option.name(impl.protocol_) == enable_connection_aborted_option) 00334 { 00335 if (option.size(impl.protocol_) != sizeof(int)) 00336 { 00337 ec = asio::error::invalid_argument; 00338 } 00339 else 00340 { 00341 if (*reinterpret_cast<const int*>(option.data(impl.protocol_))) 00342 impl.flags_ |= implementation_type::enable_connection_aborted; 00343 else 00344 impl.flags_ &= ~implementation_type::enable_connection_aborted; 00345 ec = asio::error_code(); 00346 } 00347 return ec; 00348 } 00349 else 00350 { 00351 if (option.level(impl.protocol_) == SOL_SOCKET 00352 && option.name(impl.protocol_) == SO_LINGER) 00353 { 00354 impl.flags_ |= implementation_type::user_set_linger; 00355 } 00356 00357 socket_ops::setsockopt(impl.socket_, 00358 option.level(impl.protocol_), option.name(impl.protocol_), 00359 option.data(impl.protocol_), option.size(impl.protocol_), ec); 00360 00361 #if defined(__MACH__) && defined(__APPLE__) \ 00362 || defined(__NetBSD__) || defined(__FreeBSD__) || defined(__OpenBSD__) 00363 // To implement portable behaviour for SO_REUSEADDR with UDP sockets we 00364 // need to also set SO_REUSEPORT on BSD-based platforms. 00365 if (!ec && impl.protocol_.type() == SOCK_DGRAM 00366 && option.level(impl.protocol_) == SOL_SOCKET 00367 && option.name(impl.protocol_) == SO_REUSEADDR) 00368 { 00369 asio::error_code ignored_ec; 00370 socket_ops::setsockopt(impl.socket_, SOL_SOCKET, SO_REUSEPORT, 00371 option.data(impl.protocol_), option.size(impl.protocol_), 00372 ignored_ec); 00373 } 00374 #endif 00375 00376 return ec; 00377 } 00378 } 00379 00380 // Set a socket option. 00381 template <typename Option> 00382 asio::error_code get_option(const implementation_type& impl, 00383 Option& option, asio::error_code& ec) const 00384 { 00385 if (!is_open(impl)) 00386 { 00387 ec = asio::error::bad_descriptor; 00388 return ec; 00389 } 00390 00391 if (option.level(impl.protocol_) == custom_socket_option_level 00392 && option.name(impl.protocol_) == enable_connection_aborted_option) 00393 { 00394 if (option.size(impl.protocol_) != sizeof(int)) 00395 { 00396 ec = asio::error::invalid_argument; 00397 } 00398 else 00399 { 00400 int* target = reinterpret_cast<int*>(option.data(impl.protocol_)); 00401 if (impl.flags_ & implementation_type::enable_connection_aborted) 00402 *target = 1; 00403 else 00404 *target = 0; 00405 option.resize(impl.protocol_, sizeof(int)); 00406 ec = asio::error_code(); 00407 } 00408 return ec; 00409 } 00410 else 00411 { 00412 size_t size = option.size(impl.protocol_); 00413 socket_ops::getsockopt(impl.socket_, 00414 option.level(impl.protocol_), option.name(impl.protocol_), 00415 option.data(impl.protocol_), &size, ec); 00416 if (!ec) 00417 option.resize(impl.protocol_, size); 00418 return ec; 00419 } 00420 } 00421 00422 // Perform an IO control command on the socket. 00423 template <typename IO_Control_Command> 00424 asio::error_code io_control(implementation_type& impl, 00425 IO_Control_Command& command, asio::error_code& ec) 00426 { 00427 if (!is_open(impl)) 00428 { 00429 ec = asio::error::bad_descriptor; 00430 return ec; 00431 } 00432 00433 if (command.name() == static_cast<int>(FIONBIO)) 00434 { 00435 if (command.get()) 00436 impl.flags_ |= implementation_type::user_set_non_blocking; 00437 else 00438 impl.flags_ &= ~implementation_type::user_set_non_blocking; 00439 ec = asio::error_code(); 00440 } 00441 else 00442 { 00443 socket_ops::ioctl(impl.socket_, command.name(), 00444 static_cast<ioctl_arg_type*>(command.data()), ec); 00445 } 00446 return ec; 00447 } 00448 00449 // Get the local endpoint. 00450 endpoint_type local_endpoint(const implementation_type& impl, 00451 asio::error_code& ec) const 00452 { 00453 if (!is_open(impl)) 00454 { 00455 ec = asio::error::bad_descriptor; 00456 return endpoint_type(); 00457 } 00458 00459 endpoint_type endpoint; 00460 std::size_t addr_len = endpoint.capacity(); 00461 if (socket_ops::getsockname(impl.socket_, endpoint.data(), &addr_len, ec)) 00462 return endpoint_type(); 00463 endpoint.resize(addr_len); 00464 return endpoint; 00465 } 00466 00467 // Get the remote endpoint. 00468 endpoint_type remote_endpoint(const implementation_type& impl, 00469 asio::error_code& ec) const 00470 { 00471 if (!is_open(impl)) 00472 { 00473 ec = asio::error::bad_descriptor; 00474 return endpoint_type(); 00475 } 00476 00477 endpoint_type endpoint; 00478 std::size_t addr_len = endpoint.capacity(); 00479 if (socket_ops::getpeername(impl.socket_, endpoint.data(), &addr_len, ec)) 00480 return endpoint_type(); 00481 endpoint.resize(addr_len); 00482 return endpoint; 00483 } 00484 00486 asio::error_code shutdown(implementation_type& impl, 00487 socket_base::shutdown_type what, asio::error_code& ec) 00488 { 00489 if (!is_open(impl)) 00490 { 00491 ec = asio::error::bad_descriptor; 00492 return ec; 00493 } 00494 00495 socket_ops::shutdown(impl.socket_, what, ec); 00496 return ec; 00497 } 00498 00499 // Send the given data to the peer. 00500 template <typename ConstBufferSequence> 00501 size_t send(implementation_type& impl, const ConstBufferSequence& buffers, 00502 socket_base::message_flags flags, asio::error_code& ec) 00503 { 00504 if (!is_open(impl)) 00505 { 00506 ec = asio::error::bad_descriptor; 00507 return 0; 00508 } 00509 00510 // Copy buffers into array. 00511 socket_ops::buf bufs[max_buffers]; 00512 typename ConstBufferSequence::const_iterator iter = buffers.begin(); 00513 typename ConstBufferSequence::const_iterator end = buffers.end(); 00514 size_t i = 0; 00515 size_t total_buffer_size = 0; 00516 for (; iter != end && i < max_buffers; ++iter, ++i) 00517 { 00518 asio::const_buffer buffer(*iter); 00519 socket_ops::init_buf(bufs[i], 00520 asio::buffer_cast<const void*>(buffer), 00521 asio::buffer_size(buffer)); 00522 total_buffer_size += asio::buffer_size(buffer); 00523 } 00524 00525 // A request to receive 0 bytes on a stream socket is a no-op. 00526 if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0) 00527 { 00528 ec = asio::error_code(); 00529 return 0; 00530 } 00531 00532 // Make socket non-blocking if user wants non-blocking. 00533 if (impl.flags_ & implementation_type::user_set_non_blocking) 00534 { 00535 if (!(impl.flags_ & implementation_type::internal_non_blocking)) 00536 { 00537 ioctl_arg_type non_blocking = 1; 00538 if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) 00539 return 0; 00540 impl.flags_ |= implementation_type::internal_non_blocking; 00541 } 00542 } 00543 00544 // Send the data. 00545 for (;;) 00546 { 00547 // Try to complete the operation without blocking. 00548 int bytes_sent = socket_ops::send(impl.socket_, bufs, i, flags, ec); 00549 00550 // Check if operation succeeded. 00551 if (bytes_sent >= 0) 00552 return bytes_sent; 00553 00554 // Operation failed. 00555 if ((impl.flags_ & implementation_type::user_set_non_blocking) 00556 || (ec != asio::error::would_block 00557 && ec != asio::error::try_again)) 00558 return 0; 00559 00560 // Wait for socket to become ready. 00561 if (socket_ops::poll_write(impl.socket_, ec) < 0) 00562 return 0; 00563 } 00564 } 00565 00566 // Wait until data can be sent without blocking. 00567 size_t send(implementation_type& impl, const null_buffers&, 00568 socket_base::message_flags, asio::error_code& ec) 00569 { 00570 if (!is_open(impl)) 00571 { 00572 ec = asio::error::bad_descriptor; 00573 return 0; 00574 } 00575 00576 // Wait for socket to become ready. 00577 socket_ops::poll_write(impl.socket_, ec); 00578 00579 return 0; 00580 } 00581 00582 template <typename ConstBufferSequence, typename Handler> 00583 class send_operation : 00584 public handler_base_from_member<Handler> 00585 { 00586 public: 00587 send_operation(socket_type socket, asio::io_service& io_service, 00588 const ConstBufferSequence& buffers, socket_base::message_flags flags, 00589 Handler handler) 00590 : handler_base_from_member<Handler>(handler), 00591 socket_(socket), 00592 io_service_(io_service), 00593 work_(io_service), 00594 buffers_(buffers), 00595 flags_(flags) 00596 { 00597 } 00598 00599 bool perform(asio::error_code& ec, 00600 std::size_t& bytes_transferred) 00601 { 00602 // Check whether the operation was successful. 00603 if (ec) 00604 { 00605 bytes_transferred = 0; 00606 return true; 00607 } 00608 00609 // Copy buffers into array. 00610 socket_ops::buf bufs[max_buffers]; 00611 typename ConstBufferSequence::const_iterator iter = buffers_.begin(); 00612 typename ConstBufferSequence::const_iterator end = buffers_.end(); 00613 size_t i = 0; 00614 for (; iter != end && i < max_buffers; ++iter, ++i) 00615 { 00616 asio::const_buffer buffer(*iter); 00617 socket_ops::init_buf(bufs[i], 00618 asio::buffer_cast<const void*>(buffer), 00619 asio::buffer_size(buffer)); 00620 } 00621 00622 // Send the data. 00623 int bytes = socket_ops::send(socket_, bufs, i, flags_, ec); 00624 00625 // Check if we need to run the operation again. 00626 if (ec == asio::error::would_block 00627 || ec == asio::error::try_again) 00628 return false; 00629 00630 bytes_transferred = (bytes < 0 ? 0 : bytes); 00631 return true; 00632 } 00633 00634 void complete(const asio::error_code& ec, 00635 std::size_t bytes_transferred) 00636 { 00637 io_service_.post(bind_handler(this->handler_, ec, bytes_transferred)); 00638 } 00639 00640 private: 00641 socket_type socket_; 00642 asio::io_service& io_service_; 00643 asio::io_service::work work_; 00644 ConstBufferSequence buffers_; 00645 socket_base::message_flags flags_; 00646 }; 00647 00648 // Start an asynchronous send. The data being sent must be valid for the 00649 // lifetime of the asynchronous operation. 00650 template <typename ConstBufferSequence, typename Handler> 00651 void async_send(implementation_type& impl, const ConstBufferSequence& buffers, 00652 socket_base::message_flags flags, Handler handler) 00653 { 00654 if (!is_open(impl)) 00655 { 00656 this->get_io_service().post(bind_handler(handler, 00657 asio::error::bad_descriptor, 0)); 00658 } 00659 else 00660 { 00661 if (impl.protocol_.type() == SOCK_STREAM) 00662 { 00663 // Determine total size of buffers. 00664 typename ConstBufferSequence::const_iterator iter = buffers.begin(); 00665 typename ConstBufferSequence::const_iterator end = buffers.end(); 00666 size_t i = 0; 00667 size_t total_buffer_size = 0; 00668 for (; iter != end && i < max_buffers; ++iter, ++i) 00669 { 00670 asio::const_buffer buffer(*iter); 00671 total_buffer_size += asio::buffer_size(buffer); 00672 } 00673 00674 // A request to receive 0 bytes on a stream socket is a no-op. 00675 if (total_buffer_size == 0) 00676 { 00677 this->get_io_service().post(bind_handler(handler, 00678 asio::error_code(), 0)); 00679 return; 00680 } 00681 } 00682 00683 // Make socket non-blocking. 00684 if (!(impl.flags_ & implementation_type::internal_non_blocking)) 00685 { 00686 ioctl_arg_type non_blocking = 1; 00687 asio::error_code ec; 00688 if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) 00689 { 00690 this->get_io_service().post(bind_handler(handler, ec, 0)); 00691 return; 00692 } 00693 impl.flags_ |= implementation_type::internal_non_blocking; 00694 } 00695 00696 reactor_.start_write_op(impl.socket_, impl.reactor_data_, 00697 send_operation<ConstBufferSequence, Handler>( 00698 impl.socket_, this->get_io_service(), buffers, flags, handler)); 00699 } 00700 } 00701 00702 template <typename Handler> 00703 class null_buffers_operation : 00704 public handler_base_from_member<Handler> 00705 { 00706 public: 00707 null_buffers_operation(asio::io_service& io_service, Handler handler) 00708 : handler_base_from_member<Handler>(handler), 00709 work_(io_service) 00710 { 00711 } 00712 00713 bool perform(asio::error_code&, 00714 std::size_t& bytes_transferred) 00715 { 00716 bytes_transferred = 0; 00717 return true; 00718 } 00719 00720 void complete(const asio::error_code& ec, 00721 std::size_t bytes_transferred) 00722 { 00723 work_.get_io_service().post(bind_handler( 00724 this->handler_, ec, bytes_transferred)); 00725 } 00726 00727 private: 00728 asio::io_service::work work_; 00729 }; 00730 00731 // Start an asynchronous wait until data can be sent without blocking. 00732 template <typename Handler> 00733 void async_send(implementation_type& impl, const null_buffers&, 00734 socket_base::message_flags, Handler handler) 00735 { 00736 if (!is_open(impl)) 00737 { 00738 this->get_io_service().post(bind_handler(handler, 00739 asio::error::bad_descriptor, 0)); 00740 } 00741 else 00742 { 00743 reactor_.start_write_op(impl.socket_, impl.reactor_data_, 00744 null_buffers_operation<Handler>(this->get_io_service(), handler), 00745 false); 00746 } 00747 } 00748 00749 // Send a datagram to the specified endpoint. Returns the number of bytes 00750 // sent. 00751 template <typename ConstBufferSequence> 00752 size_t send_to(implementation_type& impl, const ConstBufferSequence& buffers, 00753 const endpoint_type& destination, socket_base::message_flags flags, 00754 asio::error_code& ec) 00755 { 00756 if (!is_open(impl)) 00757 { 00758 ec = asio::error::bad_descriptor; 00759 return 0; 00760 } 00761 00762 // Copy buffers into array. 00763 socket_ops::buf bufs[max_buffers]; 00764 typename ConstBufferSequence::const_iterator iter = buffers.begin(); 00765 typename ConstBufferSequence::const_iterator end = buffers.end(); 00766 size_t i = 0; 00767 for (; iter != end && i < max_buffers; ++iter, ++i) 00768 { 00769 asio::const_buffer buffer(*iter); 00770 socket_ops::init_buf(bufs[i], 00771 asio::buffer_cast<const void*>(buffer), 00772 asio::buffer_size(buffer)); 00773 } 00774 00775 // Make socket non-blocking if user wants non-blocking. 00776 if (impl.flags_ & implementation_type::user_set_non_blocking) 00777 { 00778 if (!(impl.flags_ & implementation_type::internal_non_blocking)) 00779 { 00780 ioctl_arg_type non_blocking = 1; 00781 if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) 00782 return 0; 00783 impl.flags_ |= implementation_type::internal_non_blocking; 00784 } 00785 } 00786 00787 // Send the data. 00788 for (;;) 00789 { 00790 // Try to complete the operation without blocking. 00791 int bytes_sent = socket_ops::sendto(impl.socket_, bufs, i, flags, 00792 destination.data(), destination.size(), ec); 00793 00794 // Check if operation succeeded. 00795 if (bytes_sent >= 0) 00796 return bytes_sent; 00797 00798 // Operation failed. 00799 if ((impl.flags_ & implementation_type::user_set_non_blocking) 00800 || (ec != asio::error::would_block 00801 && ec != asio::error::try_again)) 00802 return 0; 00803 00804 // Wait for socket to become ready. 00805 if (socket_ops::poll_write(impl.socket_, ec) < 0) 00806 return 0; 00807 } 00808 } 00809 00810 // Wait until data can be sent without blocking. 00811 size_t send_to(implementation_type& impl, const null_buffers&, 00812 socket_base::message_flags, const endpoint_type&, 00813 asio::error_code& ec) 00814 { 00815 if (!is_open(impl)) 00816 { 00817 ec = asio::error::bad_descriptor; 00818 return 0; 00819 } 00820 00821 // Wait for socket to become ready. 00822 socket_ops::poll_write(impl.socket_, ec); 00823 00824 return 0; 00825 } 00826 00827 template <typename ConstBufferSequence, typename Handler> 00828 class send_to_operation : 00829 public handler_base_from_member<Handler> 00830 { 00831 public: 00832 send_to_operation(socket_type socket, asio::io_service& io_service, 00833 const ConstBufferSequence& buffers, const endpoint_type& endpoint, 00834 socket_base::message_flags flags, Handler handler) 00835 : handler_base_from_member<Handler>(handler), 00836 socket_(socket), 00837 io_service_(io_service), 00838 work_(io_service), 00839 buffers_(buffers), 00840 destination_(endpoint), 00841 flags_(flags) 00842 { 00843 } 00844 00845 bool perform(asio::error_code& ec, 00846 std::size_t& bytes_transferred) 00847 { 00848 // Check whether the operation was successful. 00849 if (ec) 00850 { 00851 bytes_transferred = 0; 00852 return true; 00853 } 00854 00855 // Copy buffers into array. 00856 socket_ops::buf bufs[max_buffers]; 00857 typename ConstBufferSequence::const_iterator iter = buffers_.begin(); 00858 typename ConstBufferSequence::const_iterator end = buffers_.end(); 00859 size_t i = 0; 00860 for (; iter != end && i < max_buffers; ++iter, ++i) 00861 { 00862 asio::const_buffer buffer(*iter); 00863 socket_ops::init_buf(bufs[i], 00864 asio::buffer_cast<const void*>(buffer), 00865 asio::buffer_size(buffer)); 00866 } 00867 00868 // Send the data. 00869 int bytes = socket_ops::sendto(socket_, bufs, i, flags_, 00870 destination_.data(), destination_.size(), ec); 00871 00872 // Check if we need to run the operation again. 00873 if (ec == asio::error::would_block 00874 || ec == asio::error::try_again) 00875 return false; 00876 00877 bytes_transferred = (bytes < 0 ? 0 : bytes); 00878 return true; 00879 } 00880 00881 void complete(const asio::error_code& ec, 00882 std::size_t bytes_transferred) 00883 { 00884 io_service_.post(bind_handler(this->handler_, ec, bytes_transferred)); 00885 } 00886 00887 private: 00888 socket_type socket_; 00889 asio::io_service& io_service_; 00890 asio::io_service::work work_; 00891 ConstBufferSequence buffers_; 00892 endpoint_type destination_; 00893 socket_base::message_flags flags_; 00894 }; 00895 00896 // Start an asynchronous send. The data being sent must be valid for the 00897 // lifetime of the asynchronous operation. 00898 template <typename ConstBufferSequence, typename Handler> 00899 void async_send_to(implementation_type& impl, 00900 const ConstBufferSequence& buffers, 00901 const endpoint_type& destination, socket_base::message_flags flags, 00902 Handler handler) 00903 { 00904 if (!is_open(impl)) 00905 { 00906 this->get_io_service().post(bind_handler(handler, 00907 asio::error::bad_descriptor, 0)); 00908 } 00909 else 00910 { 00911 // Make socket non-blocking. 00912 if (!(impl.flags_ & implementation_type::internal_non_blocking)) 00913 { 00914 ioctl_arg_type non_blocking = 1; 00915 asio::error_code ec; 00916 if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) 00917 { 00918 this->get_io_service().post(bind_handler(handler, ec, 0)); 00919 return; 00920 } 00921 impl.flags_ |= implementation_type::internal_non_blocking; 00922 } 00923 00924 reactor_.start_write_op(impl.socket_, impl.reactor_data_, 00925 send_to_operation<ConstBufferSequence, Handler>( 00926 impl.socket_, this->get_io_service(), buffers, 00927 destination, flags, handler)); 00928 } 00929 } 00930 00931 // Start an asynchronous wait until data can be sent without blocking. 00932 template <typename Handler> 00933 void async_send_to(implementation_type& impl, const null_buffers&, 00934 socket_base::message_flags, const endpoint_type&, Handler handler) 00935 { 00936 if (!is_open(impl)) 00937 { 00938 this->get_io_service().post(bind_handler(handler, 00939 asio::error::bad_descriptor, 0)); 00940 } 00941 else 00942 { 00943 reactor_.start_write_op(impl.socket_, impl.reactor_data_, 00944 null_buffers_operation<Handler>(this->get_io_service(), handler), 00945 false); 00946 } 00947 } 00948 00949 // Receive some data from the peer. Returns the number of bytes received. 00950 template <typename MutableBufferSequence> 00951 size_t receive(implementation_type& impl, 00952 const MutableBufferSequence& buffers, 00953 socket_base::message_flags flags, asio::error_code& ec) 00954 { 00955 if (!is_open(impl)) 00956 { 00957 ec = asio::error::bad_descriptor; 00958 return 0; 00959 } 00960 00961 // Copy buffers into array. 00962 socket_ops::buf bufs[max_buffers]; 00963 typename MutableBufferSequence::const_iterator iter = buffers.begin(); 00964 typename MutableBufferSequence::const_iterator end = buffers.end(); 00965 size_t i = 0; 00966 size_t total_buffer_size = 0; 00967 for (; iter != end && i < max_buffers; ++iter, ++i) 00968 { 00969 asio::mutable_buffer buffer(*iter); 00970 socket_ops::init_buf(bufs[i], 00971 asio::buffer_cast<void*>(buffer), 00972 asio::buffer_size(buffer)); 00973 total_buffer_size += asio::buffer_size(buffer); 00974 } 00975 00976 // A request to receive 0 bytes on a stream socket is a no-op. 00977 if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0) 00978 { 00979 ec = asio::error_code(); 00980 return 0; 00981 } 00982 00983 // Make socket non-blocking if user wants non-blocking. 00984 if (impl.flags_ & implementation_type::user_set_non_blocking) 00985 { 00986 if (!(impl.flags_ & implementation_type::internal_non_blocking)) 00987 { 00988 ioctl_arg_type non_blocking = 1; 00989 if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) 00990 return 0; 00991 impl.flags_ |= implementation_type::internal_non_blocking; 00992 } 00993 } 00994 00995 // Receive some data. 00996 for (;;) 00997 { 00998 // Try to complete the operation without blocking. 00999 int bytes_recvd = socket_ops::recv(impl.socket_, bufs, i, flags, ec); 01000 01001 // Check if operation succeeded. 01002 if (bytes_recvd > 0) 01003 return bytes_recvd; 01004 01005 // Check for EOF. 01006 if (bytes_recvd == 0 && impl.protocol_.type() == SOCK_STREAM) 01007 { 01008 ec = asio::error::eof; 01009 return 0; 01010 } 01011 01012 // Operation failed. 01013 if ((impl.flags_ & implementation_type::user_set_non_blocking) 01014 || (ec != asio::error::would_block 01015 && ec != asio::error::try_again)) 01016 return 0; 01017 01018 // Wait for socket to become ready. 01019 if (socket_ops::poll_read(impl.socket_, ec) < 0) 01020 return 0; 01021 } 01022 } 01023 01024 // Wait until data can be received without blocking. 01025 size_t receive(implementation_type& impl, const null_buffers&, 01026 socket_base::message_flags, asio::error_code& ec) 01027 { 01028 if (!is_open(impl)) 01029 { 01030 ec = asio::error::bad_descriptor; 01031 return 0; 01032 } 01033 01034 // Wait for socket to become ready. 01035 socket_ops::poll_read(impl.socket_, ec); 01036 01037 return 0; 01038 } 01039 01040 template <typename MutableBufferSequence, typename Handler> 01041 class receive_operation : 01042 public handler_base_from_member<Handler> 01043 { 01044 public: 01045 receive_operation(socket_type socket, int protocol_type, 01046 asio::io_service& io_service, 01047 const MutableBufferSequence& buffers, 01048 socket_base::message_flags flags, Handler handler) 01049 : handler_base_from_member<Handler>(handler), 01050 socket_(socket), 01051 protocol_type_(protocol_type), 01052 io_service_(io_service), 01053 work_(io_service), 01054 buffers_(buffers), 01055 flags_(flags) 01056 { 01057 } 01058 01059 bool perform(asio::error_code& ec, 01060 std::size_t& bytes_transferred) 01061 { 01062 // Check whether the operation was successful. 01063 if (ec) 01064 { 01065 bytes_transferred = 0; 01066 return true; 01067 } 01068 01069 // Copy buffers into array. 01070 socket_ops::buf bufs[max_buffers]; 01071 typename MutableBufferSequence::const_iterator iter = buffers_.begin(); 01072 typename MutableBufferSequence::const_iterator end = buffers_.end(); 01073 size_t i = 0; 01074 for (; iter != end && i < max_buffers; ++iter, ++i) 01075 { 01076 asio::mutable_buffer buffer(*iter); 01077 socket_ops::init_buf(bufs[i], 01078 asio::buffer_cast<void*>(buffer), 01079 asio::buffer_size(buffer)); 01080 } 01081 01082 // Receive some data. 01083 int bytes = socket_ops::recv(socket_, bufs, i, flags_, ec); 01084 if (bytes == 0 && protocol_type_ == SOCK_STREAM) 01085 ec = asio::error::eof; 01086 01087 // Check if we need to run the operation again. 01088 if (ec == asio::error::would_block 01089 || ec == asio::error::try_again) 01090 return false; 01091 01092 bytes_transferred = (bytes < 0 ? 0 : bytes); 01093 return true; 01094 } 01095 01096 void complete(const asio::error_code& ec, 01097 std::size_t bytes_transferred) 01098 { 01099 io_service_.post(bind_handler(this->handler_, ec, bytes_transferred)); 01100 } 01101 01102 private: 01103 socket_type socket_; 01104 int protocol_type_; 01105 asio::io_service& io_service_; 01106 asio::io_service::work work_; 01107 MutableBufferSequence buffers_; 01108 socket_base::message_flags flags_; 01109 }; 01110 01111 // Start an asynchronous receive. The buffer for the data being received 01112 // must be valid for the lifetime of the asynchronous operation. 01113 template <typename MutableBufferSequence, typename Handler> 01114 void async_receive(implementation_type& impl, 01115 const MutableBufferSequence& buffers, 01116 socket_base::message_flags flags, Handler handler) 01117 { 01118 if (!is_open(impl)) 01119 { 01120 this->get_io_service().post(bind_handler(handler, 01121 asio::error::bad_descriptor, 0)); 01122 } 01123 else 01124 { 01125 if (impl.protocol_.type() == SOCK_STREAM) 01126 { 01127 // Determine total size of buffers. 01128 typename MutableBufferSequence::const_iterator iter = buffers.begin(); 01129 typename MutableBufferSequence::const_iterator end = buffers.end(); 01130 size_t i = 0; 01131 size_t total_buffer_size = 0; 01132 for (; iter != end && i < max_buffers; ++iter, ++i) 01133 { 01134 asio::mutable_buffer buffer(*iter); 01135 total_buffer_size += asio::buffer_size(buffer); 01136 } 01137 01138 // A request to receive 0 bytes on a stream socket is a no-op. 01139 if (total_buffer_size == 0) 01140 { 01141 this->get_io_service().post(bind_handler(handler, 01142 asio::error_code(), 0)); 01143 return; 01144 } 01145 } 01146 01147 // Make socket non-blocking. 01148 if (!(impl.flags_ & implementation_type::internal_non_blocking)) 01149 { 01150 ioctl_arg_type non_blocking = 1; 01151 asio::error_code ec; 01152 if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) 01153 { 01154 this->get_io_service().post(bind_handler(handler, ec, 0)); 01155 return; 01156 } 01157 impl.flags_ |= implementation_type::internal_non_blocking; 01158 } 01159 01160 if (flags & socket_base::message_out_of_band) 01161 { 01162 reactor_.start_except_op(impl.socket_, impl.reactor_data_, 01163 receive_operation<MutableBufferSequence, Handler>( 01164 impl.socket_, impl.protocol_.type(), 01165 this->get_io_service(), buffers, flags, handler)); 01166 } 01167 else 01168 { 01169 reactor_.start_read_op(impl.socket_, impl.reactor_data_, 01170 receive_operation<MutableBufferSequence, Handler>( 01171 impl.socket_, impl.protocol_.type(), 01172 this->get_io_service(), buffers, flags, handler)); 01173 } 01174 } 01175 } 01176 01177 // Wait until data can be received without blocking. 01178 template <typename Handler> 01179 void async_receive(implementation_type& impl, const null_buffers&, 01180 socket_base::message_flags flags, Handler handler) 01181 { 01182 if (!is_open(impl)) 01183 { 01184 this->get_io_service().post(bind_handler(handler, 01185 asio::error::bad_descriptor, 0)); 01186 } 01187 else if (flags & socket_base::message_out_of_band) 01188 { 01189 reactor_.start_except_op(impl.socket_, impl.reactor_data_, 01190 null_buffers_operation<Handler>(this->get_io_service(), handler)); 01191 } 01192 else 01193 { 01194 reactor_.start_read_op(impl.socket_, impl.reactor_data_, 01195 null_buffers_operation<Handler>(this->get_io_service(), handler), 01196 false); 01197 } 01198 } 01199 01200 // Receive a datagram with the endpoint of the sender. Returns the number of 01201 // bytes received. 01202 template <typename MutableBufferSequence> 01203 size_t receive_from(implementation_type& impl, 01204 const MutableBufferSequence& buffers, 01205 endpoint_type& sender_endpoint, socket_base::message_flags flags, 01206 asio::error_code& ec) 01207 { 01208 if (!is_open(impl)) 01209 { 01210 ec = asio::error::bad_descriptor; 01211 return 0; 01212 } 01213 01214 // Copy buffers into array. 01215 socket_ops::buf bufs[max_buffers]; 01216 typename MutableBufferSequence::const_iterator iter = buffers.begin(); 01217 typename MutableBufferSequence::const_iterator end = buffers.end(); 01218 size_t i = 0; 01219 for (; iter != end && i < max_buffers; ++iter, ++i) 01220 { 01221 asio::mutable_buffer buffer(*iter); 01222 socket_ops::init_buf(bufs[i], 01223 asio::buffer_cast<void*>(buffer), 01224 asio::buffer_size(buffer)); 01225 } 01226 01227 // Make socket non-blocking if user wants non-blocking. 01228 if (impl.flags_ & implementation_type::user_set_non_blocking) 01229 { 01230 if (!(impl.flags_ & implementation_type::internal_non_blocking)) 01231 { 01232 ioctl_arg_type non_blocking = 1; 01233 if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) 01234 return 0; 01235 impl.flags_ |= implementation_type::internal_non_blocking; 01236 } 01237 } 01238 01239 // Receive some data. 01240 for (;;) 01241 { 01242 // Try to complete the operation without blocking. 01243 std::size_t addr_len = sender_endpoint.capacity(); 01244 int bytes_recvd = socket_ops::recvfrom(impl.socket_, bufs, i, flags, 01245 sender_endpoint.data(), &addr_len, ec); 01246 01247 // Check if operation succeeded. 01248 if (bytes_recvd > 0) 01249 { 01250 sender_endpoint.resize(addr_len); 01251 return bytes_recvd; 01252 } 01253 01254 // Check for EOF. 01255 if (bytes_recvd == 0 && impl.protocol_.type() == SOCK_STREAM) 01256 { 01257 ec = asio::error::eof; 01258 return 0; 01259 } 01260 01261 // Operation failed. 01262 if ((impl.flags_ & implementation_type::user_set_non_blocking) 01263 || (ec != asio::error::would_block 01264 && ec != asio::error::try_again)) 01265 return 0; 01266 01267 // Wait for socket to become ready. 01268 if (socket_ops::poll_read(impl.socket_, ec) < 0) 01269 return 0; 01270 } 01271 } 01272 01273 // Wait until data can be received without blocking. 01274 size_t receive_from(implementation_type& impl, const null_buffers&, 01275 endpoint_type& sender_endpoint, socket_base::message_flags, 01276 asio::error_code& ec) 01277 { 01278 if (!is_open(impl)) 01279 { 01280 ec = asio::error::bad_descriptor; 01281 return 0; 01282 } 01283 01284 // Wait for socket to become ready. 01285 socket_ops::poll_read(impl.socket_, ec); 01286 01287 // Reset endpoint since it can be given no sensible value at this time. 01288 sender_endpoint = endpoint_type(); 01289 01290 return 0; 01291 } 01292 01293 template <typename MutableBufferSequence, typename Handler> 01294 class receive_from_operation : 01295 public handler_base_from_member<Handler> 01296 { 01297 public: 01298 receive_from_operation(socket_type socket, int protocol_type, 01299 asio::io_service& io_service, 01300 const MutableBufferSequence& buffers, endpoint_type& endpoint, 01301 socket_base::message_flags flags, Handler handler) 01302 : handler_base_from_member<Handler>(handler), 01303 socket_(socket), 01304 protocol_type_(protocol_type), 01305 io_service_(io_service), 01306 work_(io_service), 01307 buffers_(buffers), 01308 sender_endpoint_(endpoint), 01309 flags_(flags) 01310 { 01311 } 01312 01313 bool perform(asio::error_code& ec, 01314 std::size_t& bytes_transferred) 01315 { 01316 // Check whether the operation was successful. 01317 if (ec) 01318 { 01319 bytes_transferred = 0; 01320 return true; 01321 } 01322 01323 // Copy buffers into array. 01324 socket_ops::buf bufs[max_buffers]; 01325 typename MutableBufferSequence::const_iterator iter = buffers_.begin(); 01326 typename MutableBufferSequence::const_iterator end = buffers_.end(); 01327 size_t i = 0; 01328 for (; iter != end && i < max_buffers; ++iter, ++i) 01329 { 01330 asio::mutable_buffer buffer(*iter); 01331 socket_ops::init_buf(bufs[i], 01332 asio::buffer_cast<void*>(buffer), 01333 asio::buffer_size(buffer)); 01334 } 01335 01336 // Receive some data. 01337 std::size_t addr_len = sender_endpoint_.capacity(); 01338 int bytes = socket_ops::recvfrom(socket_, bufs, i, flags_, 01339 sender_endpoint_.data(), &addr_len, ec); 01340 if (bytes == 0 && protocol_type_ == SOCK_STREAM) 01341 ec = asio::error::eof; 01342 01343 // Check if we need to run the operation again. 01344 if (ec == asio::error::would_block 01345 || ec == asio::error::try_again) 01346 return false; 01347 01348 sender_endpoint_.resize(addr_len); 01349 bytes_transferred = (bytes < 0 ? 0 : bytes); 01350 return true; 01351 } 01352 01353 void complete(const asio::error_code& ec, 01354 std::size_t bytes_transferred) 01355 { 01356 io_service_.post(bind_handler(this->handler_, ec, bytes_transferred)); 01357 } 01358 01359 private: 01360 socket_type socket_; 01361 int protocol_type_; 01362 asio::io_service& io_service_; 01363 asio::io_service::work work_; 01364 MutableBufferSequence buffers_; 01365 endpoint_type& sender_endpoint_; 01366 socket_base::message_flags flags_; 01367 }; 01368 01369 // Start an asynchronous receive. The buffer for the data being received and 01370 // the sender_endpoint object must both be valid for the lifetime of the 01371 // asynchronous operation. 01372 template <typename MutableBufferSequence, typename Handler> 01373 void async_receive_from(implementation_type& impl, 01374 const MutableBufferSequence& buffers, endpoint_type& sender_endpoint, 01375 socket_base::message_flags flags, Handler handler) 01376 { 01377 if (!is_open(impl)) 01378 { 01379 this->get_io_service().post(bind_handler(handler, 01380 asio::error::bad_descriptor, 0)); 01381 } 01382 else 01383 { 01384 // Make socket non-blocking. 01385 if (!(impl.flags_ & implementation_type::internal_non_blocking)) 01386 { 01387 ioctl_arg_type non_blocking = 1; 01388 asio::error_code ec; 01389 if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) 01390 { 01391 this->get_io_service().post(bind_handler(handler, ec, 0)); 01392 return; 01393 } 01394 impl.flags_ |= implementation_type::internal_non_blocking; 01395 } 01396 01397 reactor_.start_read_op(impl.socket_, impl.reactor_data_, 01398 receive_from_operation<MutableBufferSequence, Handler>( 01399 impl.socket_, impl.protocol_.type(), this->get_io_service(), 01400 buffers, sender_endpoint, flags, handler)); 01401 } 01402 } 01403 01404 // Wait until data can be received without blocking. 01405 template <typename Handler> 01406 void async_receive_from(implementation_type& impl, 01407 const null_buffers&, endpoint_type& sender_endpoint, 01408 socket_base::message_flags flags, Handler handler) 01409 { 01410 if (!is_open(impl)) 01411 { 01412 this->get_io_service().post(bind_handler(handler, 01413 asio::error::bad_descriptor, 0)); 01414 } 01415 else 01416 { 01417 // Reset endpoint since it can be given no sensible value at this time. 01418 sender_endpoint = endpoint_type(); 01419 01420 if (flags & socket_base::message_out_of_band) 01421 { 01422 reactor_.start_except_op(impl.socket_, impl.reactor_data_, 01423 null_buffers_operation<Handler>(this->get_io_service(), handler)); 01424 } 01425 else 01426 { 01427 reactor_.start_read_op(impl.socket_, impl.reactor_data_, 01428 null_buffers_operation<Handler>(this->get_io_service(), handler), 01429 false); 01430 } 01431 } 01432 } 01433 01434 // Accept a new connection. 01435 template <typename Socket> 01436 asio::error_code accept(implementation_type& impl, 01437 Socket& peer, endpoint_type* peer_endpoint, asio::error_code& ec) 01438 { 01439 if (!is_open(impl)) 01440 { 01441 ec = asio::error::bad_descriptor; 01442 return ec; 01443 } 01444 01445 // We cannot accept a socket that is already open. 01446 if (peer.is_open()) 01447 { 01448 ec = asio::error::already_open; 01449 return ec; 01450 } 01451 01452 // Make socket non-blocking if user wants non-blocking. 01453 if (impl.flags_ & implementation_type::user_set_non_blocking) 01454 { 01455 if (!(impl.flags_ & implementation_type::internal_non_blocking)) 01456 { 01457 ioctl_arg_type non_blocking = 1; 01458 if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) 01459 return ec; 01460 impl.flags_ |= implementation_type::internal_non_blocking; 01461 } 01462 } 01463 01464 // Accept a socket. 01465 for (;;) 01466 { 01467 // Try to complete the operation without blocking. 01468 asio::error_code ec; 01469 socket_holder new_socket; 01470 std::size_t addr_len = 0; 01471 if (peer_endpoint) 01472 { 01473 addr_len = peer_endpoint->capacity(); 01474 new_socket.reset(socket_ops::accept(impl.socket_, 01475 peer_endpoint->data(), &addr_len, ec)); 01476 } 01477 else 01478 { 01479 new_socket.reset(socket_ops::accept(impl.socket_, 0, 0, ec)); 01480 } 01481 01482 // Check if operation succeeded. 01483 if (new_socket.get() >= 0) 01484 { 01485 if (peer_endpoint) 01486 peer_endpoint->resize(addr_len); 01487 peer.assign(impl.protocol_, new_socket.get(), ec); 01488 if (!ec) 01489 new_socket.release(); 01490 return ec; 01491 } 01492 01493 // Operation failed. 01494 if (ec == asio::error::would_block 01495 || ec == asio::error::try_again) 01496 { 01497 if (impl.flags_ & implementation_type::user_set_non_blocking) 01498 return ec; 01499 // Fall through to retry operation. 01500 } 01501 else if (ec == asio::error::connection_aborted) 01502 { 01503 if (impl.flags_ & implementation_type::enable_connection_aborted) 01504 return ec; 01505 // Fall through to retry operation. 01506 } 01507 #if defined(EPROTO) 01508 else if (ec.value() == EPROTO) 01509 { 01510 if (impl.flags_ & implementation_type::enable_connection_aborted) 01511 return ec; 01512 // Fall through to retry operation. 01513 } 01514 #endif // defined(EPROTO) 01515 else 01516 return ec; 01517 01518 // Wait for socket to become ready. 01519 if (socket_ops::poll_read(impl.socket_, ec) < 0) 01520 return ec; 01521 } 01522 } 01523 01524 template <typename Socket, typename Handler> 01525 class accept_operation : 01526 public handler_base_from_member<Handler> 01527 { 01528 public: 01529 accept_operation(socket_type socket, asio::io_service& io_service, 01530 Socket& peer, const protocol_type& protocol, 01531 endpoint_type* peer_endpoint, bool enable_connection_aborted, 01532 Handler handler) 01533 : handler_base_from_member<Handler>(handler), 01534 socket_(socket), 01535 io_service_(io_service), 01536 work_(io_service), 01537 peer_(peer), 01538 protocol_(protocol), 01539 peer_endpoint_(peer_endpoint), 01540 enable_connection_aborted_(enable_connection_aborted) 01541 { 01542 } 01543 01544 bool perform(asio::error_code& ec, std::size_t&) 01545 { 01546 // Check whether the operation was successful. 01547 if (ec) 01548 return true; 01549 01550 // Accept the waiting connection. 01551 socket_holder new_socket; 01552 std::size_t addr_len = 0; 01553 if (peer_endpoint_) 01554 { 01555 addr_len = peer_endpoint_->capacity(); 01556 new_socket.reset(socket_ops::accept(socket_, 01557 peer_endpoint_->data(), &addr_len, ec)); 01558 } 01559 else 01560 { 01561 new_socket.reset(socket_ops::accept(socket_, 0, 0, ec)); 01562 } 01563 01564 // Check if we need to run the operation again. 01565 if (ec == asio::error::would_block 01566 || ec == asio::error::try_again) 01567 return false; 01568 if (ec == asio::error::connection_aborted 01569 && !enable_connection_aborted_) 01570 return false; 01571 #if defined(EPROTO) 01572 if (ec.value() == EPROTO && !enable_connection_aborted_) 01573 return false; 01574 #endif // defined(EPROTO) 01575 01576 // Transfer ownership of the new socket to the peer object. 01577 if (!ec) 01578 { 01579 if (peer_endpoint_) 01580 peer_endpoint_->resize(addr_len); 01581 peer_.assign(protocol_, new_socket.get(), ec); 01582 if (!ec) 01583 new_socket.release(); 01584 } 01585 01586 return true; 01587 } 01588 01589 void complete(const asio::error_code& ec, std::size_t) 01590 { 01591 io_service_.post(bind_handler(this->handler_, ec)); 01592 } 01593 01594 private: 01595 socket_type socket_; 01596 asio::io_service& io_service_; 01597 asio::io_service::work work_; 01598 Socket& peer_; 01599 protocol_type protocol_; 01600 endpoint_type* peer_endpoint_; 01601 bool enable_connection_aborted_; 01602 }; 01603 01604 // Start an asynchronous accept. The peer and peer_endpoint objects 01605 // must be valid until the accept's handler is invoked. 01606 template <typename Socket, typename Handler> 01607 void async_accept(implementation_type& impl, Socket& peer, 01608 endpoint_type* peer_endpoint, Handler handler) 01609 { 01610 if (!is_open(impl)) 01611 { 01612 this->get_io_service().post(bind_handler(handler, 01613 asio::error::bad_descriptor)); 01614 } 01615 else if (peer.is_open()) 01616 { 01617 this->get_io_service().post(bind_handler(handler, 01618 asio::error::already_open)); 01619 } 01620 else 01621 { 01622 // Make socket non-blocking. 01623 if (!(impl.flags_ & implementation_type::internal_non_blocking)) 01624 { 01625 ioctl_arg_type non_blocking = 1; 01626 asio::error_code ec; 01627 if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) 01628 { 01629 this->get_io_service().post(bind_handler(handler, ec)); 01630 return; 01631 } 01632 impl.flags_ |= implementation_type::internal_non_blocking; 01633 } 01634 01635 reactor_.start_read_op(impl.socket_, impl.reactor_data_, 01636 accept_operation<Socket, Handler>( 01637 impl.socket_, this->get_io_service(), 01638 peer, impl.protocol_, peer_endpoint, 01639 (impl.flags_ & implementation_type::enable_connection_aborted) != 0, 01640 handler)); 01641 } 01642 } 01643 01644 // Connect the socket to the specified endpoint. 01645 asio::error_code connect(implementation_type& impl, 01646 const endpoint_type& peer_endpoint, asio::error_code& ec) 01647 { 01648 if (!is_open(impl)) 01649 { 01650 ec = asio::error::bad_descriptor; 01651 return ec; 01652 } 01653 01654 if (impl.flags_ & implementation_type::internal_non_blocking) 01655 { 01656 // Mark the socket as blocking while we perform the connect. 01657 ioctl_arg_type non_blocking = 0; 01658 if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) 01659 return ec; 01660 impl.flags_ &= ~implementation_type::internal_non_blocking; 01661 } 01662 01663 // Perform the connect operation. 01664 socket_ops::connect(impl.socket_, 01665 peer_endpoint.data(), peer_endpoint.size(), ec); 01666 return ec; 01667 } 01668 01669 template <typename Handler> 01670 class connect_operation : 01671 public handler_base_from_member<Handler> 01672 { 01673 public: 01674 connect_operation(socket_type socket, 01675 asio::io_service& io_service, Handler handler) 01676 : handler_base_from_member<Handler>(handler), 01677 socket_(socket), 01678 io_service_(io_service), 01679 work_(io_service) 01680 { 01681 } 01682 01683 bool perform(asio::error_code& ec, std::size_t&) 01684 { 01685 // Check whether the operation was successful. 01686 if (ec) 01687 return true; 01688 01689 // Get the error code from the connect operation. 01690 int connect_error = 0; 01691 size_t connect_error_len = sizeof(connect_error); 01692 if (socket_ops::getsockopt(socket_, SOL_SOCKET, SO_ERROR, 01693 &connect_error, &connect_error_len, ec) == socket_error_retval) 01694 return true; 01695 01696 // The connection failed so the handler will be posted with an error code. 01697 if (connect_error) 01698 { 01699 ec = asio::error_code(connect_error, 01700 asio::error::get_system_category()); 01701 return true; 01702 } 01703 01704 return true; 01705 } 01706 01707 void complete(const asio::error_code& ec, std::size_t) 01708 { 01709 io_service_.post(bind_handler(this->handler_, ec)); 01710 } 01711 01712 private: 01713 socket_type socket_; 01714 asio::io_service& io_service_; 01715 asio::io_service::work work_; 01716 }; 01717 01718 // Start an asynchronous connect. 01719 template <typename Handler> 01720 void async_connect(implementation_type& impl, 01721 const endpoint_type& peer_endpoint, Handler handler) 01722 { 01723 if (!is_open(impl)) 01724 { 01725 this->get_io_service().post(bind_handler(handler, 01726 asio::error::bad_descriptor)); 01727 return; 01728 } 01729 01730 // Make socket non-blocking. 01731 if (!(impl.flags_ & implementation_type::internal_non_blocking)) 01732 { 01733 ioctl_arg_type non_blocking = 1; 01734 asio::error_code ec; 01735 if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) 01736 { 01737 this->get_io_service().post(bind_handler(handler, ec)); 01738 return; 01739 } 01740 impl.flags_ |= implementation_type::internal_non_blocking; 01741 } 01742 01743 // Start the connect operation. The socket is already marked as non-blocking 01744 // so the connection will take place asynchronously. 01745 asio::error_code ec; 01746 if (socket_ops::connect(impl.socket_, peer_endpoint.data(), 01747 peer_endpoint.size(), ec) == 0) 01748 { 01749 // The connect operation has finished successfully so we need to post the 01750 // handler immediately. 01751 this->get_io_service().post(bind_handler(handler, 01752 asio::error_code())); 01753 } 01754 else if (ec == asio::error::in_progress 01755 || ec == asio::error::would_block) 01756 { 01757 // The connection is happening in the background, and we need to wait 01758 // until the socket becomes writeable. 01759 reactor_.start_connect_op(impl.socket_, impl.reactor_data_, 01760 connect_operation<Handler>(impl.socket_, 01761 this->get_io_service(), handler)); 01762 } 01763 else 01764 { 01765 // The connect operation has failed, so post the handler immediately. 01766 this->get_io_service().post(bind_handler(handler, ec)); 01767 } 01768 } 01769 01770 private: 01771 // The selector that performs event demultiplexing for the service. 01772 Reactor& reactor_; 01773 }; 01774 01775 } // namespace detail 01776 } // namespace asio 01777 01778 #include "asio/detail/pop_options.hpp" 01779 01780 #endif // ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP