win_iocp_socket_service.hpp
Go to the documentation of this file.
00001 //
00002 // win_iocp_socket_service.hpp
00003 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~
00004 //
00005 // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
00006 //
00007 // Distributed under the Boost Software License, Version 1.0. (See accompanying
00008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
00009 //
00010 
00011 #ifndef ASIO_DETAIL_WIN_IOCP_SOCKET_SERVICE_HPP
00012 #define ASIO_DETAIL_WIN_IOCP_SOCKET_SERVICE_HPP
00013 
00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00015 # pragma once
00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00017 
00018 #include "asio/detail/push_options.hpp"
00019 
00020 #include "asio/detail/win_iocp_io_service_fwd.hpp"
00021 
00022 #if defined(ASIO_HAS_IOCP)
00023 
00024 #include "asio/detail/push_options.hpp"
00025 #include <cstring>
00026 #include <boost/shared_ptr.hpp>
00027 #include <boost/type_traits/is_same.hpp>
00028 #include <boost/weak_ptr.hpp>
00029 #include "asio/detail/pop_options.hpp"
00030 
00031 #include "asio/buffer.hpp"
00032 #include "asio/error.hpp"
00033 #include "asio/io_service.hpp"
00034 #include "asio/socket_base.hpp"
00035 #include "asio/detail/bind_handler.hpp"
00036 #include "asio/detail/handler_alloc_helpers.hpp"
00037 #include "asio/detail/handler_invoke_helpers.hpp"
00038 #include "asio/detail/mutex.hpp"
00039 #include "asio/detail/select_reactor.hpp"
00040 #include "asio/detail/socket_holder.hpp"
00041 #include "asio/detail/socket_ops.hpp"
00042 #include "asio/detail/socket_types.hpp"
00043 #include "asio/detail/win_iocp_io_service.hpp"
00044 
00045 namespace asio {
00046 namespace detail {
00047 
00048 template <typename Protocol>
00049 class win_iocp_socket_service
00050   : public asio::detail::service_base<win_iocp_socket_service<Protocol> >
00051 {
00052 public:
00053   // The protocol type.
00054   typedef Protocol protocol_type;
00055 
00056   // The endpoint type.
00057   typedef typename Protocol::endpoint endpoint_type;
00058 
00059   // Base class for all operations.
00060   typedef win_iocp_io_service::operation operation;
00061 
00062   struct noop_deleter { void operator()(void*) {} };
00063   typedef boost::shared_ptr<void> shared_cancel_token_type;
00064   typedef boost::weak_ptr<void> weak_cancel_token_type;
00065 
00066   // The native type of a socket.
00067   class native_type
00068   {
00069   public:
00070     native_type(socket_type s)
00071       : socket_(s),
00072         have_remote_endpoint_(false)
00073     {
00074     }
00075 
00076     native_type(socket_type s, const endpoint_type& ep)
00077       : socket_(s),
00078         have_remote_endpoint_(true),
00079         remote_endpoint_(ep)
00080     {
00081     }
00082 
00083     void operator=(socket_type s)
00084     {
00085       socket_ = s;
00086       have_remote_endpoint_ = false;
00087       remote_endpoint_ = endpoint_type();
00088     }
00089 
00090     operator socket_type() const
00091     {
00092       return socket_;
00093     }
00094 
00095     HANDLE as_handle() const
00096     {
00097       return reinterpret_cast<HANDLE>(socket_);
00098     }
00099 
00100     bool have_remote_endpoint() const
00101     {
00102       return have_remote_endpoint_;
00103     }
00104 
00105     endpoint_type remote_endpoint() const
00106     {
00107       return remote_endpoint_;
00108     }
00109 
00110   private:
00111     socket_type socket_;
00112     bool have_remote_endpoint_;
00113     endpoint_type remote_endpoint_;
00114   };
00115 
00116   // The type of the reactor used for connect operations.
00117   typedef detail::select_reactor<true> reactor_type;
00118 
00119   // The implementation type of the socket.
00120   class implementation_type
00121   {
00122   public:
00123     // Default constructor.
00124     implementation_type()
00125       : socket_(invalid_socket),
00126         flags_(0),
00127         cancel_token_(),
00128         protocol_(endpoint_type().protocol()),
00129         next_(0),
00130         prev_(0)
00131     {
00132     }
00133 
00134   private:
00135     // Only this service will have access to the internal values.
00136     friend class win_iocp_socket_service;
00137 
00138     // The native socket representation.
00139     native_type socket_;
00140 
00141     enum
00142     {
00143       enable_connection_aborted = 1, // User wants connection_aborted errors.
00144       close_might_block = 2, // User set linger option for blocking close.
00145       user_set_non_blocking = 4 // The user wants a non-blocking socket.
00146     };
00147 
00148     // Flags indicating the current state of the socket.
00149     unsigned char flags_;
00150 
00151     // We use a shared pointer as a cancellation token here to work around the
00152     // broken Windows support for cancellation. MSDN says that when you call
00153     // closesocket any outstanding WSARecv or WSASend operations will complete
00154     // with the error ERROR_OPERATION_ABORTED. In practice they complete with
00155     // ERROR_NETNAME_DELETED, which means you can't tell the difference between
00156     // a local cancellation and the socket being hard-closed by the peer.
00157     shared_cancel_token_type cancel_token_;
00158 
00159     // The protocol associated with the socket.
00160     protocol_type protocol_;
00161 
00162     // Per-descriptor data used by the reactor.
00163     reactor_type::per_descriptor_data reactor_data_;
00164 
00165 #if defined(ASIO_ENABLE_CANCELIO)
00166     // The ID of the thread from which it is safe to cancel asynchronous
00167     // operations. 0 means no asynchronous operations have been started yet.
00168     // ~0 means asynchronous operations have been started from more than one
00169     // thread, and cancellation is not supported for the socket.
00170     DWORD safe_cancellation_thread_id_;
00171 #endif // defined(ASIO_ENABLE_CANCELIO)
00172 
00173     // Pointers to adjacent socket implementations in linked list.
00174     implementation_type* next_;
00175     implementation_type* prev_;
00176   };
00177 
00178   // The maximum number of buffers to support in a single operation.
00179   enum { max_buffers = 64 < max_iov_len ? 64 : max_iov_len };
00180 
00181   // Constructor.
00182   win_iocp_socket_service(asio::io_service& io_service)
00183     : asio::detail::service_base<
00184         win_iocp_socket_service<Protocol> >(io_service),
00185       iocp_service_(asio::use_service<win_iocp_io_service>(io_service)),
00186       reactor_(0),
00187       mutex_(),
00188       impl_list_(0)
00189   {
00190   }
00191 
00192   // Destroy all user-defined handler objects owned by the service.
00193   void shutdown_service()
00194   {
00195     // Close all implementations, causing all operations to complete.
00196     asio::detail::mutex::scoped_lock lock(mutex_);
00197     implementation_type* impl = impl_list_;
00198     while (impl)
00199     {
00200       asio::error_code ignored_ec;
00201       close_for_destruction(*impl);
00202       impl = impl->next_;
00203     }
00204   }
00205 
00206   // Construct a new socket implementation.
00207   void construct(implementation_type& impl)
00208   {
00209     impl.socket_ = invalid_socket;
00210     impl.flags_ = 0;
00211     impl.cancel_token_.reset();
00212 #if defined(ASIO_ENABLE_CANCELIO)
00213     impl.safe_cancellation_thread_id_ = 0;
00214 #endif // defined(ASIO_ENABLE_CANCELIO)
00215 
00216     // Insert implementation into linked list of all implementations.
00217     asio::detail::mutex::scoped_lock lock(mutex_);
00218     impl.next_ = impl_list_;
00219     impl.prev_ = 0;
00220     if (impl_list_)
00221       impl_list_->prev_ = &impl;
00222     impl_list_ = &impl;
00223   }
00224 
00225   // Destroy a socket implementation.
00226   void destroy(implementation_type& impl)
00227   {
00228     close_for_destruction(impl);
00229 
00230     // Remove implementation from linked list of all implementations.
00231     asio::detail::mutex::scoped_lock lock(mutex_);
00232     if (impl_list_ == &impl)
00233       impl_list_ = impl.next_;
00234     if (impl.prev_)
00235       impl.prev_->next_ = impl.next_;
00236     if (impl.next_)
00237       impl.next_->prev_= impl.prev_;
00238     impl.next_ = 0;
00239     impl.prev_ = 0;
00240   }
00241 
00242   // Open a new socket implementation.
00243   asio::error_code open(implementation_type& impl,
00244       const protocol_type& protocol, asio::error_code& ec)
00245   {
00246     if (is_open(impl))
00247     {
00248       ec = asio::error::already_open;
00249       return ec;
00250     }
00251 
00252     socket_holder sock(socket_ops::socket(protocol.family(), protocol.type(),
00253           protocol.protocol(), ec));
00254     if (sock.get() == invalid_socket)
00255       return ec;
00256 
00257     HANDLE sock_as_handle = reinterpret_cast<HANDLE>(sock.get());
00258     if (iocp_service_.register_handle(sock_as_handle, ec))
00259       return ec;
00260 
00261     impl.socket_ = sock.release();
00262     impl.flags_ = 0;
00263     impl.cancel_token_.reset(static_cast<void*>(0), noop_deleter());
00264     impl.protocol_ = protocol;
00265     ec = asio::error_code();
00266     return ec;
00267   }
00268 
00269   // Assign a native socket to a socket implementation.
00270   asio::error_code assign(implementation_type& impl,
00271       const protocol_type& protocol, const native_type& native_socket,
00272       asio::error_code& ec)
00273   {
00274     if (is_open(impl))
00275     {
00276       ec = asio::error::already_open;
00277       return ec;
00278     }
00279 
00280     if (iocp_service_.register_handle(native_socket.as_handle(), ec))
00281       return ec;
00282 
00283     impl.socket_ = native_socket;
00284     impl.flags_ = 0;
00285     impl.cancel_token_.reset(static_cast<void*>(0), noop_deleter());
00286     impl.protocol_ = protocol;
00287     ec = asio::error_code();
00288     return ec;
00289   }
00290 
00291   // Determine whether the socket is open.
00292   bool is_open(const implementation_type& impl) const
00293   {
00294     return impl.socket_ != invalid_socket;
00295   }
00296 
00297   // Destroy a socket implementation.
00298   asio::error_code close(implementation_type& impl,
00299       asio::error_code& ec)
00300   {
00301     if (is_open(impl))
00302     {
00303       // Check if the reactor was created, in which case we need to close the
00304       // socket on the reactor as well to cancel any operations that might be
00305       // running there.
00306       reactor_type* reactor = static_cast<reactor_type*>(
00307             interlocked_compare_exchange_pointer(
00308               reinterpret_cast<void**>(&reactor_), 0, 0));
00309       if (reactor)
00310         reactor->close_descriptor(impl.socket_, impl.reactor_data_);
00311 
00312       if (socket_ops::close(impl.socket_, ec) == socket_error_retval)
00313         return ec;
00314 
00315       impl.socket_ = invalid_socket;
00316       impl.flags_ = 0;
00317       impl.cancel_token_.reset();
00318 #if defined(ASIO_ENABLE_CANCELIO)
00319       impl.safe_cancellation_thread_id_ = 0;
00320 #endif // defined(ASIO_ENABLE_CANCELIO)
00321     }
00322 
00323     ec = asio::error_code();
00324     return ec;
00325   }
00326 
00327   // Get the native socket representation.
00328   native_type native(implementation_type& impl)
00329   {
00330     return impl.socket_;
00331   }
00332 
00333   // Cancel all operations associated with the socket.
00334   asio::error_code cancel(implementation_type& impl,
00335       asio::error_code& ec)
00336   {
00337     if (!is_open(impl))
00338     {
00339       ec = asio::error::bad_descriptor;
00340       return ec;
00341     }
00342     else if (FARPROC cancel_io_ex_ptr = ::GetProcAddress(
00343           ::GetModuleHandleA("KERNEL32"), "CancelIoEx"))
00344     {
00345       // The version of Windows supports cancellation from any thread.
00346       typedef BOOL (WINAPI* cancel_io_ex_t)(HANDLE, LPOVERLAPPED);
00347       cancel_io_ex_t cancel_io_ex = (cancel_io_ex_t)cancel_io_ex_ptr;
00348       socket_type sock = impl.socket_;
00349       HANDLE sock_as_handle = reinterpret_cast<HANDLE>(sock);
00350       if (!cancel_io_ex(sock_as_handle, 0))
00351       {
00352         DWORD last_error = ::GetLastError();
00353         if (last_error == ERROR_NOT_FOUND)
00354         {
00355           // ERROR_NOT_FOUND means that there were no operations to be
00356           // cancelled. We swallow this error to match the behaviour on other
00357           // platforms.
00358           ec = asio::error_code();
00359         }
00360         else
00361         {
00362           ec = asio::error_code(last_error,
00363               asio::error::get_system_category());
00364         }
00365       }
00366       else
00367       {
00368         ec = asio::error_code();
00369       }
00370     }
00371 #if defined(ASIO_ENABLE_CANCELIO)
00372     else if (impl.safe_cancellation_thread_id_ == 0)
00373     {
00374       // No operations have been started, so there's nothing to cancel.
00375       ec = asio::error_code();
00376     }
00377     else if (impl.safe_cancellation_thread_id_ == ::GetCurrentThreadId())
00378     {
00379       // Asynchronous operations have been started from the current thread only,
00380       // so it is safe to try to cancel them using CancelIo.
00381       socket_type sock = impl.socket_;
00382       HANDLE sock_as_handle = reinterpret_cast<HANDLE>(sock);
00383       if (!::CancelIo(sock_as_handle))
00384       {
00385         DWORD last_error = ::GetLastError();
00386         ec = asio::error_code(last_error,
00387             asio::error::get_system_category());
00388       }
00389       else
00390       {
00391         ec = asio::error_code();
00392       }
00393     }
00394     else
00395     {
00396       // Asynchronous operations have been started from more than one thread,
00397       // so cancellation is not safe.
00398       ec = asio::error::operation_not_supported;
00399     }
00400 #else // defined(ASIO_ENABLE_CANCELIO)
00401     else
00402     {
00403       // Cancellation is not supported as CancelIo may not be used.
00404       ec = asio::error::operation_not_supported;
00405     }
00406 #endif // defined(ASIO_ENABLE_CANCELIO)
00407 
00408     return ec;
00409   }
00410 
00411   // Determine whether the socket is at the out-of-band data mark.
00412   bool at_mark(const implementation_type& impl,
00413       asio::error_code& ec) const
00414   {
00415     if (!is_open(impl))
00416     {
00417       ec = asio::error::bad_descriptor;
00418       return false;
00419     }
00420 
00421     asio::detail::ioctl_arg_type value = 0;
00422     socket_ops::ioctl(impl.socket_, SIOCATMARK, &value, ec);
00423     return ec ? false : value != 0;
00424   }
00425 
00426   // Determine the number of bytes available for reading.
00427   std::size_t available(const implementation_type& impl,
00428       asio::error_code& ec) const
00429   {
00430     if (!is_open(impl))
00431     {
00432       ec = asio::error::bad_descriptor;
00433       return 0;
00434     }
00435 
00436     asio::detail::ioctl_arg_type value = 0;
00437     socket_ops::ioctl(impl.socket_, FIONREAD, &value, ec);
00438     return ec ? static_cast<std::size_t>(0) : static_cast<std::size_t>(value);
00439   }
00440 
00441   // Bind the socket to the specified local endpoint.
00442   asio::error_code bind(implementation_type& impl,
00443       const endpoint_type& endpoint, asio::error_code& ec)
00444   {
00445     if (!is_open(impl))
00446     {
00447       ec = asio::error::bad_descriptor;
00448       return ec;
00449     }
00450 
00451     socket_ops::bind(impl.socket_, endpoint.data(), endpoint.size(), ec);
00452     return ec;
00453   }
00454 
00455   // Place the socket into the state where it will listen for new connections.
00456   asio::error_code listen(implementation_type& impl, int backlog,
00457       asio::error_code& ec)
00458   {
00459     if (!is_open(impl))
00460     {
00461       ec = asio::error::bad_descriptor;
00462       return ec;
00463     }
00464 
00465     socket_ops::listen(impl.socket_, backlog, ec);
00466     return ec;
00467   }
00468 
00469   // Set a socket option.
00470   template <typename Option>
00471   asio::error_code set_option(implementation_type& impl,
00472       const Option& option, asio::error_code& ec)
00473   {
00474     if (!is_open(impl))
00475     {
00476       ec = asio::error::bad_descriptor;
00477       return ec;
00478     }
00479 
00480     if (option.level(impl.protocol_) == custom_socket_option_level
00481         && option.name(impl.protocol_) == enable_connection_aborted_option)
00482     {
00483       if (option.size(impl.protocol_) != sizeof(int))
00484       {
00485         ec = asio::error::invalid_argument;
00486       }
00487       else
00488       {
00489         if (*reinterpret_cast<const int*>(option.data(impl.protocol_)))
00490           impl.flags_ |= implementation_type::enable_connection_aborted;
00491         else
00492           impl.flags_ &= ~implementation_type::enable_connection_aborted;
00493         ec = asio::error_code();
00494       }
00495       return ec;
00496     }
00497     else
00498     {
00499       if (option.level(impl.protocol_) == SOL_SOCKET
00500           && option.name(impl.protocol_) == SO_LINGER)
00501       {
00502         const ::linger* linger_option =
00503           reinterpret_cast<const ::linger*>(option.data(impl.protocol_));
00504         if (linger_option->l_onoff != 0 && linger_option->l_linger != 0)
00505           impl.flags_ |= implementation_type::close_might_block;
00506         else
00507           impl.flags_ &= ~implementation_type::close_might_block;
00508       }
00509 
00510       socket_ops::setsockopt(impl.socket_,
00511           option.level(impl.protocol_), option.name(impl.protocol_),
00512           option.data(impl.protocol_), option.size(impl.protocol_), ec);
00513       return ec;
00514     }
00515   }
00516 
00517   // Set a socket option.
00518   template <typename Option>
00519   asio::error_code get_option(const implementation_type& impl,
00520       Option& option, asio::error_code& ec) const
00521   {
00522     if (!is_open(impl))
00523     {
00524       ec = asio::error::bad_descriptor;
00525       return ec;
00526     }
00527 
00528     if (option.level(impl.protocol_) == custom_socket_option_level
00529         && option.name(impl.protocol_) == enable_connection_aborted_option)
00530     {
00531       if (option.size(impl.protocol_) != sizeof(int))
00532       {
00533         ec = asio::error::invalid_argument;
00534       }
00535       else
00536       {
00537         int* target = reinterpret_cast<int*>(option.data(impl.protocol_));
00538         if (impl.flags_ & implementation_type::enable_connection_aborted)
00539           *target = 1;
00540         else
00541           *target = 0;
00542         option.resize(impl.protocol_, sizeof(int));
00543         ec = asio::error_code();
00544       }
00545       return ec;
00546     }
00547     else
00548     {
00549       size_t size = option.size(impl.protocol_);
00550       socket_ops::getsockopt(impl.socket_,
00551           option.level(impl.protocol_), option.name(impl.protocol_),
00552           option.data(impl.protocol_), &size, ec);
00553       if (!ec)
00554         option.resize(impl.protocol_, size);
00555       return ec;
00556     }
00557   }
00558 
00559   // Perform an IO control command on the socket.
00560   template <typename IO_Control_Command>
00561   asio::error_code io_control(implementation_type& impl,
00562       IO_Control_Command& command, asio::error_code& ec)
00563   {
00564     if (!is_open(impl))
00565     {
00566       ec = asio::error::bad_descriptor;
00567       return ec;
00568     }
00569 
00570     socket_ops::ioctl(impl.socket_, command.name(),
00571         static_cast<ioctl_arg_type*>(command.data()), ec);
00572 
00573     if (!ec && command.name() == static_cast<int>(FIONBIO))
00574     {
00575       if (command.get())
00576         impl.flags_ |= implementation_type::user_set_non_blocking;
00577       else
00578         impl.flags_ &= ~implementation_type::user_set_non_blocking;
00579     }
00580 
00581     return ec;
00582   }
00583 
00584   // Get the local endpoint.
00585   endpoint_type local_endpoint(const implementation_type& impl,
00586       asio::error_code& ec) const
00587   {
00588     if (!is_open(impl))
00589     {
00590       ec = asio::error::bad_descriptor;
00591       return endpoint_type();
00592     }
00593 
00594     endpoint_type endpoint;
00595     std::size_t addr_len = endpoint.capacity();
00596     if (socket_ops::getsockname(impl.socket_, endpoint.data(), &addr_len, ec))
00597       return endpoint_type();
00598     endpoint.resize(addr_len);
00599     return endpoint;
00600   }
00601 
00602   // Get the remote endpoint.
00603   endpoint_type remote_endpoint(const implementation_type& impl,
00604       asio::error_code& ec) const
00605   {
00606     if (!is_open(impl))
00607     {
00608       ec = asio::error::bad_descriptor;
00609       return endpoint_type();
00610     }
00611 
00612     if (impl.socket_.have_remote_endpoint())
00613     {
00614       // Check if socket is still connected.
00615       DWORD connect_time = 0;
00616       size_t connect_time_len = sizeof(connect_time);
00617       if (socket_ops::getsockopt(impl.socket_, SOL_SOCKET, SO_CONNECT_TIME,
00618             &connect_time, &connect_time_len, ec) == socket_error_retval)
00619       {
00620         return endpoint_type();
00621       }
00622       if (connect_time == 0xFFFFFFFF)
00623       {
00624         ec = asio::error::not_connected;
00625         return endpoint_type();
00626       }
00627 
00628       ec = asio::error_code();
00629       return impl.socket_.remote_endpoint();
00630     }
00631     else
00632     {
00633       endpoint_type endpoint;
00634       std::size_t addr_len = endpoint.capacity();
00635       if (socket_ops::getpeername(impl.socket_, endpoint.data(), &addr_len, ec))
00636         return endpoint_type();
00637       endpoint.resize(addr_len);
00638       return endpoint;
00639     }
00640   }
00641 
00643   asio::error_code shutdown(implementation_type& impl,
00644       socket_base::shutdown_type what, asio::error_code& ec)
00645   {
00646     if (!is_open(impl))
00647     {
00648       ec = asio::error::bad_descriptor;
00649       return ec;
00650     }
00651 
00652     socket_ops::shutdown(impl.socket_, what, ec);
00653     return ec;
00654   }
00655 
00656   // Send the given data to the peer. Returns the number of bytes sent.
00657   template <typename ConstBufferSequence>
00658   size_t send(implementation_type& impl, const ConstBufferSequence& buffers,
00659       socket_base::message_flags flags, asio::error_code& ec)
00660   {
00661     if (!is_open(impl))
00662     {
00663       ec = asio::error::bad_descriptor;
00664       return 0;
00665     }
00666 
00667     // Copy buffers into WSABUF array.
00668     ::WSABUF bufs[max_buffers];
00669     typename ConstBufferSequence::const_iterator iter = buffers.begin();
00670     typename ConstBufferSequence::const_iterator end = buffers.end();
00671     DWORD i = 0;
00672     size_t total_buffer_size = 0;
00673     for (; iter != end && i < max_buffers; ++iter, ++i)
00674     {
00675       asio::const_buffer buffer(*iter);
00676       bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
00677       bufs[i].buf = const_cast<char*>(
00678           asio::buffer_cast<const char*>(buffer));
00679       total_buffer_size += asio::buffer_size(buffer);
00680     }
00681 
00682     // A request to receive 0 bytes on a stream socket is a no-op.
00683     if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
00684     {
00685       ec = asio::error_code();
00686       return 0;
00687     }
00688 
00689     // Send the data.
00690     DWORD bytes_transferred = 0;
00691     int result = ::WSASend(impl.socket_, bufs,
00692         i, &bytes_transferred, flags, 0, 0);
00693     if (result != 0)
00694     {
00695       DWORD last_error = ::WSAGetLastError();
00696       if (last_error == ERROR_NETNAME_DELETED)
00697         last_error = WSAECONNRESET;
00698       else if (last_error == ERROR_PORT_UNREACHABLE)
00699         last_error = WSAECONNREFUSED;
00700       ec = asio::error_code(last_error,
00701           asio::error::get_system_category());
00702       return 0;
00703     }
00704 
00705     ec = asio::error_code();
00706     return bytes_transferred;
00707   }
00708 
00709   // Wait until data can be sent without blocking.
00710   size_t send(implementation_type& impl, const null_buffers&,
00711       socket_base::message_flags, asio::error_code& ec)
00712   {
00713     if (!is_open(impl))
00714     {
00715       ec = asio::error::bad_descriptor;
00716       return 0;
00717     }
00718 
00719     // Wait for socket to become ready.
00720     socket_ops::poll_write(impl.socket_, ec);
00721 
00722     return 0;
00723   }
00724 
00725   template <typename ConstBufferSequence, typename Handler>
00726   class send_operation
00727     : public operation
00728   {
00729   public:
00730     send_operation(win_iocp_io_service& io_service,
00731         weak_cancel_token_type cancel_token,
00732         const ConstBufferSequence& buffers, Handler handler)
00733       : operation(io_service,
00734           &send_operation<ConstBufferSequence, Handler>::do_completion_impl,
00735           &send_operation<ConstBufferSequence, Handler>::destroy_impl),
00736         work_(io_service.get_io_service()),
00737         cancel_token_(cancel_token),
00738         buffers_(buffers),
00739         handler_(handler)
00740     {
00741     }
00742 
00743   private:
00744     static void do_completion_impl(operation* op,
00745         DWORD last_error, size_t bytes_transferred)
00746     {
00747       // Take ownership of the operation object.
00748       typedef send_operation<ConstBufferSequence, Handler> op_type;
00749       op_type* handler_op(static_cast<op_type*>(op));
00750       typedef handler_alloc_traits<Handler, op_type> alloc_traits;
00751       handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
00752 
00753 #if defined(ASIO_ENABLE_BUFFER_DEBUGGING)
00754       // Check whether buffers are still valid.
00755       typename ConstBufferSequence::const_iterator iter
00756         = handler_op->buffers_.begin();
00757       typename ConstBufferSequence::const_iterator end
00758         = handler_op->buffers_.end();
00759       while (iter != end)
00760       {
00761         asio::const_buffer buffer(*iter);
00762         asio::buffer_cast<const char*>(buffer);
00763         ++iter;
00764       }
00765 #endif // defined(ASIO_ENABLE_BUFFER_DEBUGGING)
00766 
00767       // Map non-portable errors to their portable counterparts.
00768       asio::error_code ec(last_error,
00769           asio::error::get_system_category());
00770       if (ec.value() == ERROR_NETNAME_DELETED)
00771       {
00772         if (handler_op->cancel_token_.expired())
00773           ec = asio::error::operation_aborted;
00774         else
00775           ec = asio::error::connection_reset;
00776       }
00777       else if (ec.value() == ERROR_PORT_UNREACHABLE)
00778       {
00779         ec = asio::error::connection_refused;
00780       }
00781 
00782       // Make a copy of the handler so that the memory can be deallocated before
00783       // the upcall is made.
00784       Handler handler(handler_op->handler_);
00785 
00786       // Free the memory associated with the handler.
00787       ptr.reset();
00788 
00789       // Call the handler.
00790       asio_handler_invoke_helpers::invoke(
00791           detail::bind_handler(handler, ec, bytes_transferred), &handler);
00792     }
00793 
00794     static void destroy_impl(operation* op)
00795     {
00796       // Take ownership of the operation object.
00797       typedef send_operation<ConstBufferSequence, Handler> op_type;
00798       op_type* handler_op(static_cast<op_type*>(op));
00799       typedef handler_alloc_traits<Handler, op_type> alloc_traits;
00800       handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
00801 
00802       // A sub-object of the handler may be the true owner of the memory
00803       // associated with the handler. Consequently, a local copy of the handler
00804       // is required to ensure that any owning sub-object remains valid until
00805       // after we have deallocated the memory here.
00806       Handler handler(handler_op->handler_);
00807       (void)handler;
00808 
00809       // Free the memory associated with the handler.
00810       ptr.reset();
00811     }
00812 
00813     asio::io_service::work work_;
00814     weak_cancel_token_type cancel_token_;
00815     ConstBufferSequence buffers_;
00816     Handler handler_;
00817   };
00818 
00819   // Start an asynchronous send. The data being sent must be valid for the
00820   // lifetime of the asynchronous operation.
00821   template <typename ConstBufferSequence, typename Handler>
00822   void async_send(implementation_type& impl, const ConstBufferSequence& buffers,
00823       socket_base::message_flags flags, Handler handler)
00824   {
00825     if (!is_open(impl))
00826     {
00827       this->get_io_service().post(bind_handler(handler,
00828             asio::error::bad_descriptor, 0));
00829       return;
00830     }
00831 
00832 #if defined(ASIO_ENABLE_CANCELIO)
00833     // Update the ID of the thread from which cancellation is safe.
00834     if (impl.safe_cancellation_thread_id_ == 0)
00835       impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
00836     else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
00837       impl.safe_cancellation_thread_id_ = ~DWORD(0);
00838 #endif // defined(ASIO_ENABLE_CANCELIO)
00839 
00840     // Allocate and construct an operation to wrap the handler.
00841     typedef send_operation<ConstBufferSequence, Handler> value_type;
00842     typedef handler_alloc_traits<Handler, value_type> alloc_traits;
00843     raw_handler_ptr<alloc_traits> raw_ptr(handler);
00844     handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_,
00845         impl.cancel_token_, buffers, handler);
00846 
00847     // Copy buffers into WSABUF array.
00848     ::WSABUF bufs[max_buffers];
00849     typename ConstBufferSequence::const_iterator iter = buffers.begin();
00850     typename ConstBufferSequence::const_iterator end = buffers.end();
00851     DWORD i = 0;
00852     size_t total_buffer_size = 0;
00853     for (; iter != end && i < max_buffers; ++iter, ++i)
00854     {
00855       asio::const_buffer buffer(*iter);
00856       bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
00857       bufs[i].buf = const_cast<char*>(
00858           asio::buffer_cast<const char*>(buffer));
00859       total_buffer_size += asio::buffer_size(buffer);
00860     }
00861 
00862     // A request to receive 0 bytes on a stream socket is a no-op.
00863     if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
00864     {
00865       asio::io_service::work work(this->get_io_service());
00866       ptr.reset();
00867       asio::error_code error;
00868       iocp_service_.post(bind_handler(handler, error, 0));
00869       return;
00870     }
00871 
00872     // Send the data.
00873     DWORD bytes_transferred = 0;
00874     int result = ::WSASend(impl.socket_, bufs, i,
00875         &bytes_transferred, flags, ptr.get(), 0);
00876     DWORD last_error = ::WSAGetLastError();
00877 
00878     // Check if the operation completed immediately.
00879     if (result != 0 && last_error != WSA_IO_PENDING)
00880     {
00881       asio::io_service::work work(this->get_io_service());
00882       ptr.reset();
00883       asio::error_code ec(last_error,
00884           asio::error::get_system_category());
00885       iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
00886     }
00887     else
00888     {
00889       ptr.release();
00890     }
00891   }
00892 
00893   template <typename Handler>
00894   class null_buffers_operation
00895   {
00896   public:
00897     null_buffers_operation(asio::io_service& io_service, Handler handler)
00898       : work_(io_service),
00899         handler_(handler)
00900     {
00901     }
00902 
00903     bool perform(asio::error_code&,
00904         std::size_t& bytes_transferred)
00905     {
00906       bytes_transferred = 0;
00907       return true;
00908     }
00909 
00910     void complete(const asio::error_code& ec,
00911         std::size_t bytes_transferred)
00912     {
00913       work_.get_io_service().post(bind_handler(
00914             handler_, ec, bytes_transferred));
00915     }
00916 
00917   private:
00918     asio::io_service::work work_;
00919     Handler handler_;
00920   };
00921 
00922   // Start an asynchronous wait until data can be sent without blocking.
00923   template <typename Handler>
00924   void async_send(implementation_type& impl, const null_buffers&,
00925       socket_base::message_flags, Handler handler)
00926   {
00927     if (!is_open(impl))
00928     {
00929       this->get_io_service().post(bind_handler(handler,
00930             asio::error::bad_descriptor, 0));
00931     }
00932     else
00933     {
00934       // Check if the reactor was already obtained from the io_service.
00935       reactor_type* reactor = static_cast<reactor_type*>(
00936             interlocked_compare_exchange_pointer(
00937               reinterpret_cast<void**>(&reactor_), 0, 0));
00938       if (!reactor)
00939       {
00940         reactor = &(asio::use_service<reactor_type>(
00941               this->get_io_service()));
00942         interlocked_exchange_pointer(
00943             reinterpret_cast<void**>(&reactor_), reactor);
00944       }
00945 
00946       reactor->start_write_op(impl.socket_, impl.reactor_data_,
00947           null_buffers_operation<Handler>(this->get_io_service(), handler),
00948           false);
00949     }
00950   }
00951 
00952   // Send a datagram to the specified endpoint. Returns the number of bytes
00953   // sent.
00954   template <typename ConstBufferSequence>
00955   size_t send_to(implementation_type& impl, const ConstBufferSequence& buffers,
00956       const endpoint_type& destination, socket_base::message_flags flags,
00957       asio::error_code& ec)
00958   {
00959     if (!is_open(impl))
00960     {
00961       ec = asio::error::bad_descriptor;
00962       return 0;
00963     }
00964 
00965     // Copy buffers into WSABUF array.
00966     ::WSABUF bufs[max_buffers];
00967     typename ConstBufferSequence::const_iterator iter = buffers.begin();
00968     typename ConstBufferSequence::const_iterator end = buffers.end();
00969     DWORD i = 0;
00970     for (; iter != end && i < max_buffers; ++iter, ++i)
00971     {
00972       asio::const_buffer buffer(*iter);
00973       bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
00974       bufs[i].buf = const_cast<char*>(
00975           asio::buffer_cast<const char*>(buffer));
00976     }
00977 
00978     // Send the data.
00979     DWORD bytes_transferred = 0;
00980     int result = ::WSASendTo(impl.socket_, bufs, i, &bytes_transferred,
00981         flags, destination.data(), static_cast<int>(destination.size()), 0, 0);
00982     if (result != 0)
00983     {
00984       DWORD last_error = ::WSAGetLastError();
00985       if (last_error == ERROR_PORT_UNREACHABLE)
00986         last_error = WSAECONNREFUSED;
00987       ec = asio::error_code(last_error,
00988           asio::error::get_system_category());
00989       return 0;
00990     }
00991 
00992     ec = asio::error_code();
00993     return bytes_transferred;
00994   }
00995 
00996   // Wait until data can be sent without blocking.
00997   size_t send_to(implementation_type& impl, const null_buffers&,
00998       socket_base::message_flags, const endpoint_type&,
00999       asio::error_code& ec)
01000   {
01001     if (!is_open(impl))
01002     {
01003       ec = asio::error::bad_descriptor;
01004       return 0;
01005     }
01006 
01007     // Wait for socket to become ready.
01008     socket_ops::poll_write(impl.socket_, ec);
01009 
01010     return 0;
01011   }
01012 
01013   template <typename ConstBufferSequence, typename Handler>
01014   class send_to_operation
01015     : public operation
01016   {
01017   public:
01018     send_to_operation(win_iocp_io_service& io_service,
01019         const ConstBufferSequence& buffers, Handler handler)
01020       : operation(io_service,
01021           &send_to_operation<ConstBufferSequence, Handler>::do_completion_impl,
01022           &send_to_operation<ConstBufferSequence, Handler>::destroy_impl),
01023         work_(io_service.get_io_service()),
01024         buffers_(buffers),
01025         handler_(handler)
01026     {
01027     }
01028 
01029   private:
01030     static void do_completion_impl(operation* op,
01031         DWORD last_error, size_t bytes_transferred)
01032     {
01033       // Take ownership of the operation object.
01034       typedef send_to_operation<ConstBufferSequence, Handler> op_type;
01035       op_type* handler_op(static_cast<op_type*>(op));
01036       typedef handler_alloc_traits<Handler, op_type> alloc_traits;
01037       handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
01038 
01039 #if defined(ASIO_ENABLE_BUFFER_DEBUGGING)
01040       // Check whether buffers are still valid.
01041       typename ConstBufferSequence::const_iterator iter
01042         = handler_op->buffers_.begin();
01043       typename ConstBufferSequence::const_iterator end
01044         = handler_op->buffers_.end();
01045       while (iter != end)
01046       {
01047         asio::const_buffer buffer(*iter);
01048         asio::buffer_cast<const char*>(buffer);
01049         ++iter;
01050       }
01051 #endif // defined(ASIO_ENABLE_BUFFER_DEBUGGING)
01052 
01053       // Map non-portable errors to their portable counterparts.
01054       asio::error_code ec(last_error,
01055           asio::error::get_system_category());
01056       if (ec.value() == ERROR_PORT_UNREACHABLE)
01057       {
01058         ec = asio::error::connection_refused;
01059       }
01060 
01061       // Make a copy of the handler so that the memory can be deallocated before
01062       // the upcall is made.
01063       Handler handler(handler_op->handler_);
01064 
01065       // Free the memory associated with the handler.
01066       ptr.reset();
01067 
01068       // Call the handler.
01069       asio_handler_invoke_helpers::invoke(
01070           detail::bind_handler(handler, ec, bytes_transferred), &handler);
01071     }
01072 
01073     static void destroy_impl(operation* op)
01074     {
01075       // Take ownership of the operation object.
01076       typedef send_to_operation<ConstBufferSequence, Handler> op_type;
01077       op_type* handler_op(static_cast<op_type*>(op));
01078       typedef handler_alloc_traits<Handler, op_type> alloc_traits;
01079       handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
01080 
01081       // A sub-object of the handler may be the true owner of the memory
01082       // associated with the handler. Consequently, a local copy of the handler
01083       // is required to ensure that any owning sub-object remains valid until
01084       // after we have deallocated the memory here.
01085       Handler handler(handler_op->handler_);
01086       (void)handler;
01087 
01088       // Free the memory associated with the handler.
01089       ptr.reset();
01090     }
01091 
01092     asio::io_service::work work_;
01093     ConstBufferSequence buffers_;
01094     Handler handler_;
01095   };
01096 
01097   // Start an asynchronous send. The data being sent must be valid for the
01098   // lifetime of the asynchronous operation.
01099   template <typename ConstBufferSequence, typename Handler>
01100   void async_send_to(implementation_type& impl,
01101       const ConstBufferSequence& buffers, const endpoint_type& destination,
01102       socket_base::message_flags flags, Handler handler)
01103   {
01104     if (!is_open(impl))
01105     {
01106       this->get_io_service().post(bind_handler(handler,
01107             asio::error::bad_descriptor, 0));
01108       return;
01109     }
01110 
01111 #if defined(ASIO_ENABLE_CANCELIO)
01112     // Update the ID of the thread from which cancellation is safe.
01113     if (impl.safe_cancellation_thread_id_ == 0)
01114       impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
01115     else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
01116       impl.safe_cancellation_thread_id_ = ~DWORD(0);
01117 #endif // defined(ASIO_ENABLE_CANCELIO)
01118 
01119     // Allocate and construct an operation to wrap the handler.
01120     typedef send_to_operation<ConstBufferSequence, Handler> value_type;
01121     typedef handler_alloc_traits<Handler, value_type> alloc_traits;
01122     raw_handler_ptr<alloc_traits> raw_ptr(handler);
01123     handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_, buffers, handler);
01124 
01125     // Copy buffers into WSABUF array.
01126     ::WSABUF bufs[max_buffers];
01127     typename ConstBufferSequence::const_iterator iter = buffers.begin();
01128     typename ConstBufferSequence::const_iterator end = buffers.end();
01129     DWORD i = 0;
01130     for (; iter != end && i < max_buffers; ++iter, ++i)
01131     {
01132       asio::const_buffer buffer(*iter);
01133       bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
01134       bufs[i].buf = const_cast<char*>(
01135           asio::buffer_cast<const char*>(buffer));
01136     }
01137 
01138     // Send the data.
01139     DWORD bytes_transferred = 0;
01140     int result = ::WSASendTo(impl.socket_, bufs, i, &bytes_transferred, flags,
01141         destination.data(), static_cast<int>(destination.size()), ptr.get(), 0);
01142     DWORD last_error = ::WSAGetLastError();
01143 
01144     // Check if the operation completed immediately.
01145     if (result != 0 && last_error != WSA_IO_PENDING)
01146     {
01147       asio::io_service::work work(this->get_io_service());
01148       ptr.reset();
01149       asio::error_code ec(last_error,
01150           asio::error::get_system_category());
01151       iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
01152     }
01153     else
01154     {
01155       ptr.release();
01156     }
01157   }
01158 
01159   // Start an asynchronous wait until data can be sent without blocking.
01160   template <typename Handler>
01161   void async_send_to(implementation_type& impl, const null_buffers&,
01162       socket_base::message_flags, const endpoint_type&, Handler handler)
01163   {
01164     if (!is_open(impl))
01165     {
01166       this->get_io_service().post(bind_handler(handler,
01167             asio::error::bad_descriptor, 0));
01168     }
01169     else
01170     {
01171       // Check if the reactor was already obtained from the io_service.
01172       reactor_type* reactor = static_cast<reactor_type*>(
01173             interlocked_compare_exchange_pointer(
01174               reinterpret_cast<void**>(&reactor_), 0, 0));
01175       if (!reactor)
01176       {
01177         reactor = &(asio::use_service<reactor_type>(
01178               this->get_io_service()));
01179         interlocked_exchange_pointer(
01180             reinterpret_cast<void**>(&reactor_), reactor);
01181       }
01182 
01183       reactor->start_write_op(impl.socket_, impl.reactor_data_,
01184           null_buffers_operation<Handler>(this->get_io_service(), handler),
01185           false);
01186     }
01187   }
01188 
01189   // Receive some data from the peer. Returns the number of bytes received.
01190   template <typename MutableBufferSequence>
01191   size_t receive(implementation_type& impl,
01192       const MutableBufferSequence& buffers,
01193       socket_base::message_flags flags, asio::error_code& ec)
01194   {
01195     if (!is_open(impl))
01196     {
01197       ec = asio::error::bad_descriptor;
01198       return 0;
01199     }
01200 
01201     // Copy buffers into WSABUF array.
01202     ::WSABUF bufs[max_buffers];
01203     typename MutableBufferSequence::const_iterator iter = buffers.begin();
01204     typename MutableBufferSequence::const_iterator end = buffers.end();
01205     DWORD i = 0;
01206     size_t total_buffer_size = 0;
01207     for (; iter != end && i < max_buffers; ++iter, ++i)
01208     {
01209       asio::mutable_buffer buffer(*iter);
01210       bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
01211       bufs[i].buf = asio::buffer_cast<char*>(buffer);
01212       total_buffer_size += asio::buffer_size(buffer);
01213     }
01214 
01215     // A request to receive 0 bytes on a stream socket is a no-op.
01216     if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
01217     {
01218       ec = asio::error_code();
01219       return 0;
01220     }
01221 
01222     // Receive some data.
01223     DWORD bytes_transferred = 0;
01224     DWORD recv_flags = flags;
01225     int result = ::WSARecv(impl.socket_, bufs, i,
01226         &bytes_transferred, &recv_flags, 0, 0);
01227     if (result != 0)
01228     {
01229       DWORD last_error = ::WSAGetLastError();
01230       if (last_error == ERROR_NETNAME_DELETED)
01231         last_error = WSAECONNRESET;
01232       else if (last_error == ERROR_PORT_UNREACHABLE)
01233         last_error = WSAECONNREFUSED;
01234       ec = asio::error_code(last_error,
01235           asio::error::get_system_category());
01236       return 0;
01237     }
01238     if (bytes_transferred == 0 && impl.protocol_.type() == SOCK_STREAM)
01239     {
01240       ec = asio::error::eof;
01241       return 0;
01242     }
01243 
01244     ec = asio::error_code();
01245     return bytes_transferred;
01246   }
01247 
01248   // Wait until data can be received without blocking.
01249   size_t receive(implementation_type& impl, const null_buffers&,
01250       socket_base::message_flags, asio::error_code& ec)
01251   {
01252     if (!is_open(impl))
01253     {
01254       ec = asio::error::bad_descriptor;
01255       return 0;
01256     }
01257 
01258     // Wait for socket to become ready.
01259     socket_ops::poll_read(impl.socket_, ec);
01260 
01261     return 0;
01262   }
01263 
01264   template <typename MutableBufferSequence, typename Handler>
01265   class receive_operation
01266     : public operation
01267   {
01268   public:
01269     receive_operation(int protocol_type, win_iocp_io_service& io_service,
01270         weak_cancel_token_type cancel_token,
01271         const MutableBufferSequence& buffers, Handler handler)
01272       : operation(io_service,
01273           &receive_operation<
01274             MutableBufferSequence, Handler>::do_completion_impl,
01275           &receive_operation<
01276             MutableBufferSequence, Handler>::destroy_impl),
01277         protocol_type_(protocol_type),
01278         work_(io_service.get_io_service()),
01279         cancel_token_(cancel_token),
01280         buffers_(buffers),
01281         handler_(handler)
01282     {
01283     }
01284 
01285   private:
01286     static void do_completion_impl(operation* op,
01287         DWORD last_error, size_t bytes_transferred)
01288     {
01289       // Take ownership of the operation object.
01290       typedef receive_operation<MutableBufferSequence, Handler> op_type;
01291       op_type* handler_op(static_cast<op_type*>(op));
01292       typedef handler_alloc_traits<Handler, op_type> alloc_traits;
01293       handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
01294 
01295 #if defined(ASIO_ENABLE_BUFFER_DEBUGGING)
01296       // Check whether buffers are still valid.
01297       typename MutableBufferSequence::const_iterator iter
01298         = handler_op->buffers_.begin();
01299       typename MutableBufferSequence::const_iterator end
01300         = handler_op->buffers_.end();
01301       while (iter != end)
01302       {
01303         asio::mutable_buffer buffer(*iter);
01304         asio::buffer_cast<char*>(buffer);
01305         ++iter;
01306       }
01307 #endif // defined(ASIO_ENABLE_BUFFER_DEBUGGING)
01308 
01309       // Map non-portable errors to their portable counterparts.
01310       asio::error_code ec(last_error,
01311           asio::error::get_system_category());
01312       if (ec.value() == ERROR_NETNAME_DELETED)
01313       {
01314         if (handler_op->cancel_token_.expired())
01315           ec = asio::error::operation_aborted;
01316         else
01317           ec = asio::error::connection_reset;
01318       }
01319       else if (ec.value() == ERROR_PORT_UNREACHABLE)
01320       {
01321         ec = asio::error::connection_refused;
01322       }
01323 
01324       // Check for connection closed.
01325       else if (!ec && bytes_transferred == 0
01326           && handler_op->protocol_type_ == SOCK_STREAM
01327           && !boost::is_same<MutableBufferSequence, null_buffers>::value)
01328       {
01329         ec = asio::error::eof;
01330       }
01331 
01332       // Make a copy of the handler so that the memory can be deallocated before
01333       // the upcall is made.
01334       Handler handler(handler_op->handler_);
01335 
01336       // Free the memory associated with the handler.
01337       ptr.reset();
01338 
01339       // Call the handler.
01340       asio_handler_invoke_helpers::invoke(
01341           detail::bind_handler(handler, ec, bytes_transferred), &handler);
01342     }
01343 
01344     static void destroy_impl(operation* op)
01345     {
01346       // Take ownership of the operation object.
01347       typedef receive_operation<MutableBufferSequence, Handler> op_type;
01348       op_type* handler_op(static_cast<op_type*>(op));
01349       typedef handler_alloc_traits<Handler, op_type> alloc_traits;
01350       handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
01351 
01352       // A sub-object of the handler may be the true owner of the memory
01353       // associated with the handler. Consequently, a local copy of the handler
01354       // is required to ensure that any owning sub-object remains valid until
01355       // after we have deallocated the memory here.
01356       Handler handler(handler_op->handler_);
01357       (void)handler;
01358 
01359       // Free the memory associated with the handler.
01360       ptr.reset();
01361     }
01362 
01363     int protocol_type_;
01364     asio::io_service::work work_;
01365     weak_cancel_token_type cancel_token_;
01366     MutableBufferSequence buffers_;
01367     Handler handler_;
01368   };
01369 
01370   // Start an asynchronous receive. The buffer for the data being received
01371   // must be valid for the lifetime of the asynchronous operation.
01372   template <typename MutableBufferSequence, typename Handler>
01373   void async_receive(implementation_type& impl,
01374       const MutableBufferSequence& buffers,
01375       socket_base::message_flags flags, Handler handler)
01376   {
01377     if (!is_open(impl))
01378     {
01379       this->get_io_service().post(bind_handler(handler,
01380             asio::error::bad_descriptor, 0));
01381       return;
01382     }
01383 
01384 #if defined(ASIO_ENABLE_CANCELIO)
01385     // Update the ID of the thread from which cancellation is safe.
01386     if (impl.safe_cancellation_thread_id_ == 0)
01387       impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
01388     else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
01389       impl.safe_cancellation_thread_id_ = ~DWORD(0);
01390 #endif // defined(ASIO_ENABLE_CANCELIO)
01391 
01392     // Allocate and construct an operation to wrap the handler.
01393     typedef receive_operation<MutableBufferSequence, Handler> value_type;
01394     typedef handler_alloc_traits<Handler, value_type> alloc_traits;
01395     raw_handler_ptr<alloc_traits> raw_ptr(handler);
01396     int protocol_type = impl.protocol_.type();
01397     handler_ptr<alloc_traits> ptr(raw_ptr, protocol_type,
01398         iocp_service_, impl.cancel_token_, buffers, handler);
01399 
01400     // Copy buffers into WSABUF array.
01401     ::WSABUF bufs[max_buffers];
01402     typename MutableBufferSequence::const_iterator iter = buffers.begin();
01403     typename MutableBufferSequence::const_iterator end = buffers.end();
01404     DWORD i = 0;
01405     size_t total_buffer_size = 0;
01406     for (; iter != end && i < max_buffers; ++iter, ++i)
01407     {
01408       asio::mutable_buffer buffer(*iter);
01409       bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
01410       bufs[i].buf = asio::buffer_cast<char*>(buffer);
01411       total_buffer_size += asio::buffer_size(buffer);
01412     }
01413 
01414     // A request to receive 0 bytes on a stream socket is a no-op.
01415     if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
01416     {
01417       asio::io_service::work work(this->get_io_service());
01418       ptr.reset();
01419       asio::error_code error;
01420       iocp_service_.post(bind_handler(handler, error, 0));
01421       return;
01422     }
01423 
01424     // Receive some data.
01425     DWORD bytes_transferred = 0;
01426     DWORD recv_flags = flags;
01427     int result = ::WSARecv(impl.socket_, bufs, i,
01428         &bytes_transferred, &recv_flags, ptr.get(), 0);
01429     DWORD last_error = ::WSAGetLastError();
01430     if (result != 0 && last_error != WSA_IO_PENDING)
01431     {
01432       asio::io_service::work work(this->get_io_service());
01433       ptr.reset();
01434       asio::error_code ec(last_error,
01435           asio::error::get_system_category());
01436       iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
01437     }
01438     else
01439     {
01440       ptr.release();
01441     }
01442   }
01443 
01444   // Wait until data can be received without blocking.
01445   template <typename Handler>
01446   void async_receive(implementation_type& impl, const null_buffers& buffers,
01447       socket_base::message_flags flags, Handler handler)
01448   {
01449     if (!is_open(impl))
01450     {
01451       this->get_io_service().post(bind_handler(handler,
01452             asio::error::bad_descriptor, 0));
01453     }
01454     else if (impl.protocol_.type() == SOCK_STREAM)
01455     {
01456       // For stream sockets on Windows, we may issue a 0-byte overlapped
01457       // WSARecv to wait until there is data available on the socket.
01458 
01459 #if defined(ASIO_ENABLE_CANCELIO)
01460       // Update the ID of the thread from which cancellation is safe.
01461       if (impl.safe_cancellation_thread_id_ == 0)
01462         impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
01463       else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
01464         impl.safe_cancellation_thread_id_ = ~DWORD(0);
01465 #endif // defined(ASIO_ENABLE_CANCELIO)
01466 
01467       // Allocate and construct an operation to wrap the handler.
01468       typedef receive_operation<null_buffers, Handler> value_type;
01469       typedef handler_alloc_traits<Handler, value_type> alloc_traits;
01470       raw_handler_ptr<alloc_traits> raw_ptr(handler);
01471       int protocol_type = impl.protocol_.type();
01472       handler_ptr<alloc_traits> ptr(raw_ptr, protocol_type,
01473           iocp_service_, impl.cancel_token_, buffers, handler);
01474 
01475       // Issue a receive operation with an empty buffer.
01476       ::WSABUF buf = { 0, 0 };
01477       DWORD bytes_transferred = 0;
01478       DWORD recv_flags = flags;
01479       int result = ::WSARecv(impl.socket_, &buf, 1,
01480           &bytes_transferred, &recv_flags, ptr.get(), 0);
01481       DWORD last_error = ::WSAGetLastError();
01482       if (result != 0 && last_error != WSA_IO_PENDING)
01483       {
01484         asio::io_service::work work(this->get_io_service());
01485         ptr.reset();
01486         asio::error_code ec(last_error,
01487             asio::error::get_system_category());
01488         iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
01489       }
01490       else
01491       {
01492         ptr.release();
01493       }
01494     }
01495     else
01496     {
01497       // Check if the reactor was already obtained from the io_service.
01498       reactor_type* reactor = static_cast<reactor_type*>(
01499             interlocked_compare_exchange_pointer(
01500               reinterpret_cast<void**>(&reactor_), 0, 0));
01501       if (!reactor)
01502       {
01503         reactor = &(asio::use_service<reactor_type>(
01504               this->get_io_service()));
01505         interlocked_exchange_pointer(
01506             reinterpret_cast<void**>(&reactor_), reactor);
01507       }
01508 
01509       if (flags & socket_base::message_out_of_band)
01510       {
01511         reactor->start_except_op(impl.socket_, impl.reactor_data_,
01512             null_buffers_operation<Handler>(this->get_io_service(), handler));
01513       }
01514       else
01515       {
01516         reactor->start_read_op(impl.socket_, impl.reactor_data_,
01517             null_buffers_operation<Handler>(this->get_io_service(), handler),
01518             false);
01519       }
01520     }
01521   }
01522 
01523   // Receive a datagram with the endpoint of the sender. Returns the number of
01524   // bytes received.
01525   template <typename MutableBufferSequence>
01526   size_t receive_from(implementation_type& impl,
01527       const MutableBufferSequence& buffers,
01528       endpoint_type& sender_endpoint, socket_base::message_flags flags,
01529       asio::error_code& ec)
01530   {
01531     if (!is_open(impl))
01532     {
01533       ec = asio::error::bad_descriptor;
01534       return 0;
01535     }
01536 
01537     // Copy buffers into WSABUF array.
01538     ::WSABUF bufs[max_buffers];
01539     typename MutableBufferSequence::const_iterator iter = buffers.begin();
01540     typename MutableBufferSequence::const_iterator end = buffers.end();
01541     DWORD i = 0;
01542     for (; iter != end && i < max_buffers; ++iter, ++i)
01543     {
01544       asio::mutable_buffer buffer(*iter);
01545       bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
01546       bufs[i].buf = asio::buffer_cast<char*>(buffer);
01547     }
01548 
01549     // Receive some data.
01550     DWORD bytes_transferred = 0;
01551     DWORD recv_flags = flags;
01552     int endpoint_size = static_cast<int>(sender_endpoint.capacity());
01553     int result = ::WSARecvFrom(impl.socket_, bufs, i, &bytes_transferred,
01554         &recv_flags, sender_endpoint.data(), &endpoint_size, 0, 0);
01555     if (result != 0)
01556     {
01557       DWORD last_error = ::WSAGetLastError();
01558       if (last_error == ERROR_PORT_UNREACHABLE)
01559         last_error = WSAECONNREFUSED;
01560       ec = asio::error_code(last_error,
01561           asio::error::get_system_category());
01562       return 0;
01563     }
01564     if (bytes_transferred == 0 && impl.protocol_.type() == SOCK_STREAM)
01565     {
01566       ec = asio::error::eof;
01567       return 0;
01568     }
01569 
01570     sender_endpoint.resize(static_cast<std::size_t>(endpoint_size));
01571 
01572     ec = asio::error_code();
01573     return bytes_transferred;
01574   }
01575 
01576   // Wait until data can be received without blocking.
01577   size_t receive_from(implementation_type& impl,
01578       const null_buffers&, endpoint_type& sender_endpoint,
01579       socket_base::message_flags, asio::error_code& ec)
01580   {
01581     if (!is_open(impl))
01582     {
01583       ec = asio::error::bad_descriptor;
01584       return 0;
01585     }
01586 
01587     // Wait for socket to become ready.
01588     socket_ops::poll_read(impl.socket_, ec);
01589 
01590     // Reset endpoint since it can be given no sensible value at this time.
01591     sender_endpoint = endpoint_type();
01592 
01593     return 0;
01594   }
01595 
01596   template <typename MutableBufferSequence, typename Handler>
01597   class receive_from_operation
01598     : public operation
01599   {
01600   public:
01601     receive_from_operation(int protocol_type, win_iocp_io_service& io_service,
01602         endpoint_type& endpoint, const MutableBufferSequence& buffers,
01603         Handler handler)
01604       : operation(io_service,
01605           &receive_from_operation<
01606             MutableBufferSequence, Handler>::do_completion_impl,
01607           &receive_from_operation<
01608             MutableBufferSequence, Handler>::destroy_impl),
01609         protocol_type_(protocol_type),
01610         endpoint_(endpoint),
01611         endpoint_size_(static_cast<int>(endpoint.capacity())),
01612         work_(io_service.get_io_service()),
01613         buffers_(buffers),
01614         handler_(handler)
01615     {
01616     }
01617 
01618     int& endpoint_size()
01619     {
01620       return endpoint_size_;
01621     }
01622 
01623   private:
01624     static void do_completion_impl(operation* op,
01625         DWORD last_error, size_t bytes_transferred)
01626     {
01627       // Take ownership of the operation object.
01628       typedef receive_from_operation<MutableBufferSequence, Handler> op_type;
01629       op_type* handler_op(static_cast<op_type*>(op));
01630       typedef handler_alloc_traits<Handler, op_type> alloc_traits;
01631       handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
01632 
01633 #if defined(ASIO_ENABLE_BUFFER_DEBUGGING)
01634       // Check whether buffers are still valid.
01635       typename MutableBufferSequence::const_iterator iter
01636         = handler_op->buffers_.begin();
01637       typename MutableBufferSequence::const_iterator end
01638         = handler_op->buffers_.end();
01639       while (iter != end)
01640       {
01641         asio::mutable_buffer buffer(*iter);
01642         asio::buffer_cast<char*>(buffer);
01643         ++iter;
01644       }
01645 #endif // defined(ASIO_ENABLE_BUFFER_DEBUGGING)
01646 
01647       // Map non-portable errors to their portable counterparts.
01648       asio::error_code ec(last_error,
01649           asio::error::get_system_category());
01650       if (ec.value() == ERROR_PORT_UNREACHABLE)
01651       {
01652         ec = asio::error::connection_refused;
01653       }
01654 
01655       // Check for connection closed.
01656       if (!ec && bytes_transferred == 0
01657           && handler_op->protocol_type_ == SOCK_STREAM)
01658       {
01659         ec = asio::error::eof;
01660       }
01661 
01662       // Record the size of the endpoint returned by the operation.
01663       handler_op->endpoint_.resize(handler_op->endpoint_size_);
01664 
01665       // Make a copy of the handler so that the memory can be deallocated before
01666       // the upcall is made.
01667       Handler handler(handler_op->handler_);
01668 
01669       // Free the memory associated with the handler.
01670       ptr.reset();
01671 
01672       // Call the handler.
01673       asio_handler_invoke_helpers::invoke(
01674           detail::bind_handler(handler, ec, bytes_transferred), &handler);
01675     }
01676 
01677     static void destroy_impl(operation* op)
01678     {
01679       // Take ownership of the operation object.
01680       typedef receive_from_operation<MutableBufferSequence, Handler> op_type;
01681       op_type* handler_op(static_cast<op_type*>(op));
01682       typedef handler_alloc_traits<Handler, op_type> alloc_traits;
01683       handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
01684 
01685       // A sub-object of the handler may be the true owner of the memory
01686       // associated with the handler. Consequently, a local copy of the handler
01687       // is required to ensure that any owning sub-object remains valid until
01688       // after we have deallocated the memory here.
01689       Handler handler(handler_op->handler_);
01690       (void)handler;
01691 
01692       // Free the memory associated with the handler.
01693       ptr.reset();
01694     }
01695 
01696     int protocol_type_;
01697     endpoint_type& endpoint_;
01698     int endpoint_size_;
01699     asio::io_service::work work_;
01700     MutableBufferSequence buffers_;
01701     Handler handler_;
01702   };
01703 
01704   // Start an asynchronous receive. The buffer for the data being received and
01705   // the sender_endpoint object must both be valid for the lifetime of the
01706   // asynchronous operation.
01707   template <typename MutableBufferSequence, typename Handler>
01708   void async_receive_from(implementation_type& impl,
01709       const MutableBufferSequence& buffers, endpoint_type& sender_endp,
01710       socket_base::message_flags flags, Handler handler)
01711   {
01712     if (!is_open(impl))
01713     {
01714       this->get_io_service().post(bind_handler(handler,
01715             asio::error::bad_descriptor, 0));
01716       return;
01717     }
01718 
01719 #if defined(ASIO_ENABLE_CANCELIO)
01720     // Update the ID of the thread from which cancellation is safe.
01721     if (impl.safe_cancellation_thread_id_ == 0)
01722       impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
01723     else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
01724       impl.safe_cancellation_thread_id_ = ~DWORD(0);
01725 #endif // defined(ASIO_ENABLE_CANCELIO)
01726 
01727     // Allocate and construct an operation to wrap the handler.
01728     typedef receive_from_operation<MutableBufferSequence, Handler> value_type;
01729     typedef handler_alloc_traits<Handler, value_type> alloc_traits;
01730     raw_handler_ptr<alloc_traits> raw_ptr(handler);
01731     int protocol_type = impl.protocol_.type();
01732     handler_ptr<alloc_traits> ptr(raw_ptr, protocol_type,
01733         iocp_service_, sender_endp, buffers, handler);
01734 
01735     // Copy buffers into WSABUF array.
01736     ::WSABUF bufs[max_buffers];
01737     typename MutableBufferSequence::const_iterator iter = buffers.begin();
01738     typename MutableBufferSequence::const_iterator end = buffers.end();
01739     DWORD i = 0;
01740     for (; iter != end && i < max_buffers; ++iter, ++i)
01741     {
01742       asio::mutable_buffer buffer(*iter);
01743       bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
01744       bufs[i].buf = asio::buffer_cast<char*>(buffer);
01745     }
01746 
01747     // Receive some data.
01748     DWORD bytes_transferred = 0;
01749     DWORD recv_flags = flags;
01750     int result = ::WSARecvFrom(impl.socket_, bufs, i, &bytes_transferred,
01751         &recv_flags, sender_endp.data(), &ptr.get()->endpoint_size(),
01752         ptr.get(), 0);
01753     DWORD last_error = ::WSAGetLastError();
01754     if (result != 0 && last_error != WSA_IO_PENDING)
01755     {
01756       asio::io_service::work work(this->get_io_service());
01757       ptr.reset();
01758       asio::error_code ec(last_error,
01759           asio::error::get_system_category());
01760       iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
01761     }
01762     else
01763     {
01764       ptr.release();
01765     }
01766   }
01767 
01768   // Wait until data can be received without blocking.
01769   template <typename Handler>
01770   void async_receive_from(implementation_type& impl,
01771       const null_buffers&, endpoint_type& sender_endpoint,
01772       socket_base::message_flags flags, Handler handler)
01773   {
01774     if (!is_open(impl))
01775     {
01776       this->get_io_service().post(bind_handler(handler,
01777             asio::error::bad_descriptor, 0));
01778     }
01779     else
01780     {
01781       // Check if the reactor was already obtained from the io_service.
01782       reactor_type* reactor = static_cast<reactor_type*>(
01783             interlocked_compare_exchange_pointer(
01784               reinterpret_cast<void**>(&reactor_), 0, 0));
01785       if (!reactor)
01786       {
01787         reactor = &(asio::use_service<reactor_type>(
01788               this->get_io_service()));
01789         interlocked_exchange_pointer(
01790             reinterpret_cast<void**>(&reactor_), reactor);
01791       }
01792 
01793       // Reset endpoint since it can be given no sensible value at this time.
01794       sender_endpoint = endpoint_type();
01795 
01796       if (flags & socket_base::message_out_of_band)
01797       {
01798         reactor->start_except_op(impl.socket_, impl.reactor_data_,
01799             null_buffers_operation<Handler>(this->get_io_service(), handler));
01800       }
01801       else
01802       {
01803         reactor->start_read_op(impl.socket_, impl.reactor_data_,
01804             null_buffers_operation<Handler>(this->get_io_service(), handler),
01805             false);
01806       }
01807     }
01808   }
01809 
01810   // Accept a new connection.
01811   template <typename Socket>
01812   asio::error_code accept(implementation_type& impl, Socket& peer,
01813       endpoint_type* peer_endpoint, asio::error_code& ec)
01814   {
01815     if (!is_open(impl))
01816     {
01817       ec = asio::error::bad_descriptor;
01818       return ec;
01819     }
01820 
01821     // We cannot accept a socket that is already open.
01822     if (peer.is_open())
01823     {
01824       ec = asio::error::already_open;
01825       return ec;
01826     }
01827 
01828     for (;;)
01829     {
01830       socket_holder new_socket;
01831       std::size_t addr_len = 0;
01832       if (peer_endpoint)
01833       {
01834         addr_len = peer_endpoint->capacity();
01835         new_socket.reset(socket_ops::accept(impl.socket_,
01836               peer_endpoint->data(), &addr_len, ec));
01837       }
01838       else
01839       {
01840         new_socket.reset(socket_ops::accept(impl.socket_, 0, 0, ec));
01841       }
01842 
01843       if (ec)
01844       {
01845         if (ec == asio::error::connection_aborted
01846             && !(impl.flags_ & implementation_type::enable_connection_aborted))
01847         {
01848           // Retry accept operation.
01849           continue;
01850         }
01851         else
01852         {
01853           return ec;
01854         }
01855       }
01856 
01857       if (peer_endpoint)
01858         peer_endpoint->resize(addr_len);
01859 
01860       peer.assign(impl.protocol_, new_socket.get(), ec);
01861       if (!ec)
01862         new_socket.release();
01863       return ec;
01864     }
01865   }
01866 
01867   template <typename Socket, typename Handler>
01868   class accept_operation
01869     : public operation
01870   {
01871   public:
01872     accept_operation(win_iocp_io_service& io_service,
01873         socket_type socket, socket_type new_socket, Socket& peer,
01874         const protocol_type& protocol, endpoint_type* peer_endpoint,
01875         bool enable_connection_aborted, Handler handler)
01876       : operation(io_service,
01877           &accept_operation<Socket, Handler>::do_completion_impl,
01878           &accept_operation<Socket, Handler>::destroy_impl),
01879         io_service_(io_service),
01880         socket_(socket),
01881         new_socket_(new_socket),
01882         peer_(peer),
01883         protocol_(protocol),
01884         peer_endpoint_(peer_endpoint),
01885         work_(io_service.get_io_service()),
01886         enable_connection_aborted_(enable_connection_aborted),
01887         handler_(handler)
01888     {
01889     }
01890 
01891     socket_type new_socket()
01892     {
01893       return new_socket_.get();
01894     }
01895 
01896     void* output_buffer()
01897     {
01898       return output_buffer_;
01899     }
01900 
01901     DWORD address_length()
01902     {
01903       return sizeof(sockaddr_storage_type) + 16;
01904     }
01905 
01906   private:
01907     static void do_completion_impl(operation* op, DWORD last_error, size_t)
01908     {
01909       // Take ownership of the operation object.
01910       typedef accept_operation<Socket, Handler> op_type;
01911       op_type* handler_op(static_cast<op_type*>(op));
01912       typedef handler_alloc_traits<Handler, op_type> alloc_traits;
01913       handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
01914 
01915       // Map Windows error ERROR_NETNAME_DELETED to connection_aborted.
01916       if (last_error == ERROR_NETNAME_DELETED)
01917       {
01918         last_error = WSAECONNABORTED;
01919       }
01920 
01921       // Restart the accept operation if we got the connection_aborted error
01922       // and the enable_connection_aborted socket option is not set.
01923       if (last_error == WSAECONNABORTED
01924           && !ptr.get()->enable_connection_aborted_)
01925       {
01926         // Reset OVERLAPPED structure.
01927         ptr.get()->Internal = 0;
01928         ptr.get()->InternalHigh = 0;
01929         ptr.get()->Offset = 0;
01930         ptr.get()->OffsetHigh = 0;
01931         ptr.get()->hEvent = 0;
01932 
01933         // Create a new socket for the next connection, since the AcceptEx call
01934         // fails with WSAEINVAL if we try to reuse the same socket.
01935         asio::error_code ec;
01936         ptr.get()->new_socket_.reset();
01937         ptr.get()->new_socket_.reset(socket_ops::socket(
01938               ptr.get()->protocol_.family(), ptr.get()->protocol_.type(),
01939               ptr.get()->protocol_.protocol(), ec));
01940         if (ptr.get()->new_socket() != invalid_socket)
01941         {
01942           // Accept a connection.
01943           DWORD bytes_read = 0;
01944           BOOL result = ::AcceptEx(ptr.get()->socket_, ptr.get()->new_socket(),
01945               ptr.get()->output_buffer(), 0, ptr.get()->address_length(),
01946               ptr.get()->address_length(), &bytes_read, ptr.get());
01947           last_error = ::WSAGetLastError();
01948 
01949           // Check if the operation completed immediately.
01950           if (!result && last_error != WSA_IO_PENDING)
01951           {
01952             if (last_error == ERROR_NETNAME_DELETED
01953                 || last_error == WSAECONNABORTED)
01954             {
01955               // Post this handler so that operation will be restarted again.
01956               ptr.get()->io_service_.post_completion(ptr.get(), last_error, 0);
01957               ptr.release();
01958               return;
01959             }
01960             else
01961             {
01962               // Operation already complete. Continue with rest of this handler.
01963             }
01964           }
01965           else
01966           {
01967             // Asynchronous operation has been successfully restarted.
01968             ptr.release();
01969             return;
01970           }
01971         }
01972       }
01973 
01974       // Get the address of the peer.
01975       endpoint_type peer_endpoint;
01976       if (last_error == 0)
01977       {
01978         LPSOCKADDR local_addr = 0;
01979         int local_addr_length = 0;
01980         LPSOCKADDR remote_addr = 0;
01981         int remote_addr_length = 0;
01982         GetAcceptExSockaddrs(handler_op->output_buffer(), 0,
01983             handler_op->address_length(), handler_op->address_length(),
01984             &local_addr, &local_addr_length, &remote_addr, &remote_addr_length);
01985         if (static_cast<std::size_t>(remote_addr_length)
01986             > peer_endpoint.capacity())
01987         {
01988           last_error = WSAEINVAL;
01989         }
01990         else
01991         {
01992           using namespace std; // For memcpy.
01993           memcpy(peer_endpoint.data(), remote_addr, remote_addr_length);
01994           peer_endpoint.resize(static_cast<std::size_t>(remote_addr_length));
01995         }
01996       }
01997 
01998       // Need to set the SO_UPDATE_ACCEPT_CONTEXT option so that getsockname
01999       // and getpeername will work on the accepted socket.
02000       if (last_error == 0)
02001       {
02002         SOCKET update_ctx_param = handler_op->socket_;
02003         asio::error_code ec;
02004         if (socket_ops::setsockopt(handler_op->new_socket_.get(),
02005               SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
02006               &update_ctx_param, sizeof(SOCKET), ec) != 0)
02007         {
02008           last_error = ec.value();
02009         }
02010       }
02011 
02012       // If the socket was successfully accepted, transfer ownership of the
02013       // socket to the peer object.
02014       if (last_error == 0)
02015       {
02016         asio::error_code ec;
02017         handler_op->peer_.assign(handler_op->protocol_,
02018             native_type(handler_op->new_socket_.get(), peer_endpoint), ec);
02019         if (ec)
02020           last_error = ec.value();
02021         else
02022           handler_op->new_socket_.release();
02023       }
02024 
02025       // Pass endpoint back to caller.
02026       if (handler_op->peer_endpoint_)
02027         *handler_op->peer_endpoint_ = peer_endpoint;
02028 
02029       // Make a copy of the handler so that the memory can be deallocated before
02030       // the upcall is made.
02031       Handler handler(handler_op->handler_);
02032 
02033       // Free the memory associated with the handler.
02034       ptr.reset();
02035 
02036       // Call the handler.
02037       asio::error_code ec(last_error,
02038           asio::error::get_system_category());
02039       asio_handler_invoke_helpers::invoke(
02040           detail::bind_handler(handler, ec), &handler);
02041     }
02042 
02043     static void destroy_impl(operation* op)
02044     {
02045       // Take ownership of the operation object.
02046       typedef accept_operation<Socket, Handler> op_type;
02047       op_type* handler_op(static_cast<op_type*>(op));
02048       typedef handler_alloc_traits<Handler, op_type> alloc_traits;
02049       handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
02050 
02051       // A sub-object of the handler may be the true owner of the memory
02052       // associated with the handler. Consequently, a local copy of the handler
02053       // is required to ensure that any owning sub-object remains valid until
02054       // after we have deallocated the memory here.
02055       Handler handler(handler_op->handler_);
02056       (void)handler;
02057 
02058       // Free the memory associated with the handler.
02059       ptr.reset();
02060     }
02061 
02062     win_iocp_io_service& io_service_;
02063     socket_type socket_;
02064     socket_holder new_socket_;
02065     Socket& peer_;
02066     protocol_type protocol_;
02067     endpoint_type* peer_endpoint_;
02068     asio::io_service::work work_;
02069     unsigned char output_buffer_[(sizeof(sockaddr_storage_type) + 16) * 2];
02070     bool enable_connection_aborted_;
02071     Handler handler_;
02072   };
02073 
02074   // Start an asynchronous accept. The peer and peer_endpoint objects
02075   // must be valid until the accept's handler is invoked.
02076   template <typename Socket, typename Handler>
02077   void async_accept(implementation_type& impl, Socket& peer,
02078       endpoint_type* peer_endpoint, Handler handler)
02079   {
02080     // Check whether acceptor has been initialised.
02081     if (!is_open(impl))
02082     {
02083       this->get_io_service().post(bind_handler(handler,
02084             asio::error::bad_descriptor));
02085       return;
02086     }
02087 
02088     // Check that peer socket has not already been opened.
02089     if (peer.is_open())
02090     {
02091       this->get_io_service().post(bind_handler(handler,
02092             asio::error::already_open));
02093       return;
02094     }
02095 
02096 #if defined(ASIO_ENABLE_CANCELIO)
02097     // Update the ID of the thread from which cancellation is safe.
02098     if (impl.safe_cancellation_thread_id_ == 0)
02099       impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
02100     else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
02101       impl.safe_cancellation_thread_id_ = ~DWORD(0);
02102 #endif // defined(ASIO_ENABLE_CANCELIO)
02103 
02104     // Create a new socket for the connection.
02105     asio::error_code ec;
02106     socket_holder sock(socket_ops::socket(impl.protocol_.family(),
02107           impl.protocol_.type(), impl.protocol_.protocol(), ec));
02108     if (sock.get() == invalid_socket)
02109     {
02110       this->get_io_service().post(bind_handler(handler, ec));
02111       return;
02112     }
02113 
02114     // Allocate and construct an operation to wrap the handler.
02115     typedef accept_operation<Socket, Handler> value_type;
02116     typedef handler_alloc_traits<Handler, value_type> alloc_traits;
02117     raw_handler_ptr<alloc_traits> raw_ptr(handler);
02118     socket_type new_socket = sock.get();
02119     bool enable_connection_aborted =
02120       (impl.flags_ & implementation_type::enable_connection_aborted);
02121     handler_ptr<alloc_traits> ptr(raw_ptr,
02122         iocp_service_, impl.socket_, new_socket, peer, impl.protocol_,
02123         peer_endpoint, enable_connection_aborted, handler);
02124     sock.release();
02125 
02126     // Accept a connection.
02127     DWORD bytes_read = 0;
02128     BOOL result = ::AcceptEx(impl.socket_, ptr.get()->new_socket(),
02129         ptr.get()->output_buffer(), 0, ptr.get()->address_length(),
02130         ptr.get()->address_length(), &bytes_read, ptr.get());
02131     DWORD last_error = ::WSAGetLastError();
02132 
02133     // Check if the operation completed immediately.
02134     if (!result && last_error != WSA_IO_PENDING)
02135     {
02136       if (!enable_connection_aborted
02137           && (last_error == ERROR_NETNAME_DELETED
02138             || last_error == WSAECONNABORTED))
02139       {
02140         // Post handler so that operation will be restarted again. We do not
02141         // perform the AcceptEx again here to avoid the possibility of starving
02142         // other handlers.
02143         iocp_service_.post_completion(ptr.get(), last_error, 0);
02144         ptr.release();
02145       }
02146       else
02147       {
02148         asio::io_service::work work(this->get_io_service());
02149         ptr.reset();
02150         asio::error_code ec(last_error,
02151             asio::error::get_system_category());
02152         iocp_service_.post(bind_handler(handler, ec));
02153       }
02154     }
02155     else
02156     {
02157       ptr.release();
02158     }
02159   }
02160 
02161   // Connect the socket to the specified endpoint.
02162   asio::error_code connect(implementation_type& impl,
02163       const endpoint_type& peer_endpoint, asio::error_code& ec)
02164   {
02165     if (!is_open(impl))
02166     {
02167       ec = asio::error::bad_descriptor;
02168       return ec;
02169     }
02170 
02171     // Perform the connect operation.
02172     socket_ops::connect(impl.socket_,
02173         peer_endpoint.data(), peer_endpoint.size(), ec);
02174     return ec;
02175   }
02176 
02177   template <typename Handler>
02178   class connect_operation
02179   {
02180   public:
02181     connect_operation(socket_type socket, bool user_set_non_blocking,
02182         asio::io_service& io_service, Handler handler)
02183       : socket_(socket),
02184         user_set_non_blocking_(user_set_non_blocking),
02185         io_service_(io_service),
02186         work_(io_service),
02187         handler_(handler)
02188     {
02189     }
02190 
02191     bool perform(asio::error_code& ec,
02192         std::size_t& bytes_transferred)
02193     {
02194       // Check whether the operation was successful.
02195       if (ec)
02196         return true;
02197 
02198       // Get the error code from the connect operation.
02199       int connect_error = 0;
02200       size_t connect_error_len = sizeof(connect_error);
02201       if (socket_ops::getsockopt(socket_, SOL_SOCKET, SO_ERROR,
02202             &connect_error, &connect_error_len, ec) == socket_error_retval)
02203         return true;
02204 
02205       // If connection failed then post the handler with the error code.
02206       if (connect_error)
02207       {
02208         ec = asio::error_code(connect_error,
02209             asio::error::get_system_category());
02210         return true;
02211       }
02212 
02213       // Revert socket to blocking mode unless the user requested otherwise.
02214       if (!user_set_non_blocking_)
02215       {
02216         ioctl_arg_type non_blocking = 0;
02217         if (socket_ops::ioctl(socket_, FIONBIO, &non_blocking, ec))
02218           return true;
02219       }
02220 
02221       // Post the result of the successful connection operation.
02222       ec = asio::error_code();
02223       return true;
02224     }
02225 
02226     void complete(const asio::error_code& ec, std::size_t)
02227     {
02228       io_service_.post(bind_handler(handler_, ec));
02229     }
02230 
02231   private:
02232     socket_type socket_;
02233     bool user_set_non_blocking_;
02234     asio::io_service& io_service_;
02235     asio::io_service::work work_;
02236     Handler handler_;
02237   };
02238 
02239   // Start an asynchronous connect.
02240   template <typename Handler>
02241   void async_connect(implementation_type& impl,
02242       const endpoint_type& peer_endpoint, Handler handler)
02243   {
02244     if (!is_open(impl))
02245     {
02246       this->get_io_service().post(bind_handler(handler,
02247             asio::error::bad_descriptor));
02248       return;
02249     }
02250 
02251 #if defined(ASIO_ENABLE_CANCELIO)
02252     // Update the ID of the thread from which cancellation is safe.
02253     if (impl.safe_cancellation_thread_id_ == 0)
02254       impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
02255     else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
02256       impl.safe_cancellation_thread_id_ = ~DWORD(0);
02257 #endif // defined(ASIO_ENABLE_CANCELIO)
02258 
02259     // Check if the reactor was already obtained from the io_service.
02260     reactor_type* reactor = static_cast<reactor_type*>(
02261           interlocked_compare_exchange_pointer(
02262             reinterpret_cast<void**>(&reactor_), 0, 0));
02263     if (!reactor)
02264     {
02265       reactor = &(asio::use_service<reactor_type>(
02266             this->get_io_service()));
02267       interlocked_exchange_pointer(
02268           reinterpret_cast<void**>(&reactor_), reactor);
02269     }
02270 
02271     // Mark the socket as non-blocking so that the connection will take place
02272     // asynchronously.
02273     ioctl_arg_type non_blocking = 1;
02274     asio::error_code ec;
02275     if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
02276     {
02277       this->get_io_service().post(bind_handler(handler, ec));
02278       return;
02279     }
02280 
02281     // Start the connect operation.
02282     if (socket_ops::connect(impl.socket_, peer_endpoint.data(),
02283           peer_endpoint.size(), ec) == 0)
02284     {
02285       // Revert socket to blocking mode unless the user requested otherwise.
02286       if (!(impl.flags_ & implementation_type::user_set_non_blocking))
02287       {
02288         non_blocking = 0;
02289         socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec);
02290       }
02291 
02292       // The connect operation has finished successfully so we need to post the
02293       // handler immediately.
02294       this->get_io_service().post(bind_handler(handler, ec));
02295     }
02296     else if (ec == asio::error::in_progress
02297         || ec == asio::error::would_block)
02298     {
02299       // The connection is happening in the background, and we need to wait
02300       // until the socket becomes writeable.
02301       boost::shared_ptr<bool> completed(new bool(false));
02302       reactor->start_connect_op(impl.socket_, impl.reactor_data_,
02303           connect_operation<Handler>(
02304             impl.socket_,
02305             (impl.flags_ & implementation_type::user_set_non_blocking) != 0,
02306             this->get_io_service(), handler));
02307     }
02308     else
02309     {
02310       // Revert socket to blocking mode unless the user requested otherwise.
02311       if (!(impl.flags_ & implementation_type::user_set_non_blocking))
02312       {
02313         non_blocking = 0;
02314         asio::error_code ignored_ec;
02315         socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ignored_ec);
02316       }
02317 
02318       // The connect operation has failed, so post the handler immediately.
02319       this->get_io_service().post(bind_handler(handler, ec));
02320     }
02321   }
02322 
02323 private:
02324   // Helper function to close a socket when the associated object is being
02325   // destroyed.
02326   void close_for_destruction(implementation_type& impl)
02327   {
02328     if (is_open(impl))
02329     {
02330       // Check if the reactor was created, in which case we need to close the
02331       // socket on the reactor as well to cancel any operations that might be
02332       // running there.
02333       reactor_type* reactor = static_cast<reactor_type*>(
02334             interlocked_compare_exchange_pointer(
02335               reinterpret_cast<void**>(&reactor_), 0, 0));
02336       if (reactor)
02337         reactor->close_descriptor(impl.socket_, impl.reactor_data_);
02338 
02339       // The socket destructor must not block. If the user has changed the
02340       // linger option to block in the foreground, we will change it back to the
02341       // default so that the closure is performed in the background.
02342       if (impl.flags_ & implementation_type::close_might_block)
02343       {
02344         ::linger opt;
02345         opt.l_onoff = 0;
02346         opt.l_linger = 0;
02347         asio::error_code ignored_ec;
02348         socket_ops::setsockopt(impl.socket_,
02349             SOL_SOCKET, SO_LINGER, &opt, sizeof(opt), ignored_ec);
02350       }
02351 
02352       asio::error_code ignored_ec;
02353       socket_ops::close(impl.socket_, ignored_ec);
02354       impl.socket_ = invalid_socket;
02355       impl.flags_ = 0;
02356       impl.cancel_token_.reset();
02357 #if defined(ASIO_ENABLE_CANCELIO)
02358       impl.safe_cancellation_thread_id_ = 0;
02359 #endif // defined(ASIO_ENABLE_CANCELIO)
02360     }
02361   }
02362 
02363   // Helper function to emulate InterlockedCompareExchangePointer functionality
02364   // for:
02365   // - very old Platform SDKs; and
02366   // - platform SDKs where MSVC's /Wp64 option causes spurious warnings.
02367   void* interlocked_compare_exchange_pointer(void** dest, void* exch, void* cmp)
02368   {
02369 #if defined(_M_IX86)
02370     return reinterpret_cast<void*>(InterlockedCompareExchange(
02371           reinterpret_cast<PLONG>(dest), reinterpret_cast<LONG>(exch),
02372           reinterpret_cast<LONG>(cmp)));
02373 #else
02374     return InterlockedCompareExchangePointer(dest, exch, cmp);
02375 #endif
02376   }
02377 
02378   // Helper function to emulate InterlockedExchangePointer functionality for:
02379   // - very old Platform SDKs; and
02380   // - platform SDKs where MSVC's /Wp64 option causes spurious warnings.
02381   void* interlocked_exchange_pointer(void** dest, void* val)
02382   {
02383 #if defined(_M_IX86)
02384     return reinterpret_cast<void*>(InterlockedExchange(
02385           reinterpret_cast<PLONG>(dest), reinterpret_cast<LONG>(val)));
02386 #else
02387     return InterlockedExchangePointer(dest, val);
02388 #endif
02389   }
02390 
02391   // The IOCP service used for running asynchronous operations and dispatching
02392   // handlers.
02393   win_iocp_io_service& iocp_service_;
02394 
02395   // The reactor used for performing connect operations. This object is created
02396   // only if needed.
02397   reactor_type* reactor_;
02398 
02399   // Mutex to protect access to the linked list of implementations. 
02400   asio::detail::mutex mutex_;
02401 
02402   // The head of a linked list of all implementations.
02403   implementation_type* impl_list_;
02404 };
02405 
02406 } // namespace detail
02407 } // namespace asio
02408 
02409 #endif // defined(ASIO_HAS_IOCP)
02410 
02411 #include "asio/detail/pop_options.hpp"
02412 
02413 #endif // ASIO_DETAIL_WIN_IOCP_SOCKET_SERVICE_HPP
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines


Castor
Author(s): Carpe Noctem
autogenerated on Fri Nov 8 2013 11:05:40