00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #ifndef ASIO_DETAIL_WIN_IOCP_SOCKET_SERVICE_HPP
00012 #define ASIO_DETAIL_WIN_IOCP_SOCKET_SERVICE_HPP
00013
00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00015 # pragma once
00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00017
00018 #include "asio/detail/push_options.hpp"
00019
00020 #include "asio/detail/win_iocp_io_service_fwd.hpp"
00021
00022 #if defined(ASIO_HAS_IOCP)
00023
00024 #include "asio/detail/push_options.hpp"
00025 #include <cstring>
00026 #include <boost/shared_ptr.hpp>
00027 #include <boost/type_traits/is_same.hpp>
00028 #include <boost/weak_ptr.hpp>
00029 #include "asio/detail/pop_options.hpp"
00030
00031 #include "asio/buffer.hpp"
00032 #include "asio/error.hpp"
00033 #include "asio/io_service.hpp"
00034 #include "asio/socket_base.hpp"
00035 #include "asio/detail/bind_handler.hpp"
00036 #include "asio/detail/handler_alloc_helpers.hpp"
00037 #include "asio/detail/handler_invoke_helpers.hpp"
00038 #include "asio/detail/mutex.hpp"
00039 #include "asio/detail/select_reactor.hpp"
00040 #include "asio/detail/socket_holder.hpp"
00041 #include "asio/detail/socket_ops.hpp"
00042 #include "asio/detail/socket_types.hpp"
00043 #include "asio/detail/win_iocp_io_service.hpp"
00044
00045 namespace asio {
00046 namespace detail {
00047
00048 template <typename Protocol>
00049 class win_iocp_socket_service
00050 : public asio::detail::service_base<win_iocp_socket_service<Protocol> >
00051 {
00052 public:
00053
00054 typedef Protocol protocol_type;
00055
00056
00057 typedef typename Protocol::endpoint endpoint_type;
00058
00059
00060 typedef win_iocp_io_service::operation operation;
00061
00062 struct noop_deleter { void operator()(void*) {} };
00063 typedef boost::shared_ptr<void> shared_cancel_token_type;
00064 typedef boost::weak_ptr<void> weak_cancel_token_type;
00065
00066
00067 class native_type
00068 {
00069 public:
00070 native_type(socket_type s)
00071 : socket_(s),
00072 have_remote_endpoint_(false)
00073 {
00074 }
00075
00076 native_type(socket_type s, const endpoint_type& ep)
00077 : socket_(s),
00078 have_remote_endpoint_(true),
00079 remote_endpoint_(ep)
00080 {
00081 }
00082
00083 void operator=(socket_type s)
00084 {
00085 socket_ = s;
00086 have_remote_endpoint_ = false;
00087 remote_endpoint_ = endpoint_type();
00088 }
00089
00090 operator socket_type() const
00091 {
00092 return socket_;
00093 }
00094
00095 HANDLE as_handle() const
00096 {
00097 return reinterpret_cast<HANDLE>(socket_);
00098 }
00099
00100 bool have_remote_endpoint() const
00101 {
00102 return have_remote_endpoint_;
00103 }
00104
00105 endpoint_type remote_endpoint() const
00106 {
00107 return remote_endpoint_;
00108 }
00109
00110 private:
00111 socket_type socket_;
00112 bool have_remote_endpoint_;
00113 endpoint_type remote_endpoint_;
00114 };
00115
00116
00117 typedef detail::select_reactor<true> reactor_type;
00118
00119
00120 class implementation_type
00121 {
00122 public:
00123
00124 implementation_type()
00125 : socket_(invalid_socket),
00126 flags_(0),
00127 cancel_token_(),
00128 protocol_(endpoint_type().protocol()),
00129 next_(0),
00130 prev_(0)
00131 {
00132 }
00133
00134 private:
00135
00136 friend class win_iocp_socket_service;
00137
00138
00139 native_type socket_;
00140
00141 enum
00142 {
00143 enable_connection_aborted = 1,
00144 close_might_block = 2,
00145 user_set_non_blocking = 4
00146 };
00147
00148
00149 unsigned char flags_;
00150
00151
00152
00153
00154
00155
00156
00157 shared_cancel_token_type cancel_token_;
00158
00159
00160 protocol_type protocol_;
00161
00162
00163 reactor_type::per_descriptor_data reactor_data_;
00164
00165 #if defined(ASIO_ENABLE_CANCELIO)
00166
00167
00168
00169
00170 DWORD safe_cancellation_thread_id_;
00171 #endif // defined(ASIO_ENABLE_CANCELIO)
00172
00173
00174 implementation_type* next_;
00175 implementation_type* prev_;
00176 };
00177
00178
00179 enum { max_buffers = 64 < max_iov_len ? 64 : max_iov_len };
00180
00181
00182 win_iocp_socket_service(asio::io_service& io_service)
00183 : asio::detail::service_base<
00184 win_iocp_socket_service<Protocol> >(io_service),
00185 iocp_service_(asio::use_service<win_iocp_io_service>(io_service)),
00186 reactor_(0),
00187 mutex_(),
00188 impl_list_(0)
00189 {
00190 }
00191
00192
00193 void shutdown_service()
00194 {
00195
00196 asio::detail::mutex::scoped_lock lock(mutex_);
00197 implementation_type* impl = impl_list_;
00198 while (impl)
00199 {
00200 asio::error_code ignored_ec;
00201 close_for_destruction(*impl);
00202 impl = impl->next_;
00203 }
00204 }
00205
00206
00207 void construct(implementation_type& impl)
00208 {
00209 impl.socket_ = invalid_socket;
00210 impl.flags_ = 0;
00211 impl.cancel_token_.reset();
00212 #if defined(ASIO_ENABLE_CANCELIO)
00213 impl.safe_cancellation_thread_id_ = 0;
00214 #endif // defined(ASIO_ENABLE_CANCELIO)
00215
00216
00217 asio::detail::mutex::scoped_lock lock(mutex_);
00218 impl.next_ = impl_list_;
00219 impl.prev_ = 0;
00220 if (impl_list_)
00221 impl_list_->prev_ = &impl;
00222 impl_list_ = &impl;
00223 }
00224
00225
00226 void destroy(implementation_type& impl)
00227 {
00228 close_for_destruction(impl);
00229
00230
00231 asio::detail::mutex::scoped_lock lock(mutex_);
00232 if (impl_list_ == &impl)
00233 impl_list_ = impl.next_;
00234 if (impl.prev_)
00235 impl.prev_->next_ = impl.next_;
00236 if (impl.next_)
00237 impl.next_->prev_= impl.prev_;
00238 impl.next_ = 0;
00239 impl.prev_ = 0;
00240 }
00241
00242
00243 asio::error_code open(implementation_type& impl,
00244 const protocol_type& protocol, asio::error_code& ec)
00245 {
00246 if (is_open(impl))
00247 {
00248 ec = asio::error::already_open;
00249 return ec;
00250 }
00251
00252 socket_holder sock(socket_ops::socket(protocol.family(), protocol.type(),
00253 protocol.protocol(), ec));
00254 if (sock.get() == invalid_socket)
00255 return ec;
00256
00257 HANDLE sock_as_handle = reinterpret_cast<HANDLE>(sock.get());
00258 if (iocp_service_.register_handle(sock_as_handle, ec))
00259 return ec;
00260
00261 impl.socket_ = sock.release();
00262 impl.flags_ = 0;
00263 impl.cancel_token_.reset(static_cast<void*>(0), noop_deleter());
00264 impl.protocol_ = protocol;
00265 ec = asio::error_code();
00266 return ec;
00267 }
00268
00269
00270 asio::error_code assign(implementation_type& impl,
00271 const protocol_type& protocol, const native_type& native_socket,
00272 asio::error_code& ec)
00273 {
00274 if (is_open(impl))
00275 {
00276 ec = asio::error::already_open;
00277 return ec;
00278 }
00279
00280 if (iocp_service_.register_handle(native_socket.as_handle(), ec))
00281 return ec;
00282
00283 impl.socket_ = native_socket;
00284 impl.flags_ = 0;
00285 impl.cancel_token_.reset(static_cast<void*>(0), noop_deleter());
00286 impl.protocol_ = protocol;
00287 ec = asio::error_code();
00288 return ec;
00289 }
00290
00291
00292 bool is_open(const implementation_type& impl) const
00293 {
00294 return impl.socket_ != invalid_socket;
00295 }
00296
00297
00298 asio::error_code close(implementation_type& impl,
00299 asio::error_code& ec)
00300 {
00301 if (is_open(impl))
00302 {
00303
00304
00305
00306 reactor_type* reactor = static_cast<reactor_type*>(
00307 interlocked_compare_exchange_pointer(
00308 reinterpret_cast<void**>(&reactor_), 0, 0));
00309 if (reactor)
00310 reactor->close_descriptor(impl.socket_, impl.reactor_data_);
00311
00312 if (socket_ops::close(impl.socket_, ec) == socket_error_retval)
00313 return ec;
00314
00315 impl.socket_ = invalid_socket;
00316 impl.flags_ = 0;
00317 impl.cancel_token_.reset();
00318 #if defined(ASIO_ENABLE_CANCELIO)
00319 impl.safe_cancellation_thread_id_ = 0;
00320 #endif // defined(ASIO_ENABLE_CANCELIO)
00321 }
00322
00323 ec = asio::error_code();
00324 return ec;
00325 }
00326
00327
00328 native_type native(implementation_type& impl)
00329 {
00330 return impl.socket_;
00331 }
00332
00333
00334 asio::error_code cancel(implementation_type& impl,
00335 asio::error_code& ec)
00336 {
00337 if (!is_open(impl))
00338 {
00339 ec = asio::error::bad_descriptor;
00340 return ec;
00341 }
00342 else if (FARPROC cancel_io_ex_ptr = ::GetProcAddress(
00343 ::GetModuleHandleA("KERNEL32"), "CancelIoEx"))
00344 {
00345
00346 typedef BOOL (WINAPI* cancel_io_ex_t)(HANDLE, LPOVERLAPPED);
00347 cancel_io_ex_t cancel_io_ex = (cancel_io_ex_t)cancel_io_ex_ptr;
00348 socket_type sock = impl.socket_;
00349 HANDLE sock_as_handle = reinterpret_cast<HANDLE>(sock);
00350 if (!cancel_io_ex(sock_as_handle, 0))
00351 {
00352 DWORD last_error = ::GetLastError();
00353 if (last_error == ERROR_NOT_FOUND)
00354 {
00355
00356
00357
00358 ec = asio::error_code();
00359 }
00360 else
00361 {
00362 ec = asio::error_code(last_error,
00363 asio::error::get_system_category());
00364 }
00365 }
00366 else
00367 {
00368 ec = asio::error_code();
00369 }
00370 }
00371 #if defined(ASIO_ENABLE_CANCELIO)
00372 else if (impl.safe_cancellation_thread_id_ == 0)
00373 {
00374
00375 ec = asio::error_code();
00376 }
00377 else if (impl.safe_cancellation_thread_id_ == ::GetCurrentThreadId())
00378 {
00379
00380
00381 socket_type sock = impl.socket_;
00382 HANDLE sock_as_handle = reinterpret_cast<HANDLE>(sock);
00383 if (!::CancelIo(sock_as_handle))
00384 {
00385 DWORD last_error = ::GetLastError();
00386 ec = asio::error_code(last_error,
00387 asio::error::get_system_category());
00388 }
00389 else
00390 {
00391 ec = asio::error_code();
00392 }
00393 }
00394 else
00395 {
00396
00397
00398 ec = asio::error::operation_not_supported;
00399 }
00400 #else // defined(ASIO_ENABLE_CANCELIO)
00401 else
00402 {
00403
00404 ec = asio::error::operation_not_supported;
00405 }
00406 #endif // defined(ASIO_ENABLE_CANCELIO)
00407
00408 return ec;
00409 }
00410
00411
00412 bool at_mark(const implementation_type& impl,
00413 asio::error_code& ec) const
00414 {
00415 if (!is_open(impl))
00416 {
00417 ec = asio::error::bad_descriptor;
00418 return false;
00419 }
00420
00421 asio::detail::ioctl_arg_type value = 0;
00422 socket_ops::ioctl(impl.socket_, SIOCATMARK, &value, ec);
00423 return ec ? false : value != 0;
00424 }
00425
00426
00427 std::size_t available(const implementation_type& impl,
00428 asio::error_code& ec) const
00429 {
00430 if (!is_open(impl))
00431 {
00432 ec = asio::error::bad_descriptor;
00433 return 0;
00434 }
00435
00436 asio::detail::ioctl_arg_type value = 0;
00437 socket_ops::ioctl(impl.socket_, FIONREAD, &value, ec);
00438 return ec ? static_cast<std::size_t>(0) : static_cast<std::size_t>(value);
00439 }
00440
00441
00442 asio::error_code bind(implementation_type& impl,
00443 const endpoint_type& endpoint, asio::error_code& ec)
00444 {
00445 if (!is_open(impl))
00446 {
00447 ec = asio::error::bad_descriptor;
00448 return ec;
00449 }
00450
00451 socket_ops::bind(impl.socket_, endpoint.data(), endpoint.size(), ec);
00452 return ec;
00453 }
00454
00455
00456 asio::error_code listen(implementation_type& impl, int backlog,
00457 asio::error_code& ec)
00458 {
00459 if (!is_open(impl))
00460 {
00461 ec = asio::error::bad_descriptor;
00462 return ec;
00463 }
00464
00465 socket_ops::listen(impl.socket_, backlog, ec);
00466 return ec;
00467 }
00468
00469
00470 template <typename Option>
00471 asio::error_code set_option(implementation_type& impl,
00472 const Option& option, asio::error_code& ec)
00473 {
00474 if (!is_open(impl))
00475 {
00476 ec = asio::error::bad_descriptor;
00477 return ec;
00478 }
00479
00480 if (option.level(impl.protocol_) == custom_socket_option_level
00481 && option.name(impl.protocol_) == enable_connection_aborted_option)
00482 {
00483 if (option.size(impl.protocol_) != sizeof(int))
00484 {
00485 ec = asio::error::invalid_argument;
00486 }
00487 else
00488 {
00489 if (*reinterpret_cast<const int*>(option.data(impl.protocol_)))
00490 impl.flags_ |= implementation_type::enable_connection_aborted;
00491 else
00492 impl.flags_ &= ~implementation_type::enable_connection_aborted;
00493 ec = asio::error_code();
00494 }
00495 return ec;
00496 }
00497 else
00498 {
00499 if (option.level(impl.protocol_) == SOL_SOCKET
00500 && option.name(impl.protocol_) == SO_LINGER)
00501 {
00502 const ::linger* linger_option =
00503 reinterpret_cast<const ::linger*>(option.data(impl.protocol_));
00504 if (linger_option->l_onoff != 0 && linger_option->l_linger != 0)
00505 impl.flags_ |= implementation_type::close_might_block;
00506 else
00507 impl.flags_ &= ~implementation_type::close_might_block;
00508 }
00509
00510 socket_ops::setsockopt(impl.socket_,
00511 option.level(impl.protocol_), option.name(impl.protocol_),
00512 option.data(impl.protocol_), option.size(impl.protocol_), ec);
00513 return ec;
00514 }
00515 }
00516
00517
00518 template <typename Option>
00519 asio::error_code get_option(const implementation_type& impl,
00520 Option& option, asio::error_code& ec) const
00521 {
00522 if (!is_open(impl))
00523 {
00524 ec = asio::error::bad_descriptor;
00525 return ec;
00526 }
00527
00528 if (option.level(impl.protocol_) == custom_socket_option_level
00529 && option.name(impl.protocol_) == enable_connection_aborted_option)
00530 {
00531 if (option.size(impl.protocol_) != sizeof(int))
00532 {
00533 ec = asio::error::invalid_argument;
00534 }
00535 else
00536 {
00537 int* target = reinterpret_cast<int*>(option.data(impl.protocol_));
00538 if (impl.flags_ & implementation_type::enable_connection_aborted)
00539 *target = 1;
00540 else
00541 *target = 0;
00542 option.resize(impl.protocol_, sizeof(int));
00543 ec = asio::error_code();
00544 }
00545 return ec;
00546 }
00547 else
00548 {
00549 size_t size = option.size(impl.protocol_);
00550 socket_ops::getsockopt(impl.socket_,
00551 option.level(impl.protocol_), option.name(impl.protocol_),
00552 option.data(impl.protocol_), &size, ec);
00553 if (!ec)
00554 option.resize(impl.protocol_, size);
00555 return ec;
00556 }
00557 }
00558
00559
00560 template <typename IO_Control_Command>
00561 asio::error_code io_control(implementation_type& impl,
00562 IO_Control_Command& command, asio::error_code& ec)
00563 {
00564 if (!is_open(impl))
00565 {
00566 ec = asio::error::bad_descriptor;
00567 return ec;
00568 }
00569
00570 socket_ops::ioctl(impl.socket_, command.name(),
00571 static_cast<ioctl_arg_type*>(command.data()), ec);
00572
00573 if (!ec && command.name() == static_cast<int>(FIONBIO))
00574 {
00575 if (command.get())
00576 impl.flags_ |= implementation_type::user_set_non_blocking;
00577 else
00578 impl.flags_ &= ~implementation_type::user_set_non_blocking;
00579 }
00580
00581 return ec;
00582 }
00583
00584
00585 endpoint_type local_endpoint(const implementation_type& impl,
00586 asio::error_code& ec) const
00587 {
00588 if (!is_open(impl))
00589 {
00590 ec = asio::error::bad_descriptor;
00591 return endpoint_type();
00592 }
00593
00594 endpoint_type endpoint;
00595 std::size_t addr_len = endpoint.capacity();
00596 if (socket_ops::getsockname(impl.socket_, endpoint.data(), &addr_len, ec))
00597 return endpoint_type();
00598 endpoint.resize(addr_len);
00599 return endpoint;
00600 }
00601
00602
00603 endpoint_type remote_endpoint(const implementation_type& impl,
00604 asio::error_code& ec) const
00605 {
00606 if (!is_open(impl))
00607 {
00608 ec = asio::error::bad_descriptor;
00609 return endpoint_type();
00610 }
00611
00612 if (impl.socket_.have_remote_endpoint())
00613 {
00614
00615 DWORD connect_time = 0;
00616 size_t connect_time_len = sizeof(connect_time);
00617 if (socket_ops::getsockopt(impl.socket_, SOL_SOCKET, SO_CONNECT_TIME,
00618 &connect_time, &connect_time_len, ec) == socket_error_retval)
00619 {
00620 return endpoint_type();
00621 }
00622 if (connect_time == 0xFFFFFFFF)
00623 {
00624 ec = asio::error::not_connected;
00625 return endpoint_type();
00626 }
00627
00628 ec = asio::error_code();
00629 return impl.socket_.remote_endpoint();
00630 }
00631 else
00632 {
00633 endpoint_type endpoint;
00634 std::size_t addr_len = endpoint.capacity();
00635 if (socket_ops::getpeername(impl.socket_, endpoint.data(), &addr_len, ec))
00636 return endpoint_type();
00637 endpoint.resize(addr_len);
00638 return endpoint;
00639 }
00640 }
00641
00643 asio::error_code shutdown(implementation_type& impl,
00644 socket_base::shutdown_type what, asio::error_code& ec)
00645 {
00646 if (!is_open(impl))
00647 {
00648 ec = asio::error::bad_descriptor;
00649 return ec;
00650 }
00651
00652 socket_ops::shutdown(impl.socket_, what, ec);
00653 return ec;
00654 }
00655
00656
00657 template <typename ConstBufferSequence>
00658 size_t send(implementation_type& impl, const ConstBufferSequence& buffers,
00659 socket_base::message_flags flags, asio::error_code& ec)
00660 {
00661 if (!is_open(impl))
00662 {
00663 ec = asio::error::bad_descriptor;
00664 return 0;
00665 }
00666
00667
00668 ::WSABUF bufs[max_buffers];
00669 typename ConstBufferSequence::const_iterator iter = buffers.begin();
00670 typename ConstBufferSequence::const_iterator end = buffers.end();
00671 DWORD i = 0;
00672 size_t total_buffer_size = 0;
00673 for (; iter != end && i < max_buffers; ++iter, ++i)
00674 {
00675 asio::const_buffer buffer(*iter);
00676 bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
00677 bufs[i].buf = const_cast<char*>(
00678 asio::buffer_cast<const char*>(buffer));
00679 total_buffer_size += asio::buffer_size(buffer);
00680 }
00681
00682
00683 if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
00684 {
00685 ec = asio::error_code();
00686 return 0;
00687 }
00688
00689
00690 DWORD bytes_transferred = 0;
00691 int result = ::WSASend(impl.socket_, bufs,
00692 i, &bytes_transferred, flags, 0, 0);
00693 if (result != 0)
00694 {
00695 DWORD last_error = ::WSAGetLastError();
00696 if (last_error == ERROR_NETNAME_DELETED)
00697 last_error = WSAECONNRESET;
00698 else if (last_error == ERROR_PORT_UNREACHABLE)
00699 last_error = WSAECONNREFUSED;
00700 ec = asio::error_code(last_error,
00701 asio::error::get_system_category());
00702 return 0;
00703 }
00704
00705 ec = asio::error_code();
00706 return bytes_transferred;
00707 }
00708
00709
00710 size_t send(implementation_type& impl, const null_buffers&,
00711 socket_base::message_flags, asio::error_code& ec)
00712 {
00713 if (!is_open(impl))
00714 {
00715 ec = asio::error::bad_descriptor;
00716 return 0;
00717 }
00718
00719
00720 socket_ops::poll_write(impl.socket_, ec);
00721
00722 return 0;
00723 }
00724
00725 template <typename ConstBufferSequence, typename Handler>
00726 class send_operation
00727 : public operation
00728 {
00729 public:
00730 send_operation(win_iocp_io_service& io_service,
00731 weak_cancel_token_type cancel_token,
00732 const ConstBufferSequence& buffers, Handler handler)
00733 : operation(io_service,
00734 &send_operation<ConstBufferSequence, Handler>::do_completion_impl,
00735 &send_operation<ConstBufferSequence, Handler>::destroy_impl),
00736 work_(io_service.get_io_service()),
00737 cancel_token_(cancel_token),
00738 buffers_(buffers),
00739 handler_(handler)
00740 {
00741 }
00742
00743 private:
00744 static void do_completion_impl(operation* op,
00745 DWORD last_error, size_t bytes_transferred)
00746 {
00747
00748 typedef send_operation<ConstBufferSequence, Handler> op_type;
00749 op_type* handler_op(static_cast<op_type*>(op));
00750 typedef handler_alloc_traits<Handler, op_type> alloc_traits;
00751 handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
00752
00753 #if defined(ASIO_ENABLE_BUFFER_DEBUGGING)
00754
00755 typename ConstBufferSequence::const_iterator iter
00756 = handler_op->buffers_.begin();
00757 typename ConstBufferSequence::const_iterator end
00758 = handler_op->buffers_.end();
00759 while (iter != end)
00760 {
00761 asio::const_buffer buffer(*iter);
00762 asio::buffer_cast<const char*>(buffer);
00763 ++iter;
00764 }
00765 #endif // defined(ASIO_ENABLE_BUFFER_DEBUGGING)
00766
00767
00768 asio::error_code ec(last_error,
00769 asio::error::get_system_category());
00770 if (ec.value() == ERROR_NETNAME_DELETED)
00771 {
00772 if (handler_op->cancel_token_.expired())
00773 ec = asio::error::operation_aborted;
00774 else
00775 ec = asio::error::connection_reset;
00776 }
00777 else if (ec.value() == ERROR_PORT_UNREACHABLE)
00778 {
00779 ec = asio::error::connection_refused;
00780 }
00781
00782
00783
00784 Handler handler(handler_op->handler_);
00785
00786
00787 ptr.reset();
00788
00789
00790 asio_handler_invoke_helpers::invoke(
00791 detail::bind_handler(handler, ec, bytes_transferred), &handler);
00792 }
00793
00794 static void destroy_impl(operation* op)
00795 {
00796
00797 typedef send_operation<ConstBufferSequence, Handler> op_type;
00798 op_type* handler_op(static_cast<op_type*>(op));
00799 typedef handler_alloc_traits<Handler, op_type> alloc_traits;
00800 handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
00801
00802
00803
00804
00805
00806 Handler handler(handler_op->handler_);
00807 (void)handler;
00808
00809
00810 ptr.reset();
00811 }
00812
00813 asio::io_service::work work_;
00814 weak_cancel_token_type cancel_token_;
00815 ConstBufferSequence buffers_;
00816 Handler handler_;
00817 };
00818
00819
00820
00821 template <typename ConstBufferSequence, typename Handler>
00822 void async_send(implementation_type& impl, const ConstBufferSequence& buffers,
00823 socket_base::message_flags flags, Handler handler)
00824 {
00825 if (!is_open(impl))
00826 {
00827 this->get_io_service().post(bind_handler(handler,
00828 asio::error::bad_descriptor, 0));
00829 return;
00830 }
00831
00832 #if defined(ASIO_ENABLE_CANCELIO)
00833
00834 if (impl.safe_cancellation_thread_id_ == 0)
00835 impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
00836 else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
00837 impl.safe_cancellation_thread_id_ = ~DWORD(0);
00838 #endif // defined(ASIO_ENABLE_CANCELIO)
00839
00840
00841 typedef send_operation<ConstBufferSequence, Handler> value_type;
00842 typedef handler_alloc_traits<Handler, value_type> alloc_traits;
00843 raw_handler_ptr<alloc_traits> raw_ptr(handler);
00844 handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_,
00845 impl.cancel_token_, buffers, handler);
00846
00847
00848 ::WSABUF bufs[max_buffers];
00849 typename ConstBufferSequence::const_iterator iter = buffers.begin();
00850 typename ConstBufferSequence::const_iterator end = buffers.end();
00851 DWORD i = 0;
00852 size_t total_buffer_size = 0;
00853 for (; iter != end && i < max_buffers; ++iter, ++i)
00854 {
00855 asio::const_buffer buffer(*iter);
00856 bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
00857 bufs[i].buf = const_cast<char*>(
00858 asio::buffer_cast<const char*>(buffer));
00859 total_buffer_size += asio::buffer_size(buffer);
00860 }
00861
00862
00863 if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
00864 {
00865 asio::io_service::work work(this->get_io_service());
00866 ptr.reset();
00867 asio::error_code error;
00868 iocp_service_.post(bind_handler(handler, error, 0));
00869 return;
00870 }
00871
00872
00873 DWORD bytes_transferred = 0;
00874 int result = ::WSASend(impl.socket_, bufs, i,
00875 &bytes_transferred, flags, ptr.get(), 0);
00876 DWORD last_error = ::WSAGetLastError();
00877
00878
00879 if (result != 0 && last_error != WSA_IO_PENDING)
00880 {
00881 asio::io_service::work work(this->get_io_service());
00882 ptr.reset();
00883 asio::error_code ec(last_error,
00884 asio::error::get_system_category());
00885 iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
00886 }
00887 else
00888 {
00889 ptr.release();
00890 }
00891 }
00892
00893 template <typename Handler>
00894 class null_buffers_operation
00895 {
00896 public:
00897 null_buffers_operation(asio::io_service& io_service, Handler handler)
00898 : work_(io_service),
00899 handler_(handler)
00900 {
00901 }
00902
00903 bool perform(asio::error_code&,
00904 std::size_t& bytes_transferred)
00905 {
00906 bytes_transferred = 0;
00907 return true;
00908 }
00909
00910 void complete(const asio::error_code& ec,
00911 std::size_t bytes_transferred)
00912 {
00913 work_.get_io_service().post(bind_handler(
00914 handler_, ec, bytes_transferred));
00915 }
00916
00917 private:
00918 asio::io_service::work work_;
00919 Handler handler_;
00920 };
00921
00922
00923 template <typename Handler>
00924 void async_send(implementation_type& impl, const null_buffers&,
00925 socket_base::message_flags, Handler handler)
00926 {
00927 if (!is_open(impl))
00928 {
00929 this->get_io_service().post(bind_handler(handler,
00930 asio::error::bad_descriptor, 0));
00931 }
00932 else
00933 {
00934
00935 reactor_type* reactor = static_cast<reactor_type*>(
00936 interlocked_compare_exchange_pointer(
00937 reinterpret_cast<void**>(&reactor_), 0, 0));
00938 if (!reactor)
00939 {
00940 reactor = &(asio::use_service<reactor_type>(
00941 this->get_io_service()));
00942 interlocked_exchange_pointer(
00943 reinterpret_cast<void**>(&reactor_), reactor);
00944 }
00945
00946 reactor->start_write_op(impl.socket_, impl.reactor_data_,
00947 null_buffers_operation<Handler>(this->get_io_service(), handler),
00948 false);
00949 }
00950 }
00951
00952
00953
00954 template <typename ConstBufferSequence>
00955 size_t send_to(implementation_type& impl, const ConstBufferSequence& buffers,
00956 const endpoint_type& destination, socket_base::message_flags flags,
00957 asio::error_code& ec)
00958 {
00959 if (!is_open(impl))
00960 {
00961 ec = asio::error::bad_descriptor;
00962 return 0;
00963 }
00964
00965
00966 ::WSABUF bufs[max_buffers];
00967 typename ConstBufferSequence::const_iterator iter = buffers.begin();
00968 typename ConstBufferSequence::const_iterator end = buffers.end();
00969 DWORD i = 0;
00970 for (; iter != end && i < max_buffers; ++iter, ++i)
00971 {
00972 asio::const_buffer buffer(*iter);
00973 bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
00974 bufs[i].buf = const_cast<char*>(
00975 asio::buffer_cast<const char*>(buffer));
00976 }
00977
00978
00979 DWORD bytes_transferred = 0;
00980 int result = ::WSASendTo(impl.socket_, bufs, i, &bytes_transferred,
00981 flags, destination.data(), static_cast<int>(destination.size()), 0, 0);
00982 if (result != 0)
00983 {
00984 DWORD last_error = ::WSAGetLastError();
00985 if (last_error == ERROR_PORT_UNREACHABLE)
00986 last_error = WSAECONNREFUSED;
00987 ec = asio::error_code(last_error,
00988 asio::error::get_system_category());
00989 return 0;
00990 }
00991
00992 ec = asio::error_code();
00993 return bytes_transferred;
00994 }
00995
00996
00997 size_t send_to(implementation_type& impl, const null_buffers&,
00998 socket_base::message_flags, const endpoint_type&,
00999 asio::error_code& ec)
01000 {
01001 if (!is_open(impl))
01002 {
01003 ec = asio::error::bad_descriptor;
01004 return 0;
01005 }
01006
01007
01008 socket_ops::poll_write(impl.socket_, ec);
01009
01010 return 0;
01011 }
01012
01013 template <typename ConstBufferSequence, typename Handler>
01014 class send_to_operation
01015 : public operation
01016 {
01017 public:
01018 send_to_operation(win_iocp_io_service& io_service,
01019 const ConstBufferSequence& buffers, Handler handler)
01020 : operation(io_service,
01021 &send_to_operation<ConstBufferSequence, Handler>::do_completion_impl,
01022 &send_to_operation<ConstBufferSequence, Handler>::destroy_impl),
01023 work_(io_service.get_io_service()),
01024 buffers_(buffers),
01025 handler_(handler)
01026 {
01027 }
01028
01029 private:
01030 static void do_completion_impl(operation* op,
01031 DWORD last_error, size_t bytes_transferred)
01032 {
01033
01034 typedef send_to_operation<ConstBufferSequence, Handler> op_type;
01035 op_type* handler_op(static_cast<op_type*>(op));
01036 typedef handler_alloc_traits<Handler, op_type> alloc_traits;
01037 handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
01038
01039 #if defined(ASIO_ENABLE_BUFFER_DEBUGGING)
01040
01041 typename ConstBufferSequence::const_iterator iter
01042 = handler_op->buffers_.begin();
01043 typename ConstBufferSequence::const_iterator end
01044 = handler_op->buffers_.end();
01045 while (iter != end)
01046 {
01047 asio::const_buffer buffer(*iter);
01048 asio::buffer_cast<const char*>(buffer);
01049 ++iter;
01050 }
01051 #endif // defined(ASIO_ENABLE_BUFFER_DEBUGGING)
01052
01053
01054 asio::error_code ec(last_error,
01055 asio::error::get_system_category());
01056 if (ec.value() == ERROR_PORT_UNREACHABLE)
01057 {
01058 ec = asio::error::connection_refused;
01059 }
01060
01061
01062
01063 Handler handler(handler_op->handler_);
01064
01065
01066 ptr.reset();
01067
01068
01069 asio_handler_invoke_helpers::invoke(
01070 detail::bind_handler(handler, ec, bytes_transferred), &handler);
01071 }
01072
01073 static void destroy_impl(operation* op)
01074 {
01075
01076 typedef send_to_operation<ConstBufferSequence, Handler> op_type;
01077 op_type* handler_op(static_cast<op_type*>(op));
01078 typedef handler_alloc_traits<Handler, op_type> alloc_traits;
01079 handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
01080
01081
01082
01083
01084
01085 Handler handler(handler_op->handler_);
01086 (void)handler;
01087
01088
01089 ptr.reset();
01090 }
01091
01092 asio::io_service::work work_;
01093 ConstBufferSequence buffers_;
01094 Handler handler_;
01095 };
01096
01097
01098
01099 template <typename ConstBufferSequence, typename Handler>
01100 void async_send_to(implementation_type& impl,
01101 const ConstBufferSequence& buffers, const endpoint_type& destination,
01102 socket_base::message_flags flags, Handler handler)
01103 {
01104 if (!is_open(impl))
01105 {
01106 this->get_io_service().post(bind_handler(handler,
01107 asio::error::bad_descriptor, 0));
01108 return;
01109 }
01110
01111 #if defined(ASIO_ENABLE_CANCELIO)
01112
01113 if (impl.safe_cancellation_thread_id_ == 0)
01114 impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
01115 else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
01116 impl.safe_cancellation_thread_id_ = ~DWORD(0);
01117 #endif // defined(ASIO_ENABLE_CANCELIO)
01118
01119
01120 typedef send_to_operation<ConstBufferSequence, Handler> value_type;
01121 typedef handler_alloc_traits<Handler, value_type> alloc_traits;
01122 raw_handler_ptr<alloc_traits> raw_ptr(handler);
01123 handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_, buffers, handler);
01124
01125
01126 ::WSABUF bufs[max_buffers];
01127 typename ConstBufferSequence::const_iterator iter = buffers.begin();
01128 typename ConstBufferSequence::const_iterator end = buffers.end();
01129 DWORD i = 0;
01130 for (; iter != end && i < max_buffers; ++iter, ++i)
01131 {
01132 asio::const_buffer buffer(*iter);
01133 bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
01134 bufs[i].buf = const_cast<char*>(
01135 asio::buffer_cast<const char*>(buffer));
01136 }
01137
01138
01139 DWORD bytes_transferred = 0;
01140 int result = ::WSASendTo(impl.socket_, bufs, i, &bytes_transferred, flags,
01141 destination.data(), static_cast<int>(destination.size()), ptr.get(), 0);
01142 DWORD last_error = ::WSAGetLastError();
01143
01144
01145 if (result != 0 && last_error != WSA_IO_PENDING)
01146 {
01147 asio::io_service::work work(this->get_io_service());
01148 ptr.reset();
01149 asio::error_code ec(last_error,
01150 asio::error::get_system_category());
01151 iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
01152 }
01153 else
01154 {
01155 ptr.release();
01156 }
01157 }
01158
01159
01160 template <typename Handler>
01161 void async_send_to(implementation_type& impl, const null_buffers&,
01162 socket_base::message_flags, const endpoint_type&, Handler handler)
01163 {
01164 if (!is_open(impl))
01165 {
01166 this->get_io_service().post(bind_handler(handler,
01167 asio::error::bad_descriptor, 0));
01168 }
01169 else
01170 {
01171
01172 reactor_type* reactor = static_cast<reactor_type*>(
01173 interlocked_compare_exchange_pointer(
01174 reinterpret_cast<void**>(&reactor_), 0, 0));
01175 if (!reactor)
01176 {
01177 reactor = &(asio::use_service<reactor_type>(
01178 this->get_io_service()));
01179 interlocked_exchange_pointer(
01180 reinterpret_cast<void**>(&reactor_), reactor);
01181 }
01182
01183 reactor->start_write_op(impl.socket_, impl.reactor_data_,
01184 null_buffers_operation<Handler>(this->get_io_service(), handler),
01185 false);
01186 }
01187 }
01188
01189
01190 template <typename MutableBufferSequence>
01191 size_t receive(implementation_type& impl,
01192 const MutableBufferSequence& buffers,
01193 socket_base::message_flags flags, asio::error_code& ec)
01194 {
01195 if (!is_open(impl))
01196 {
01197 ec = asio::error::bad_descriptor;
01198 return 0;
01199 }
01200
01201
01202 ::WSABUF bufs[max_buffers];
01203 typename MutableBufferSequence::const_iterator iter = buffers.begin();
01204 typename MutableBufferSequence::const_iterator end = buffers.end();
01205 DWORD i = 0;
01206 size_t total_buffer_size = 0;
01207 for (; iter != end && i < max_buffers; ++iter, ++i)
01208 {
01209 asio::mutable_buffer buffer(*iter);
01210 bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
01211 bufs[i].buf = asio::buffer_cast<char*>(buffer);
01212 total_buffer_size += asio::buffer_size(buffer);
01213 }
01214
01215
01216 if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
01217 {
01218 ec = asio::error_code();
01219 return 0;
01220 }
01221
01222
01223 DWORD bytes_transferred = 0;
01224 DWORD recv_flags = flags;
01225 int result = ::WSARecv(impl.socket_, bufs, i,
01226 &bytes_transferred, &recv_flags, 0, 0);
01227 if (result != 0)
01228 {
01229 DWORD last_error = ::WSAGetLastError();
01230 if (last_error == ERROR_NETNAME_DELETED)
01231 last_error = WSAECONNRESET;
01232 else if (last_error == ERROR_PORT_UNREACHABLE)
01233 last_error = WSAECONNREFUSED;
01234 ec = asio::error_code(last_error,
01235 asio::error::get_system_category());
01236 return 0;
01237 }
01238 if (bytes_transferred == 0 && impl.protocol_.type() == SOCK_STREAM)
01239 {
01240 ec = asio::error::eof;
01241 return 0;
01242 }
01243
01244 ec = asio::error_code();
01245 return bytes_transferred;
01246 }
01247
01248
01249 size_t receive(implementation_type& impl, const null_buffers&,
01250 socket_base::message_flags, asio::error_code& ec)
01251 {
01252 if (!is_open(impl))
01253 {
01254 ec = asio::error::bad_descriptor;
01255 return 0;
01256 }
01257
01258
01259 socket_ops::poll_read(impl.socket_, ec);
01260
01261 return 0;
01262 }
01263
01264 template <typename MutableBufferSequence, typename Handler>
01265 class receive_operation
01266 : public operation
01267 {
01268 public:
01269 receive_operation(int protocol_type, win_iocp_io_service& io_service,
01270 weak_cancel_token_type cancel_token,
01271 const MutableBufferSequence& buffers, Handler handler)
01272 : operation(io_service,
01273 &receive_operation<
01274 MutableBufferSequence, Handler>::do_completion_impl,
01275 &receive_operation<
01276 MutableBufferSequence, Handler>::destroy_impl),
01277 protocol_type_(protocol_type),
01278 work_(io_service.get_io_service()),
01279 cancel_token_(cancel_token),
01280 buffers_(buffers),
01281 handler_(handler)
01282 {
01283 }
01284
01285 private:
01286 static void do_completion_impl(operation* op,
01287 DWORD last_error, size_t bytes_transferred)
01288 {
01289
01290 typedef receive_operation<MutableBufferSequence, Handler> op_type;
01291 op_type* handler_op(static_cast<op_type*>(op));
01292 typedef handler_alloc_traits<Handler, op_type> alloc_traits;
01293 handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
01294
01295 #if defined(ASIO_ENABLE_BUFFER_DEBUGGING)
01296
01297 typename MutableBufferSequence::const_iterator iter
01298 = handler_op->buffers_.begin();
01299 typename MutableBufferSequence::const_iterator end
01300 = handler_op->buffers_.end();
01301 while (iter != end)
01302 {
01303 asio::mutable_buffer buffer(*iter);
01304 asio::buffer_cast<char*>(buffer);
01305 ++iter;
01306 }
01307 #endif // defined(ASIO_ENABLE_BUFFER_DEBUGGING)
01308
01309
01310 asio::error_code ec(last_error,
01311 asio::error::get_system_category());
01312 if (ec.value() == ERROR_NETNAME_DELETED)
01313 {
01314 if (handler_op->cancel_token_.expired())
01315 ec = asio::error::operation_aborted;
01316 else
01317 ec = asio::error::connection_reset;
01318 }
01319 else if (ec.value() == ERROR_PORT_UNREACHABLE)
01320 {
01321 ec = asio::error::connection_refused;
01322 }
01323
01324
01325 else if (!ec && bytes_transferred == 0
01326 && handler_op->protocol_type_ == SOCK_STREAM
01327 && !boost::is_same<MutableBufferSequence, null_buffers>::value)
01328 {
01329 ec = asio::error::eof;
01330 }
01331
01332
01333
01334 Handler handler(handler_op->handler_);
01335
01336
01337 ptr.reset();
01338
01339
01340 asio_handler_invoke_helpers::invoke(
01341 detail::bind_handler(handler, ec, bytes_transferred), &handler);
01342 }
01343
01344 static void destroy_impl(operation* op)
01345 {
01346
01347 typedef receive_operation<MutableBufferSequence, Handler> op_type;
01348 op_type* handler_op(static_cast<op_type*>(op));
01349 typedef handler_alloc_traits<Handler, op_type> alloc_traits;
01350 handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
01351
01352
01353
01354
01355
01356 Handler handler(handler_op->handler_);
01357 (void)handler;
01358
01359
01360 ptr.reset();
01361 }
01362
01363 int protocol_type_;
01364 asio::io_service::work work_;
01365 weak_cancel_token_type cancel_token_;
01366 MutableBufferSequence buffers_;
01367 Handler handler_;
01368 };
01369
01370
01371
01372 template <typename MutableBufferSequence, typename Handler>
01373 void async_receive(implementation_type& impl,
01374 const MutableBufferSequence& buffers,
01375 socket_base::message_flags flags, Handler handler)
01376 {
01377 if (!is_open(impl))
01378 {
01379 this->get_io_service().post(bind_handler(handler,
01380 asio::error::bad_descriptor, 0));
01381 return;
01382 }
01383
01384 #if defined(ASIO_ENABLE_CANCELIO)
01385
01386 if (impl.safe_cancellation_thread_id_ == 0)
01387 impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
01388 else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
01389 impl.safe_cancellation_thread_id_ = ~DWORD(0);
01390 #endif // defined(ASIO_ENABLE_CANCELIO)
01391
01392
01393 typedef receive_operation<MutableBufferSequence, Handler> value_type;
01394 typedef handler_alloc_traits<Handler, value_type> alloc_traits;
01395 raw_handler_ptr<alloc_traits> raw_ptr(handler);
01396 int protocol_type = impl.protocol_.type();
01397 handler_ptr<alloc_traits> ptr(raw_ptr, protocol_type,
01398 iocp_service_, impl.cancel_token_, buffers, handler);
01399
01400
01401 ::WSABUF bufs[max_buffers];
01402 typename MutableBufferSequence::const_iterator iter = buffers.begin();
01403 typename MutableBufferSequence::const_iterator end = buffers.end();
01404 DWORD i = 0;
01405 size_t total_buffer_size = 0;
01406 for (; iter != end && i < max_buffers; ++iter, ++i)
01407 {
01408 asio::mutable_buffer buffer(*iter);
01409 bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
01410 bufs[i].buf = asio::buffer_cast<char*>(buffer);
01411 total_buffer_size += asio::buffer_size(buffer);
01412 }
01413
01414
01415 if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
01416 {
01417 asio::io_service::work work(this->get_io_service());
01418 ptr.reset();
01419 asio::error_code error;
01420 iocp_service_.post(bind_handler(handler, error, 0));
01421 return;
01422 }
01423
01424
01425 DWORD bytes_transferred = 0;
01426 DWORD recv_flags = flags;
01427 int result = ::WSARecv(impl.socket_, bufs, i,
01428 &bytes_transferred, &recv_flags, ptr.get(), 0);
01429 DWORD last_error = ::WSAGetLastError();
01430 if (result != 0 && last_error != WSA_IO_PENDING)
01431 {
01432 asio::io_service::work work(this->get_io_service());
01433 ptr.reset();
01434 asio::error_code ec(last_error,
01435 asio::error::get_system_category());
01436 iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
01437 }
01438 else
01439 {
01440 ptr.release();
01441 }
01442 }
01443
01444
01445 template <typename Handler>
01446 void async_receive(implementation_type& impl, const null_buffers& buffers,
01447 socket_base::message_flags flags, Handler handler)
01448 {
01449 if (!is_open(impl))
01450 {
01451 this->get_io_service().post(bind_handler(handler,
01452 asio::error::bad_descriptor, 0));
01453 }
01454 else if (impl.protocol_.type() == SOCK_STREAM)
01455 {
01456
01457
01458
01459 #if defined(ASIO_ENABLE_CANCELIO)
01460
01461 if (impl.safe_cancellation_thread_id_ == 0)
01462 impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
01463 else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
01464 impl.safe_cancellation_thread_id_ = ~DWORD(0);
01465 #endif // defined(ASIO_ENABLE_CANCELIO)
01466
01467
01468 typedef receive_operation<null_buffers, Handler> value_type;
01469 typedef handler_alloc_traits<Handler, value_type> alloc_traits;
01470 raw_handler_ptr<alloc_traits> raw_ptr(handler);
01471 int protocol_type = impl.protocol_.type();
01472 handler_ptr<alloc_traits> ptr(raw_ptr, protocol_type,
01473 iocp_service_, impl.cancel_token_, buffers, handler);
01474
01475
01476 ::WSABUF buf = { 0, 0 };
01477 DWORD bytes_transferred = 0;
01478 DWORD recv_flags = flags;
01479 int result = ::WSARecv(impl.socket_, &buf, 1,
01480 &bytes_transferred, &recv_flags, ptr.get(), 0);
01481 DWORD last_error = ::WSAGetLastError();
01482 if (result != 0 && last_error != WSA_IO_PENDING)
01483 {
01484 asio::io_service::work work(this->get_io_service());
01485 ptr.reset();
01486 asio::error_code ec(last_error,
01487 asio::error::get_system_category());
01488 iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
01489 }
01490 else
01491 {
01492 ptr.release();
01493 }
01494 }
01495 else
01496 {
01497
01498 reactor_type* reactor = static_cast<reactor_type*>(
01499 interlocked_compare_exchange_pointer(
01500 reinterpret_cast<void**>(&reactor_), 0, 0));
01501 if (!reactor)
01502 {
01503 reactor = &(asio::use_service<reactor_type>(
01504 this->get_io_service()));
01505 interlocked_exchange_pointer(
01506 reinterpret_cast<void**>(&reactor_), reactor);
01507 }
01508
01509 if (flags & socket_base::message_out_of_band)
01510 {
01511 reactor->start_except_op(impl.socket_, impl.reactor_data_,
01512 null_buffers_operation<Handler>(this->get_io_service(), handler));
01513 }
01514 else
01515 {
01516 reactor->start_read_op(impl.socket_, impl.reactor_data_,
01517 null_buffers_operation<Handler>(this->get_io_service(), handler),
01518 false);
01519 }
01520 }
01521 }
01522
01523
01524
01525 template <typename MutableBufferSequence>
01526 size_t receive_from(implementation_type& impl,
01527 const MutableBufferSequence& buffers,
01528 endpoint_type& sender_endpoint, socket_base::message_flags flags,
01529 asio::error_code& ec)
01530 {
01531 if (!is_open(impl))
01532 {
01533 ec = asio::error::bad_descriptor;
01534 return 0;
01535 }
01536
01537
01538 ::WSABUF bufs[max_buffers];
01539 typename MutableBufferSequence::const_iterator iter = buffers.begin();
01540 typename MutableBufferSequence::const_iterator end = buffers.end();
01541 DWORD i = 0;
01542 for (; iter != end && i < max_buffers; ++iter, ++i)
01543 {
01544 asio::mutable_buffer buffer(*iter);
01545 bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
01546 bufs[i].buf = asio::buffer_cast<char*>(buffer);
01547 }
01548
01549
01550 DWORD bytes_transferred = 0;
01551 DWORD recv_flags = flags;
01552 int endpoint_size = static_cast<int>(sender_endpoint.capacity());
01553 int result = ::WSARecvFrom(impl.socket_, bufs, i, &bytes_transferred,
01554 &recv_flags, sender_endpoint.data(), &endpoint_size, 0, 0);
01555 if (result != 0)
01556 {
01557 DWORD last_error = ::WSAGetLastError();
01558 if (last_error == ERROR_PORT_UNREACHABLE)
01559 last_error = WSAECONNREFUSED;
01560 ec = asio::error_code(last_error,
01561 asio::error::get_system_category());
01562 return 0;
01563 }
01564 if (bytes_transferred == 0 && impl.protocol_.type() == SOCK_STREAM)
01565 {
01566 ec = asio::error::eof;
01567 return 0;
01568 }
01569
01570 sender_endpoint.resize(static_cast<std::size_t>(endpoint_size));
01571
01572 ec = asio::error_code();
01573 return bytes_transferred;
01574 }
01575
01576
01577 size_t receive_from(implementation_type& impl,
01578 const null_buffers&, endpoint_type& sender_endpoint,
01579 socket_base::message_flags, asio::error_code& ec)
01580 {
01581 if (!is_open(impl))
01582 {
01583 ec = asio::error::bad_descriptor;
01584 return 0;
01585 }
01586
01587
01588 socket_ops::poll_read(impl.socket_, ec);
01589
01590
01591 sender_endpoint = endpoint_type();
01592
01593 return 0;
01594 }
01595
01596 template <typename MutableBufferSequence, typename Handler>
01597 class receive_from_operation
01598 : public operation
01599 {
01600 public:
01601 receive_from_operation(int protocol_type, win_iocp_io_service& io_service,
01602 endpoint_type& endpoint, const MutableBufferSequence& buffers,
01603 Handler handler)
01604 : operation(io_service,
01605 &receive_from_operation<
01606 MutableBufferSequence, Handler>::do_completion_impl,
01607 &receive_from_operation<
01608 MutableBufferSequence, Handler>::destroy_impl),
01609 protocol_type_(protocol_type),
01610 endpoint_(endpoint),
01611 endpoint_size_(static_cast<int>(endpoint.capacity())),
01612 work_(io_service.get_io_service()),
01613 buffers_(buffers),
01614 handler_(handler)
01615 {
01616 }
01617
01618 int& endpoint_size()
01619 {
01620 return endpoint_size_;
01621 }
01622
01623 private:
01624 static void do_completion_impl(operation* op,
01625 DWORD last_error, size_t bytes_transferred)
01626 {
01627
01628 typedef receive_from_operation<MutableBufferSequence, Handler> op_type;
01629 op_type* handler_op(static_cast<op_type*>(op));
01630 typedef handler_alloc_traits<Handler, op_type> alloc_traits;
01631 handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
01632
01633 #if defined(ASIO_ENABLE_BUFFER_DEBUGGING)
01634
01635 typename MutableBufferSequence::const_iterator iter
01636 = handler_op->buffers_.begin();
01637 typename MutableBufferSequence::const_iterator end
01638 = handler_op->buffers_.end();
01639 while (iter != end)
01640 {
01641 asio::mutable_buffer buffer(*iter);
01642 asio::buffer_cast<char*>(buffer);
01643 ++iter;
01644 }
01645 #endif // defined(ASIO_ENABLE_BUFFER_DEBUGGING)
01646
01647
01648 asio::error_code ec(last_error,
01649 asio::error::get_system_category());
01650 if (ec.value() == ERROR_PORT_UNREACHABLE)
01651 {
01652 ec = asio::error::connection_refused;
01653 }
01654
01655
01656 if (!ec && bytes_transferred == 0
01657 && handler_op->protocol_type_ == SOCK_STREAM)
01658 {
01659 ec = asio::error::eof;
01660 }
01661
01662
01663 handler_op->endpoint_.resize(handler_op->endpoint_size_);
01664
01665
01666
01667 Handler handler(handler_op->handler_);
01668
01669
01670 ptr.reset();
01671
01672
01673 asio_handler_invoke_helpers::invoke(
01674 detail::bind_handler(handler, ec, bytes_transferred), &handler);
01675 }
01676
01677 static void destroy_impl(operation* op)
01678 {
01679
01680 typedef receive_from_operation<MutableBufferSequence, Handler> op_type;
01681 op_type* handler_op(static_cast<op_type*>(op));
01682 typedef handler_alloc_traits<Handler, op_type> alloc_traits;
01683 handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
01684
01685
01686
01687
01688
01689 Handler handler(handler_op->handler_);
01690 (void)handler;
01691
01692
01693 ptr.reset();
01694 }
01695
01696 int protocol_type_;
01697 endpoint_type& endpoint_;
01698 int endpoint_size_;
01699 asio::io_service::work work_;
01700 MutableBufferSequence buffers_;
01701 Handler handler_;
01702 };
01703
01704
01705
01706
01707 template <typename MutableBufferSequence, typename Handler>
01708 void async_receive_from(implementation_type& impl,
01709 const MutableBufferSequence& buffers, endpoint_type& sender_endp,
01710 socket_base::message_flags flags, Handler handler)
01711 {
01712 if (!is_open(impl))
01713 {
01714 this->get_io_service().post(bind_handler(handler,
01715 asio::error::bad_descriptor, 0));
01716 return;
01717 }
01718
01719 #if defined(ASIO_ENABLE_CANCELIO)
01720
01721 if (impl.safe_cancellation_thread_id_ == 0)
01722 impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
01723 else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
01724 impl.safe_cancellation_thread_id_ = ~DWORD(0);
01725 #endif // defined(ASIO_ENABLE_CANCELIO)
01726
01727
01728 typedef receive_from_operation<MutableBufferSequence, Handler> value_type;
01729 typedef handler_alloc_traits<Handler, value_type> alloc_traits;
01730 raw_handler_ptr<alloc_traits> raw_ptr(handler);
01731 int protocol_type = impl.protocol_.type();
01732 handler_ptr<alloc_traits> ptr(raw_ptr, protocol_type,
01733 iocp_service_, sender_endp, buffers, handler);
01734
01735
01736 ::WSABUF bufs[max_buffers];
01737 typename MutableBufferSequence::const_iterator iter = buffers.begin();
01738 typename MutableBufferSequence::const_iterator end = buffers.end();
01739 DWORD i = 0;
01740 for (; iter != end && i < max_buffers; ++iter, ++i)
01741 {
01742 asio::mutable_buffer buffer(*iter);
01743 bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
01744 bufs[i].buf = asio::buffer_cast<char*>(buffer);
01745 }
01746
01747
01748 DWORD bytes_transferred = 0;
01749 DWORD recv_flags = flags;
01750 int result = ::WSARecvFrom(impl.socket_, bufs, i, &bytes_transferred,
01751 &recv_flags, sender_endp.data(), &ptr.get()->endpoint_size(),
01752 ptr.get(), 0);
01753 DWORD last_error = ::WSAGetLastError();
01754 if (result != 0 && last_error != WSA_IO_PENDING)
01755 {
01756 asio::io_service::work work(this->get_io_service());
01757 ptr.reset();
01758 asio::error_code ec(last_error,
01759 asio::error::get_system_category());
01760 iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
01761 }
01762 else
01763 {
01764 ptr.release();
01765 }
01766 }
01767
01768
01769 template <typename Handler>
01770 void async_receive_from(implementation_type& impl,
01771 const null_buffers&, endpoint_type& sender_endpoint,
01772 socket_base::message_flags flags, Handler handler)
01773 {
01774 if (!is_open(impl))
01775 {
01776 this->get_io_service().post(bind_handler(handler,
01777 asio::error::bad_descriptor, 0));
01778 }
01779 else
01780 {
01781
01782 reactor_type* reactor = static_cast<reactor_type*>(
01783 interlocked_compare_exchange_pointer(
01784 reinterpret_cast<void**>(&reactor_), 0, 0));
01785 if (!reactor)
01786 {
01787 reactor = &(asio::use_service<reactor_type>(
01788 this->get_io_service()));
01789 interlocked_exchange_pointer(
01790 reinterpret_cast<void**>(&reactor_), reactor);
01791 }
01792
01793
01794 sender_endpoint = endpoint_type();
01795
01796 if (flags & socket_base::message_out_of_band)
01797 {
01798 reactor->start_except_op(impl.socket_, impl.reactor_data_,
01799 null_buffers_operation<Handler>(this->get_io_service(), handler));
01800 }
01801 else
01802 {
01803 reactor->start_read_op(impl.socket_, impl.reactor_data_,
01804 null_buffers_operation<Handler>(this->get_io_service(), handler),
01805 false);
01806 }
01807 }
01808 }
01809
01810
01811 template <typename Socket>
01812 asio::error_code accept(implementation_type& impl, Socket& peer,
01813 endpoint_type* peer_endpoint, asio::error_code& ec)
01814 {
01815 if (!is_open(impl))
01816 {
01817 ec = asio::error::bad_descriptor;
01818 return ec;
01819 }
01820
01821
01822 if (peer.is_open())
01823 {
01824 ec = asio::error::already_open;
01825 return ec;
01826 }
01827
01828 for (;;)
01829 {
01830 socket_holder new_socket;
01831 std::size_t addr_len = 0;
01832 if (peer_endpoint)
01833 {
01834 addr_len = peer_endpoint->capacity();
01835 new_socket.reset(socket_ops::accept(impl.socket_,
01836 peer_endpoint->data(), &addr_len, ec));
01837 }
01838 else
01839 {
01840 new_socket.reset(socket_ops::accept(impl.socket_, 0, 0, ec));
01841 }
01842
01843 if (ec)
01844 {
01845 if (ec == asio::error::connection_aborted
01846 && !(impl.flags_ & implementation_type::enable_connection_aborted))
01847 {
01848
01849 continue;
01850 }
01851 else
01852 {
01853 return ec;
01854 }
01855 }
01856
01857 if (peer_endpoint)
01858 peer_endpoint->resize(addr_len);
01859
01860 peer.assign(impl.protocol_, new_socket.get(), ec);
01861 if (!ec)
01862 new_socket.release();
01863 return ec;
01864 }
01865 }
01866
01867 template <typename Socket, typename Handler>
01868 class accept_operation
01869 : public operation
01870 {
01871 public:
01872 accept_operation(win_iocp_io_service& io_service,
01873 socket_type socket, socket_type new_socket, Socket& peer,
01874 const protocol_type& protocol, endpoint_type* peer_endpoint,
01875 bool enable_connection_aborted, Handler handler)
01876 : operation(io_service,
01877 &accept_operation<Socket, Handler>::do_completion_impl,
01878 &accept_operation<Socket, Handler>::destroy_impl),
01879 io_service_(io_service),
01880 socket_(socket),
01881 new_socket_(new_socket),
01882 peer_(peer),
01883 protocol_(protocol),
01884 peer_endpoint_(peer_endpoint),
01885 work_(io_service.get_io_service()),
01886 enable_connection_aborted_(enable_connection_aborted),
01887 handler_(handler)
01888 {
01889 }
01890
01891 socket_type new_socket()
01892 {
01893 return new_socket_.get();
01894 }
01895
01896 void* output_buffer()
01897 {
01898 return output_buffer_;
01899 }
01900
01901 DWORD address_length()
01902 {
01903 return sizeof(sockaddr_storage_type) + 16;
01904 }
01905
01906 private:
01907 static void do_completion_impl(operation* op, DWORD last_error, size_t)
01908 {
01909
01910 typedef accept_operation<Socket, Handler> op_type;
01911 op_type* handler_op(static_cast<op_type*>(op));
01912 typedef handler_alloc_traits<Handler, op_type> alloc_traits;
01913 handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
01914
01915
01916 if (last_error == ERROR_NETNAME_DELETED)
01917 {
01918 last_error = WSAECONNABORTED;
01919 }
01920
01921
01922
01923 if (last_error == WSAECONNABORTED
01924 && !ptr.get()->enable_connection_aborted_)
01925 {
01926
01927 ptr.get()->Internal = 0;
01928 ptr.get()->InternalHigh = 0;
01929 ptr.get()->Offset = 0;
01930 ptr.get()->OffsetHigh = 0;
01931 ptr.get()->hEvent = 0;
01932
01933
01934
01935 asio::error_code ec;
01936 ptr.get()->new_socket_.reset();
01937 ptr.get()->new_socket_.reset(socket_ops::socket(
01938 ptr.get()->protocol_.family(), ptr.get()->protocol_.type(),
01939 ptr.get()->protocol_.protocol(), ec));
01940 if (ptr.get()->new_socket() != invalid_socket)
01941 {
01942
01943 DWORD bytes_read = 0;
01944 BOOL result = ::AcceptEx(ptr.get()->socket_, ptr.get()->new_socket(),
01945 ptr.get()->output_buffer(), 0, ptr.get()->address_length(),
01946 ptr.get()->address_length(), &bytes_read, ptr.get());
01947 last_error = ::WSAGetLastError();
01948
01949
01950 if (!result && last_error != WSA_IO_PENDING)
01951 {
01952 if (last_error == ERROR_NETNAME_DELETED
01953 || last_error == WSAECONNABORTED)
01954 {
01955
01956 ptr.get()->io_service_.post_completion(ptr.get(), last_error, 0);
01957 ptr.release();
01958 return;
01959 }
01960 else
01961 {
01962
01963 }
01964 }
01965 else
01966 {
01967
01968 ptr.release();
01969 return;
01970 }
01971 }
01972 }
01973
01974
01975 endpoint_type peer_endpoint;
01976 if (last_error == 0)
01977 {
01978 LPSOCKADDR local_addr = 0;
01979 int local_addr_length = 0;
01980 LPSOCKADDR remote_addr = 0;
01981 int remote_addr_length = 0;
01982 GetAcceptExSockaddrs(handler_op->output_buffer(), 0,
01983 handler_op->address_length(), handler_op->address_length(),
01984 &local_addr, &local_addr_length, &remote_addr, &remote_addr_length);
01985 if (static_cast<std::size_t>(remote_addr_length)
01986 > peer_endpoint.capacity())
01987 {
01988 last_error = WSAEINVAL;
01989 }
01990 else
01991 {
01992 using namespace std;
01993 memcpy(peer_endpoint.data(), remote_addr, remote_addr_length);
01994 peer_endpoint.resize(static_cast<std::size_t>(remote_addr_length));
01995 }
01996 }
01997
01998
01999
02000 if (last_error == 0)
02001 {
02002 SOCKET update_ctx_param = handler_op->socket_;
02003 asio::error_code ec;
02004 if (socket_ops::setsockopt(handler_op->new_socket_.get(),
02005 SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
02006 &update_ctx_param, sizeof(SOCKET), ec) != 0)
02007 {
02008 last_error = ec.value();
02009 }
02010 }
02011
02012
02013
02014 if (last_error == 0)
02015 {
02016 asio::error_code ec;
02017 handler_op->peer_.assign(handler_op->protocol_,
02018 native_type(handler_op->new_socket_.get(), peer_endpoint), ec);
02019 if (ec)
02020 last_error = ec.value();
02021 else
02022 handler_op->new_socket_.release();
02023 }
02024
02025
02026 if (handler_op->peer_endpoint_)
02027 *handler_op->peer_endpoint_ = peer_endpoint;
02028
02029
02030
02031 Handler handler(handler_op->handler_);
02032
02033
02034 ptr.reset();
02035
02036
02037 asio::error_code ec(last_error,
02038 asio::error::get_system_category());
02039 asio_handler_invoke_helpers::invoke(
02040 detail::bind_handler(handler, ec), &handler);
02041 }
02042
02043 static void destroy_impl(operation* op)
02044 {
02045
02046 typedef accept_operation<Socket, Handler> op_type;
02047 op_type* handler_op(static_cast<op_type*>(op));
02048 typedef handler_alloc_traits<Handler, op_type> alloc_traits;
02049 handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
02050
02051
02052
02053
02054
02055 Handler handler(handler_op->handler_);
02056 (void)handler;
02057
02058
02059 ptr.reset();
02060 }
02061
02062 win_iocp_io_service& io_service_;
02063 socket_type socket_;
02064 socket_holder new_socket_;
02065 Socket& peer_;
02066 protocol_type protocol_;
02067 endpoint_type* peer_endpoint_;
02068 asio::io_service::work work_;
02069 unsigned char output_buffer_[(sizeof(sockaddr_storage_type) + 16) * 2];
02070 bool enable_connection_aborted_;
02071 Handler handler_;
02072 };
02073
02074
02075
02076 template <typename Socket, typename Handler>
02077 void async_accept(implementation_type& impl, Socket& peer,
02078 endpoint_type* peer_endpoint, Handler handler)
02079 {
02080
02081 if (!is_open(impl))
02082 {
02083 this->get_io_service().post(bind_handler(handler,
02084 asio::error::bad_descriptor));
02085 return;
02086 }
02087
02088
02089 if (peer.is_open())
02090 {
02091 this->get_io_service().post(bind_handler(handler,
02092 asio::error::already_open));
02093 return;
02094 }
02095
02096 #if defined(ASIO_ENABLE_CANCELIO)
02097
02098 if (impl.safe_cancellation_thread_id_ == 0)
02099 impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
02100 else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
02101 impl.safe_cancellation_thread_id_ = ~DWORD(0);
02102 #endif // defined(ASIO_ENABLE_CANCELIO)
02103
02104
02105 asio::error_code ec;
02106 socket_holder sock(socket_ops::socket(impl.protocol_.family(),
02107 impl.protocol_.type(), impl.protocol_.protocol(), ec));
02108 if (sock.get() == invalid_socket)
02109 {
02110 this->get_io_service().post(bind_handler(handler, ec));
02111 return;
02112 }
02113
02114
02115 typedef accept_operation<Socket, Handler> value_type;
02116 typedef handler_alloc_traits<Handler, value_type> alloc_traits;
02117 raw_handler_ptr<alloc_traits> raw_ptr(handler);
02118 socket_type new_socket = sock.get();
02119 bool enable_connection_aborted =
02120 (impl.flags_ & implementation_type::enable_connection_aborted);
02121 handler_ptr<alloc_traits> ptr(raw_ptr,
02122 iocp_service_, impl.socket_, new_socket, peer, impl.protocol_,
02123 peer_endpoint, enable_connection_aborted, handler);
02124 sock.release();
02125
02126
02127 DWORD bytes_read = 0;
02128 BOOL result = ::AcceptEx(impl.socket_, ptr.get()->new_socket(),
02129 ptr.get()->output_buffer(), 0, ptr.get()->address_length(),
02130 ptr.get()->address_length(), &bytes_read, ptr.get());
02131 DWORD last_error = ::WSAGetLastError();
02132
02133
02134 if (!result && last_error != WSA_IO_PENDING)
02135 {
02136 if (!enable_connection_aborted
02137 && (last_error == ERROR_NETNAME_DELETED
02138 || last_error == WSAECONNABORTED))
02139 {
02140
02141
02142
02143 iocp_service_.post_completion(ptr.get(), last_error, 0);
02144 ptr.release();
02145 }
02146 else
02147 {
02148 asio::io_service::work work(this->get_io_service());
02149 ptr.reset();
02150 asio::error_code ec(last_error,
02151 asio::error::get_system_category());
02152 iocp_service_.post(bind_handler(handler, ec));
02153 }
02154 }
02155 else
02156 {
02157 ptr.release();
02158 }
02159 }
02160
02161
02162 asio::error_code connect(implementation_type& impl,
02163 const endpoint_type& peer_endpoint, asio::error_code& ec)
02164 {
02165 if (!is_open(impl))
02166 {
02167 ec = asio::error::bad_descriptor;
02168 return ec;
02169 }
02170
02171
02172 socket_ops::connect(impl.socket_,
02173 peer_endpoint.data(), peer_endpoint.size(), ec);
02174 return ec;
02175 }
02176
02177 template <typename Handler>
02178 class connect_operation
02179 {
02180 public:
02181 connect_operation(socket_type socket, bool user_set_non_blocking,
02182 asio::io_service& io_service, Handler handler)
02183 : socket_(socket),
02184 user_set_non_blocking_(user_set_non_blocking),
02185 io_service_(io_service),
02186 work_(io_service),
02187 handler_(handler)
02188 {
02189 }
02190
02191 bool perform(asio::error_code& ec,
02192 std::size_t& bytes_transferred)
02193 {
02194
02195 if (ec)
02196 return true;
02197
02198
02199 int connect_error = 0;
02200 size_t connect_error_len = sizeof(connect_error);
02201 if (socket_ops::getsockopt(socket_, SOL_SOCKET, SO_ERROR,
02202 &connect_error, &connect_error_len, ec) == socket_error_retval)
02203 return true;
02204
02205
02206 if (connect_error)
02207 {
02208 ec = asio::error_code(connect_error,
02209 asio::error::get_system_category());
02210 return true;
02211 }
02212
02213
02214 if (!user_set_non_blocking_)
02215 {
02216 ioctl_arg_type non_blocking = 0;
02217 if (socket_ops::ioctl(socket_, FIONBIO, &non_blocking, ec))
02218 return true;
02219 }
02220
02221
02222 ec = asio::error_code();
02223 return true;
02224 }
02225
02226 void complete(const asio::error_code& ec, std::size_t)
02227 {
02228 io_service_.post(bind_handler(handler_, ec));
02229 }
02230
02231 private:
02232 socket_type socket_;
02233 bool user_set_non_blocking_;
02234 asio::io_service& io_service_;
02235 asio::io_service::work work_;
02236 Handler handler_;
02237 };
02238
02239
02240 template <typename Handler>
02241 void async_connect(implementation_type& impl,
02242 const endpoint_type& peer_endpoint, Handler handler)
02243 {
02244 if (!is_open(impl))
02245 {
02246 this->get_io_service().post(bind_handler(handler,
02247 asio::error::bad_descriptor));
02248 return;
02249 }
02250
02251 #if defined(ASIO_ENABLE_CANCELIO)
02252
02253 if (impl.safe_cancellation_thread_id_ == 0)
02254 impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
02255 else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
02256 impl.safe_cancellation_thread_id_ = ~DWORD(0);
02257 #endif // defined(ASIO_ENABLE_CANCELIO)
02258
02259
02260 reactor_type* reactor = static_cast<reactor_type*>(
02261 interlocked_compare_exchange_pointer(
02262 reinterpret_cast<void**>(&reactor_), 0, 0));
02263 if (!reactor)
02264 {
02265 reactor = &(asio::use_service<reactor_type>(
02266 this->get_io_service()));
02267 interlocked_exchange_pointer(
02268 reinterpret_cast<void**>(&reactor_), reactor);
02269 }
02270
02271
02272
02273 ioctl_arg_type non_blocking = 1;
02274 asio::error_code ec;
02275 if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
02276 {
02277 this->get_io_service().post(bind_handler(handler, ec));
02278 return;
02279 }
02280
02281
02282 if (socket_ops::connect(impl.socket_, peer_endpoint.data(),
02283 peer_endpoint.size(), ec) == 0)
02284 {
02285
02286 if (!(impl.flags_ & implementation_type::user_set_non_blocking))
02287 {
02288 non_blocking = 0;
02289 socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec);
02290 }
02291
02292
02293
02294 this->get_io_service().post(bind_handler(handler, ec));
02295 }
02296 else if (ec == asio::error::in_progress
02297 || ec == asio::error::would_block)
02298 {
02299
02300
02301 boost::shared_ptr<bool> completed(new bool(false));
02302 reactor->start_connect_op(impl.socket_, impl.reactor_data_,
02303 connect_operation<Handler>(
02304 impl.socket_,
02305 (impl.flags_ & implementation_type::user_set_non_blocking) != 0,
02306 this->get_io_service(), handler));
02307 }
02308 else
02309 {
02310
02311 if (!(impl.flags_ & implementation_type::user_set_non_blocking))
02312 {
02313 non_blocking = 0;
02314 asio::error_code ignored_ec;
02315 socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ignored_ec);
02316 }
02317
02318
02319 this->get_io_service().post(bind_handler(handler, ec));
02320 }
02321 }
02322
02323 private:
02324
02325
02326 void close_for_destruction(implementation_type& impl)
02327 {
02328 if (is_open(impl))
02329 {
02330
02331
02332
02333 reactor_type* reactor = static_cast<reactor_type*>(
02334 interlocked_compare_exchange_pointer(
02335 reinterpret_cast<void**>(&reactor_), 0, 0));
02336 if (reactor)
02337 reactor->close_descriptor(impl.socket_, impl.reactor_data_);
02338
02339
02340
02341
02342 if (impl.flags_ & implementation_type::close_might_block)
02343 {
02344 ::linger opt;
02345 opt.l_onoff = 0;
02346 opt.l_linger = 0;
02347 asio::error_code ignored_ec;
02348 socket_ops::setsockopt(impl.socket_,
02349 SOL_SOCKET, SO_LINGER, &opt, sizeof(opt), ignored_ec);
02350 }
02351
02352 asio::error_code ignored_ec;
02353 socket_ops::close(impl.socket_, ignored_ec);
02354 impl.socket_ = invalid_socket;
02355 impl.flags_ = 0;
02356 impl.cancel_token_.reset();
02357 #if defined(ASIO_ENABLE_CANCELIO)
02358 impl.safe_cancellation_thread_id_ = 0;
02359 #endif // defined(ASIO_ENABLE_CANCELIO)
02360 }
02361 }
02362
02363
02364
02365
02366
02367 void* interlocked_compare_exchange_pointer(void** dest, void* exch, void* cmp)
02368 {
02369 #if defined(_M_IX86)
02370 return reinterpret_cast<void*>(InterlockedCompareExchange(
02371 reinterpret_cast<PLONG>(dest), reinterpret_cast<LONG>(exch),
02372 reinterpret_cast<LONG>(cmp)));
02373 #else
02374 return InterlockedCompareExchangePointer(dest, exch, cmp);
02375 #endif
02376 }
02377
02378
02379
02380
02381 void* interlocked_exchange_pointer(void** dest, void* val)
02382 {
02383 #if defined(_M_IX86)
02384 return reinterpret_cast<void*>(InterlockedExchange(
02385 reinterpret_cast<PLONG>(dest), reinterpret_cast<LONG>(val)));
02386 #else
02387 return InterlockedExchangePointer(dest, val);
02388 #endif
02389 }
02390
02391
02392
02393 win_iocp_io_service& iocp_service_;
02394
02395
02396
02397 reactor_type* reactor_;
02398
02399
02400 asio::detail::mutex mutex_;
02401
02402
02403 implementation_type* impl_list_;
02404 };
02405
02406 }
02407 }
02408
02409 #endif // defined(ASIO_HAS_IOCP)
02410
02411 #include "asio/detail/pop_options.hpp"
02412
02413 #endif // ASIO_DETAIL_WIN_IOCP_SOCKET_SERVICE_HPP