reactive_socket_service.hpp
Go to the documentation of this file.
00001 //
00002 // reactive_socket_service.hpp
00003 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~
00004 //
00005 // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
00006 //
00007 // Distributed under the Boost Software License, Version 1.0. (See accompanying
00008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
00009 //
00010 
00011 #ifndef ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP
00012 #define ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP
00013 
00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00015 # pragma once
00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00017 
00018 #include "asio/detail/push_options.hpp"
00019 
00020 #include "asio/detail/push_options.hpp"
00021 #include <boost/shared_ptr.hpp>
00022 #include "asio/detail/pop_options.hpp"
00023 
00024 #include "asio/buffer.hpp"
00025 #include "asio/error.hpp"
00026 #include "asio/io_service.hpp"
00027 #include "asio/socket_base.hpp"
00028 #include "asio/detail/bind_handler.hpp"
00029 #include "asio/detail/handler_base_from_member.hpp"
00030 #include "asio/detail/noncopyable.hpp"
00031 #include "asio/detail/service_base.hpp"
00032 #include "asio/detail/socket_holder.hpp"
00033 #include "asio/detail/socket_ops.hpp"
00034 #include "asio/detail/socket_types.hpp"
00035 
00036 namespace asio {
00037 namespace detail {
00038 
00039 template <typename Protocol, typename Reactor>
00040 class reactive_socket_service
00041   : public asio::detail::service_base<
00042       reactive_socket_service<Protocol, Reactor> >
00043 {
00044 public:
00045   // The protocol type.
00046   typedef Protocol protocol_type;
00047 
00048   // The endpoint type.
00049   typedef typename Protocol::endpoint endpoint_type;
00050 
00051   // The native type of a socket.
00052   typedef socket_type native_type;
00053 
00054   // The implementation type of the socket.
00055   class implementation_type
00056     : private asio::detail::noncopyable
00057   {
00058   public:
00059     // Default constructor.
00060     implementation_type()
00061       : socket_(invalid_socket),
00062         flags_(0),
00063         protocol_(endpoint_type().protocol())
00064     {
00065     }
00066 
00067   private:
00068     // Only this service will have access to the internal values.
00069     friend class reactive_socket_service<Protocol, Reactor>;
00070 
00071     // The native socket representation.
00072     socket_type socket_;
00073 
00074     enum
00075     {
00076       user_set_non_blocking = 1, // The user wants a non-blocking socket.
00077       internal_non_blocking = 2, // The socket has been set non-blocking.
00078       enable_connection_aborted = 4, // User wants connection_aborted errors.
00079       user_set_linger = 8 // The user set the linger option.
00080     };
00081 
00082     // Flags indicating the current state of the socket.
00083     unsigned char flags_;
00084 
00085     // The protocol associated with the socket.
00086     protocol_type protocol_;
00087 
00088     // Per-descriptor data used by the reactor.
00089     typename Reactor::per_descriptor_data reactor_data_;
00090   };
00091 
00092   // The maximum number of buffers to support in a single operation.
00093   enum { max_buffers = 64 < max_iov_len ? 64 : max_iov_len };
00094 
00095   // Constructor.
00096   reactive_socket_service(asio::io_service& io_service)
00097     : asio::detail::service_base<
00098         reactive_socket_service<Protocol, Reactor> >(io_service),
00099       reactor_(asio::use_service<Reactor>(io_service))
00100   {
00101   }
00102 
00103   // Destroy all user-defined handler objects owned by the service.
00104   void shutdown_service()
00105   {
00106   }
00107 
00108   // Construct a new socket implementation.
00109   void construct(implementation_type& impl)
00110   {
00111     impl.socket_ = invalid_socket;
00112     impl.flags_ = 0;
00113   }
00114 
00115   // Destroy a socket implementation.
00116   void destroy(implementation_type& impl)
00117   {
00118     if (impl.socket_ != invalid_socket)
00119     {
00120       reactor_.close_descriptor(impl.socket_, impl.reactor_data_);
00121 
00122       if (impl.flags_ & implementation_type::internal_non_blocking)
00123       {
00124         ioctl_arg_type non_blocking = 0;
00125         asio::error_code ignored_ec;
00126         socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ignored_ec);
00127         impl.flags_ &= ~implementation_type::internal_non_blocking;
00128       }
00129 
00130       if (impl.flags_ & implementation_type::user_set_linger)
00131       {
00132         ::linger opt;
00133         opt.l_onoff = 0;
00134         opt.l_linger = 0;
00135         asio::error_code ignored_ec;
00136         socket_ops::setsockopt(impl.socket_,
00137             SOL_SOCKET, SO_LINGER, &opt, sizeof(opt), ignored_ec);
00138       }
00139 
00140       asio::error_code ignored_ec;
00141       socket_ops::close(impl.socket_, ignored_ec);
00142 
00143       impl.socket_ = invalid_socket;
00144     }
00145   }
00146 
00147   // Open a new socket implementation.
00148   asio::error_code open(implementation_type& impl,
00149       const protocol_type& protocol, asio::error_code& ec)
00150   {
00151     if (is_open(impl))
00152     {
00153       ec = asio::error::already_open;
00154       return ec;
00155     }
00156 
00157     socket_holder sock(socket_ops::socket(protocol.family(),
00158           protocol.type(), protocol.protocol(), ec));
00159     if (sock.get() == invalid_socket)
00160       return ec;
00161 
00162     if (int err = reactor_.register_descriptor(sock.get(), impl.reactor_data_))
00163     {
00164       ec = asio::error_code(err,
00165           asio::error::get_system_category());
00166       return ec;
00167     }
00168 
00169     impl.socket_ = sock.release();
00170     impl.flags_ = 0;
00171     impl.protocol_ = protocol;
00172     ec = asio::error_code();
00173     return ec;
00174   }
00175 
00176   // Assign a native socket to a socket implementation.
00177   asio::error_code assign(implementation_type& impl,
00178       const protocol_type& protocol, const native_type& native_socket,
00179       asio::error_code& ec)
00180   {
00181     if (is_open(impl))
00182     {
00183       ec = asio::error::already_open;
00184       return ec;
00185     }
00186 
00187     if (int err = reactor_.register_descriptor(
00188           native_socket, impl.reactor_data_))
00189     {
00190       ec = asio::error_code(err,
00191           asio::error::get_system_category());
00192       return ec;
00193     }
00194 
00195     impl.socket_ = native_socket;
00196     impl.flags_ = 0;
00197     impl.protocol_ = protocol;
00198     ec = asio::error_code();
00199     return ec;
00200   }
00201 
00202   // Determine whether the socket is open.
00203   bool is_open(const implementation_type& impl) const
00204   {
00205     return impl.socket_ != invalid_socket;
00206   }
00207 
00208   // Destroy a socket implementation.
00209   asio::error_code close(implementation_type& impl,
00210       asio::error_code& ec)
00211   {
00212     if (is_open(impl))
00213     {
00214       reactor_.close_descriptor(impl.socket_, impl.reactor_data_);
00215 
00216       if (impl.flags_ & implementation_type::internal_non_blocking)
00217       {
00218         ioctl_arg_type non_blocking = 0;
00219         asio::error_code ignored_ec;
00220         socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ignored_ec);
00221         impl.flags_ &= ~implementation_type::internal_non_blocking;
00222       }
00223 
00224       if (socket_ops::close(impl.socket_, ec) == socket_error_retval)
00225         return ec;
00226 
00227       impl.socket_ = invalid_socket;
00228     }
00229 
00230     ec = asio::error_code();
00231     return ec;
00232   }
00233 
00234   // Get the native socket representation.
00235   native_type native(implementation_type& impl)
00236   {
00237     return impl.socket_;
00238   }
00239 
00240   // Cancel all operations associated with the socket.
00241   asio::error_code cancel(implementation_type& impl,
00242       asio::error_code& ec)
00243   {
00244     if (!is_open(impl))
00245     {
00246       ec = asio::error::bad_descriptor;
00247       return ec;
00248     }
00249 
00250     reactor_.cancel_ops(impl.socket_, impl.reactor_data_);
00251     ec = asio::error_code();
00252     return ec;
00253   }
00254 
00255   // Determine whether the socket is at the out-of-band data mark.
00256   bool at_mark(const implementation_type& impl,
00257       asio::error_code& ec) const
00258   {
00259     if (!is_open(impl))
00260     {
00261       ec = asio::error::bad_descriptor;
00262       return false;
00263     }
00264 
00265     asio::detail::ioctl_arg_type value = 0;
00266     socket_ops::ioctl(impl.socket_, SIOCATMARK, &value, ec);
00267 #if defined(ENOTTY)
00268     if (ec.value() == ENOTTY)
00269       ec = asio::error::not_socket;
00270 #endif // defined(ENOTTY)
00271     return ec ? false : value != 0;
00272   }
00273 
00274   // Determine the number of bytes available for reading.
00275   std::size_t available(const implementation_type& impl,
00276       asio::error_code& ec) const
00277   {
00278     if (!is_open(impl))
00279     {
00280       ec = asio::error::bad_descriptor;
00281       return 0;
00282     }
00283 
00284     asio::detail::ioctl_arg_type value = 0;
00285     socket_ops::ioctl(impl.socket_, FIONREAD, &value, ec);
00286 #if defined(ENOTTY)
00287     if (ec.value() == ENOTTY)
00288       ec = asio::error::not_socket;
00289 #endif // defined(ENOTTY)
00290     return ec ? static_cast<std::size_t>(0) : static_cast<std::size_t>(value);
00291   }
00292 
00293   // Bind the socket to the specified local endpoint.
00294   asio::error_code bind(implementation_type& impl,
00295       const endpoint_type& endpoint, asio::error_code& ec)
00296   {
00297     if (!is_open(impl))
00298     {
00299       ec = asio::error::bad_descriptor;
00300       return ec;
00301     }
00302 
00303     socket_ops::bind(impl.socket_, endpoint.data(), endpoint.size(), ec);
00304     return ec;
00305   }
00306 
00307   // Place the socket into the state where it will listen for new connections.
00308   asio::error_code listen(implementation_type& impl, int backlog,
00309       asio::error_code& ec)
00310   {
00311     if (!is_open(impl))
00312     {
00313       ec = asio::error::bad_descriptor;
00314       return ec;
00315     }
00316 
00317     socket_ops::listen(impl.socket_, backlog, ec);
00318     return ec;
00319   }
00320 
00321   // Set a socket option.
00322   template <typename Option>
00323   asio::error_code set_option(implementation_type& impl,
00324       const Option& option, asio::error_code& ec)
00325   {
00326     if (!is_open(impl))
00327     {
00328       ec = asio::error::bad_descriptor;
00329       return ec;
00330     }
00331 
00332     if (option.level(impl.protocol_) == custom_socket_option_level
00333         && option.name(impl.protocol_) == enable_connection_aborted_option)
00334     {
00335       if (option.size(impl.protocol_) != sizeof(int))
00336       {
00337         ec = asio::error::invalid_argument;
00338       }
00339       else
00340       {
00341         if (*reinterpret_cast<const int*>(option.data(impl.protocol_)))
00342           impl.flags_ |= implementation_type::enable_connection_aborted;
00343         else
00344           impl.flags_ &= ~implementation_type::enable_connection_aborted;
00345         ec = asio::error_code();
00346       }
00347       return ec;
00348     }
00349     else
00350     {
00351       if (option.level(impl.protocol_) == SOL_SOCKET
00352           && option.name(impl.protocol_) == SO_LINGER)
00353       {
00354         impl.flags_ |= implementation_type::user_set_linger;
00355       }
00356 
00357       socket_ops::setsockopt(impl.socket_,
00358           option.level(impl.protocol_), option.name(impl.protocol_),
00359           option.data(impl.protocol_), option.size(impl.protocol_), ec);
00360 
00361 #if defined(__MACH__) && defined(__APPLE__) \
00362 || defined(__NetBSD__) || defined(__FreeBSD__) || defined(__OpenBSD__)
00363       // To implement portable behaviour for SO_REUSEADDR with UDP sockets we
00364       // need to also set SO_REUSEPORT on BSD-based platforms.
00365       if (!ec && impl.protocol_.type() == SOCK_DGRAM
00366           && option.level(impl.protocol_) == SOL_SOCKET
00367           && option.name(impl.protocol_) == SO_REUSEADDR)
00368       {
00369         asio::error_code ignored_ec;
00370         socket_ops::setsockopt(impl.socket_, SOL_SOCKET, SO_REUSEPORT,
00371             option.data(impl.protocol_), option.size(impl.protocol_),
00372             ignored_ec);
00373       }
00374 #endif
00375 
00376       return ec;
00377     }
00378   }
00379 
00380   // Set a socket option.
00381   template <typename Option>
00382   asio::error_code get_option(const implementation_type& impl,
00383       Option& option, asio::error_code& ec) const
00384   {
00385     if (!is_open(impl))
00386     {
00387       ec = asio::error::bad_descriptor;
00388       return ec;
00389     }
00390 
00391     if (option.level(impl.protocol_) == custom_socket_option_level
00392         && option.name(impl.protocol_) == enable_connection_aborted_option)
00393     {
00394       if (option.size(impl.protocol_) != sizeof(int))
00395       {
00396         ec = asio::error::invalid_argument;
00397       }
00398       else
00399       {
00400         int* target = reinterpret_cast<int*>(option.data(impl.protocol_));
00401         if (impl.flags_ & implementation_type::enable_connection_aborted)
00402           *target = 1;
00403         else
00404           *target = 0;
00405         option.resize(impl.protocol_, sizeof(int));
00406         ec = asio::error_code();
00407       }
00408       return ec;
00409     }
00410     else
00411     {
00412       size_t size = option.size(impl.protocol_);
00413       socket_ops::getsockopt(impl.socket_,
00414           option.level(impl.protocol_), option.name(impl.protocol_),
00415           option.data(impl.protocol_), &size, ec);
00416       if (!ec)
00417         option.resize(impl.protocol_, size);
00418       return ec;
00419     }
00420   }
00421 
00422   // Perform an IO control command on the socket.
00423   template <typename IO_Control_Command>
00424   asio::error_code io_control(implementation_type& impl,
00425       IO_Control_Command& command, asio::error_code& ec)
00426   {
00427     if (!is_open(impl))
00428     {
00429       ec = asio::error::bad_descriptor;
00430       return ec;
00431     }
00432 
00433     if (command.name() == static_cast<int>(FIONBIO))
00434     {
00435       if (command.get())
00436         impl.flags_ |= implementation_type::user_set_non_blocking;
00437       else
00438         impl.flags_ &= ~implementation_type::user_set_non_blocking;
00439       ec = asio::error_code();
00440     }
00441     else
00442     {
00443       socket_ops::ioctl(impl.socket_, command.name(),
00444           static_cast<ioctl_arg_type*>(command.data()), ec);
00445     }
00446     return ec;
00447   }
00448 
00449   // Get the local endpoint.
00450   endpoint_type local_endpoint(const implementation_type& impl,
00451       asio::error_code& ec) const
00452   {
00453     if (!is_open(impl))
00454     {
00455       ec = asio::error::bad_descriptor;
00456       return endpoint_type();
00457     }
00458 
00459     endpoint_type endpoint;
00460     std::size_t addr_len = endpoint.capacity();
00461     if (socket_ops::getsockname(impl.socket_, endpoint.data(), &addr_len, ec))
00462       return endpoint_type();
00463     endpoint.resize(addr_len);
00464     return endpoint;
00465   }
00466 
00467   // Get the remote endpoint.
00468   endpoint_type remote_endpoint(const implementation_type& impl,
00469       asio::error_code& ec) const
00470   {
00471     if (!is_open(impl))
00472     {
00473       ec = asio::error::bad_descriptor;
00474       return endpoint_type();
00475     }
00476 
00477     endpoint_type endpoint;
00478     std::size_t addr_len = endpoint.capacity();
00479     if (socket_ops::getpeername(impl.socket_, endpoint.data(), &addr_len, ec))
00480       return endpoint_type();
00481     endpoint.resize(addr_len);
00482     return endpoint;
00483   }
00484 
00486   asio::error_code shutdown(implementation_type& impl,
00487       socket_base::shutdown_type what, asio::error_code& ec)
00488   {
00489     if (!is_open(impl))
00490     {
00491       ec = asio::error::bad_descriptor;
00492       return ec;
00493     }
00494 
00495     socket_ops::shutdown(impl.socket_, what, ec);
00496     return ec;
00497   }
00498 
00499   // Send the given data to the peer.
00500   template <typename ConstBufferSequence>
00501   size_t send(implementation_type& impl, const ConstBufferSequence& buffers,
00502       socket_base::message_flags flags, asio::error_code& ec)
00503   {
00504     if (!is_open(impl))
00505     {
00506       ec = asio::error::bad_descriptor;
00507       return 0;
00508     }
00509 
00510     // Copy buffers into array.
00511     socket_ops::buf bufs[max_buffers];
00512     typename ConstBufferSequence::const_iterator iter = buffers.begin();
00513     typename ConstBufferSequence::const_iterator end = buffers.end();
00514     size_t i = 0;
00515     size_t total_buffer_size = 0;
00516     for (; iter != end && i < max_buffers; ++iter, ++i)
00517     {
00518       asio::const_buffer buffer(*iter);
00519       socket_ops::init_buf(bufs[i],
00520           asio::buffer_cast<const void*>(buffer),
00521           asio::buffer_size(buffer));
00522       total_buffer_size += asio::buffer_size(buffer);
00523     }
00524 
00525     // A request to receive 0 bytes on a stream socket is a no-op.
00526     if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
00527     {
00528       ec = asio::error_code();
00529       return 0;
00530     }
00531 
00532     // Make socket non-blocking if user wants non-blocking.
00533     if (impl.flags_ & implementation_type::user_set_non_blocking)
00534     {
00535       if (!(impl.flags_ & implementation_type::internal_non_blocking))
00536       {
00537         ioctl_arg_type non_blocking = 1;
00538         if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
00539           return 0;
00540         impl.flags_ |= implementation_type::internal_non_blocking;
00541       }
00542     }
00543 
00544     // Send the data.
00545     for (;;)
00546     {
00547       // Try to complete the operation without blocking.
00548       int bytes_sent = socket_ops::send(impl.socket_, bufs, i, flags, ec);
00549 
00550       // Check if operation succeeded.
00551       if (bytes_sent >= 0)
00552         return bytes_sent;
00553 
00554       // Operation failed.
00555       if ((impl.flags_ & implementation_type::user_set_non_blocking)
00556           || (ec != asio::error::would_block
00557             && ec != asio::error::try_again))
00558         return 0;
00559 
00560       // Wait for socket to become ready.
00561       if (socket_ops::poll_write(impl.socket_, ec) < 0)
00562         return 0;
00563     }
00564   }
00565 
00566   // Wait until data can be sent without blocking.
00567   size_t send(implementation_type& impl, const null_buffers&,
00568       socket_base::message_flags, asio::error_code& ec)
00569   {
00570     if (!is_open(impl))
00571     {
00572       ec = asio::error::bad_descriptor;
00573       return 0;
00574     }
00575 
00576     // Wait for socket to become ready.
00577     socket_ops::poll_write(impl.socket_, ec);
00578 
00579     return 0;
00580   }
00581 
00582   template <typename ConstBufferSequence, typename Handler>
00583   class send_operation :
00584     public handler_base_from_member<Handler>
00585   {
00586   public:
00587     send_operation(socket_type socket, asio::io_service& io_service,
00588         const ConstBufferSequence& buffers, socket_base::message_flags flags,
00589         Handler handler)
00590       : handler_base_from_member<Handler>(handler),
00591         socket_(socket),
00592         io_service_(io_service),
00593         work_(io_service),
00594         buffers_(buffers),
00595         flags_(flags)
00596     {
00597     }
00598 
00599     bool perform(asio::error_code& ec,
00600         std::size_t& bytes_transferred)
00601     {
00602       // Check whether the operation was successful.
00603       if (ec)
00604       {
00605         bytes_transferred = 0;
00606         return true;
00607       }
00608 
00609       // Copy buffers into array.
00610       socket_ops::buf bufs[max_buffers];
00611       typename ConstBufferSequence::const_iterator iter = buffers_.begin();
00612       typename ConstBufferSequence::const_iterator end = buffers_.end();
00613       size_t i = 0;
00614       for (; iter != end && i < max_buffers; ++iter, ++i)
00615       {
00616         asio::const_buffer buffer(*iter);
00617         socket_ops::init_buf(bufs[i],
00618             asio::buffer_cast<const void*>(buffer),
00619             asio::buffer_size(buffer));
00620       }
00621 
00622       // Send the data.
00623       int bytes = socket_ops::send(socket_, bufs, i, flags_, ec);
00624 
00625       // Check if we need to run the operation again.
00626       if (ec == asio::error::would_block
00627           || ec == asio::error::try_again)
00628         return false;
00629 
00630       bytes_transferred = (bytes < 0 ? 0 : bytes);
00631       return true;
00632     }
00633 
00634     void complete(const asio::error_code& ec,
00635         std::size_t bytes_transferred)
00636     {
00637       io_service_.post(bind_handler(this->handler_, ec, bytes_transferred));
00638     }
00639 
00640   private:
00641     socket_type socket_;
00642     asio::io_service& io_service_;
00643     asio::io_service::work work_;
00644     ConstBufferSequence buffers_;
00645     socket_base::message_flags flags_;
00646   };
00647 
00648   // Start an asynchronous send. The data being sent must be valid for the
00649   // lifetime of the asynchronous operation.
00650   template <typename ConstBufferSequence, typename Handler>
00651   void async_send(implementation_type& impl, const ConstBufferSequence& buffers,
00652       socket_base::message_flags flags, Handler handler)
00653   {
00654     if (!is_open(impl))
00655     {
00656       this->get_io_service().post(bind_handler(handler,
00657             asio::error::bad_descriptor, 0));
00658     }
00659     else
00660     {
00661       if (impl.protocol_.type() == SOCK_STREAM)
00662       {
00663         // Determine total size of buffers.
00664         typename ConstBufferSequence::const_iterator iter = buffers.begin();
00665         typename ConstBufferSequence::const_iterator end = buffers.end();
00666         size_t i = 0;
00667         size_t total_buffer_size = 0;
00668         for (; iter != end && i < max_buffers; ++iter, ++i)
00669         {
00670           asio::const_buffer buffer(*iter);
00671           total_buffer_size += asio::buffer_size(buffer);
00672         }
00673 
00674         // A request to receive 0 bytes on a stream socket is a no-op.
00675         if (total_buffer_size == 0)
00676         {
00677           this->get_io_service().post(bind_handler(handler,
00678                 asio::error_code(), 0));
00679           return;
00680         }
00681       }
00682 
00683       // Make socket non-blocking.
00684       if (!(impl.flags_ & implementation_type::internal_non_blocking))
00685       {
00686         ioctl_arg_type non_blocking = 1;
00687         asio::error_code ec;
00688         if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
00689         {
00690           this->get_io_service().post(bind_handler(handler, ec, 0));
00691           return;
00692         }
00693         impl.flags_ |= implementation_type::internal_non_blocking;
00694       }
00695 
00696       reactor_.start_write_op(impl.socket_, impl.reactor_data_,
00697           send_operation<ConstBufferSequence, Handler>(
00698             impl.socket_, this->get_io_service(), buffers, flags, handler));
00699     }
00700   }
00701 
00702   template <typename Handler>
00703   class null_buffers_operation :
00704     public handler_base_from_member<Handler>
00705   {
00706   public:
00707     null_buffers_operation(asio::io_service& io_service, Handler handler)
00708       : handler_base_from_member<Handler>(handler),
00709         work_(io_service)
00710     {
00711     }
00712 
00713     bool perform(asio::error_code&,
00714         std::size_t& bytes_transferred)
00715     {
00716       bytes_transferred = 0;
00717       return true;
00718     }
00719 
00720     void complete(const asio::error_code& ec,
00721         std::size_t bytes_transferred)
00722     {
00723       work_.get_io_service().post(bind_handler(
00724             this->handler_, ec, bytes_transferred));
00725     }
00726 
00727   private:
00728     asio::io_service::work work_;
00729   };
00730 
00731   // Start an asynchronous wait until data can be sent without blocking.
00732   template <typename Handler>
00733   void async_send(implementation_type& impl, const null_buffers&,
00734       socket_base::message_flags, Handler handler)
00735   {
00736     if (!is_open(impl))
00737     {
00738       this->get_io_service().post(bind_handler(handler,
00739             asio::error::bad_descriptor, 0));
00740     }
00741     else
00742     {
00743       reactor_.start_write_op(impl.socket_, impl.reactor_data_,
00744           null_buffers_operation<Handler>(this->get_io_service(), handler),
00745           false);
00746     }
00747   }
00748 
00749   // Send a datagram to the specified endpoint. Returns the number of bytes
00750   // sent.
00751   template <typename ConstBufferSequence>
00752   size_t send_to(implementation_type& impl, const ConstBufferSequence& buffers,
00753       const endpoint_type& destination, socket_base::message_flags flags,
00754       asio::error_code& ec)
00755   {
00756     if (!is_open(impl))
00757     {
00758       ec = asio::error::bad_descriptor;
00759       return 0;
00760     }
00761 
00762     // Copy buffers into array.
00763     socket_ops::buf bufs[max_buffers];
00764     typename ConstBufferSequence::const_iterator iter = buffers.begin();
00765     typename ConstBufferSequence::const_iterator end = buffers.end();
00766     size_t i = 0;
00767     for (; iter != end && i < max_buffers; ++iter, ++i)
00768     {
00769       asio::const_buffer buffer(*iter);
00770       socket_ops::init_buf(bufs[i],
00771           asio::buffer_cast<const void*>(buffer),
00772           asio::buffer_size(buffer));
00773     }
00774 
00775     // Make socket non-blocking if user wants non-blocking.
00776     if (impl.flags_ & implementation_type::user_set_non_blocking)
00777     {
00778       if (!(impl.flags_ & implementation_type::internal_non_blocking))
00779       {
00780         ioctl_arg_type non_blocking = 1;
00781         if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
00782           return 0;
00783         impl.flags_ |= implementation_type::internal_non_blocking;
00784       }
00785     }
00786 
00787     // Send the data.
00788     for (;;)
00789     {
00790       // Try to complete the operation without blocking.
00791       int bytes_sent = socket_ops::sendto(impl.socket_, bufs, i, flags,
00792           destination.data(), destination.size(), ec);
00793 
00794       // Check if operation succeeded.
00795       if (bytes_sent >= 0)
00796         return bytes_sent;
00797 
00798       // Operation failed.
00799       if ((impl.flags_ & implementation_type::user_set_non_blocking)
00800           || (ec != asio::error::would_block
00801             && ec != asio::error::try_again))
00802         return 0;
00803 
00804       // Wait for socket to become ready.
00805       if (socket_ops::poll_write(impl.socket_, ec) < 0)
00806         return 0;
00807     }
00808   }
00809 
00810   // Wait until data can be sent without blocking.
00811   size_t send_to(implementation_type& impl, const null_buffers&,
00812       socket_base::message_flags, const endpoint_type&,
00813       asio::error_code& ec)
00814   {
00815     if (!is_open(impl))
00816     {
00817       ec = asio::error::bad_descriptor;
00818       return 0;
00819     }
00820 
00821     // Wait for socket to become ready.
00822     socket_ops::poll_write(impl.socket_, ec);
00823 
00824     return 0;
00825   }
00826 
00827   template <typename ConstBufferSequence, typename Handler>
00828   class send_to_operation :
00829     public handler_base_from_member<Handler>
00830   {
00831   public:
00832     send_to_operation(socket_type socket, asio::io_service& io_service,
00833         const ConstBufferSequence& buffers, const endpoint_type& endpoint,
00834         socket_base::message_flags flags, Handler handler)
00835       : handler_base_from_member<Handler>(handler),
00836         socket_(socket),
00837         io_service_(io_service),
00838         work_(io_service),
00839         buffers_(buffers),
00840         destination_(endpoint),
00841         flags_(flags)
00842     {
00843     }
00844 
00845     bool perform(asio::error_code& ec,
00846         std::size_t& bytes_transferred)
00847     {
00848       // Check whether the operation was successful.
00849       if (ec)
00850       {
00851         bytes_transferred = 0;
00852         return true;
00853       }
00854 
00855       // Copy buffers into array.
00856       socket_ops::buf bufs[max_buffers];
00857       typename ConstBufferSequence::const_iterator iter = buffers_.begin();
00858       typename ConstBufferSequence::const_iterator end = buffers_.end();
00859       size_t i = 0;
00860       for (; iter != end && i < max_buffers; ++iter, ++i)
00861       {
00862         asio::const_buffer buffer(*iter);
00863         socket_ops::init_buf(bufs[i],
00864             asio::buffer_cast<const void*>(buffer),
00865             asio::buffer_size(buffer));
00866       }
00867 
00868       // Send the data.
00869       int bytes = socket_ops::sendto(socket_, bufs, i, flags_,
00870           destination_.data(), destination_.size(), ec);
00871 
00872       // Check if we need to run the operation again.
00873       if (ec == asio::error::would_block
00874           || ec == asio::error::try_again)
00875         return false;
00876 
00877       bytes_transferred = (bytes < 0 ? 0 : bytes);
00878       return true;
00879     }
00880 
00881     void complete(const asio::error_code& ec,
00882         std::size_t bytes_transferred)
00883     {
00884       io_service_.post(bind_handler(this->handler_, ec, bytes_transferred));
00885     }
00886 
00887   private:
00888     socket_type socket_;
00889     asio::io_service& io_service_;
00890     asio::io_service::work work_;
00891     ConstBufferSequence buffers_;
00892     endpoint_type destination_;
00893     socket_base::message_flags flags_;
00894   };
00895 
00896   // Start an asynchronous send. The data being sent must be valid for the
00897   // lifetime of the asynchronous operation.
00898   template <typename ConstBufferSequence, typename Handler>
00899   void async_send_to(implementation_type& impl,
00900       const ConstBufferSequence& buffers,
00901       const endpoint_type& destination, socket_base::message_flags flags,
00902       Handler handler)
00903   {
00904     if (!is_open(impl))
00905     {
00906       this->get_io_service().post(bind_handler(handler,
00907             asio::error::bad_descriptor, 0));
00908     }
00909     else
00910     {
00911       // Make socket non-blocking.
00912       if (!(impl.flags_ & implementation_type::internal_non_blocking))
00913       {
00914         ioctl_arg_type non_blocking = 1;
00915         asio::error_code ec;
00916         if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
00917         {
00918           this->get_io_service().post(bind_handler(handler, ec, 0));
00919           return;
00920         }
00921         impl.flags_ |= implementation_type::internal_non_blocking;
00922       }
00923 
00924       reactor_.start_write_op(impl.socket_, impl.reactor_data_,
00925           send_to_operation<ConstBufferSequence, Handler>(
00926             impl.socket_, this->get_io_service(), buffers,
00927             destination, flags, handler));
00928     }
00929   }
00930 
00931   // Start an asynchronous wait until data can be sent without blocking.
00932   template <typename Handler>
00933   void async_send_to(implementation_type& impl, const null_buffers&,
00934       socket_base::message_flags, const endpoint_type&, Handler handler)
00935   {
00936     if (!is_open(impl))
00937     {
00938       this->get_io_service().post(bind_handler(handler,
00939             asio::error::bad_descriptor, 0));
00940     }
00941     else
00942     {
00943       reactor_.start_write_op(impl.socket_, impl.reactor_data_,
00944           null_buffers_operation<Handler>(this->get_io_service(), handler),
00945           false);
00946     }
00947   }
00948 
00949   // Receive some data from the peer. Returns the number of bytes received.
00950   template <typename MutableBufferSequence>
00951   size_t receive(implementation_type& impl,
00952       const MutableBufferSequence& buffers,
00953       socket_base::message_flags flags, asio::error_code& ec)
00954   {
00955     if (!is_open(impl))
00956     {
00957       ec = asio::error::bad_descriptor;
00958       return 0;
00959     }
00960 
00961     // Copy buffers into array.
00962     socket_ops::buf bufs[max_buffers];
00963     typename MutableBufferSequence::const_iterator iter = buffers.begin();
00964     typename MutableBufferSequence::const_iterator end = buffers.end();
00965     size_t i = 0;
00966     size_t total_buffer_size = 0;
00967     for (; iter != end && i < max_buffers; ++iter, ++i)
00968     {
00969       asio::mutable_buffer buffer(*iter);
00970       socket_ops::init_buf(bufs[i],
00971           asio::buffer_cast<void*>(buffer),
00972           asio::buffer_size(buffer));
00973       total_buffer_size += asio::buffer_size(buffer);
00974     }
00975 
00976     // A request to receive 0 bytes on a stream socket is a no-op.
00977     if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
00978     {
00979       ec = asio::error_code();
00980       return 0;
00981     }
00982 
00983     // Make socket non-blocking if user wants non-blocking.
00984     if (impl.flags_ & implementation_type::user_set_non_blocking)
00985     {
00986       if (!(impl.flags_ & implementation_type::internal_non_blocking))
00987       {
00988         ioctl_arg_type non_blocking = 1;
00989         if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
00990           return 0;
00991         impl.flags_ |= implementation_type::internal_non_blocking;
00992       }
00993     }
00994 
00995     // Receive some data.
00996     for (;;)
00997     {
00998       // Try to complete the operation without blocking.
00999       int bytes_recvd = socket_ops::recv(impl.socket_, bufs, i, flags, ec);
01000 
01001       // Check if operation succeeded.
01002       if (bytes_recvd > 0)
01003         return bytes_recvd;
01004 
01005       // Check for EOF.
01006       if (bytes_recvd == 0 && impl.protocol_.type() == SOCK_STREAM)
01007       {
01008         ec = asio::error::eof;
01009         return 0;
01010       }
01011 
01012       // Operation failed.
01013       if ((impl.flags_ & implementation_type::user_set_non_blocking)
01014           || (ec != asio::error::would_block
01015             && ec != asio::error::try_again))
01016         return 0;
01017 
01018       // Wait for socket to become ready.
01019       if (socket_ops::poll_read(impl.socket_, ec) < 0)
01020         return 0;
01021     }
01022   }
01023 
01024   // Wait until data can be received without blocking.
01025   size_t receive(implementation_type& impl, const null_buffers&,
01026       socket_base::message_flags, asio::error_code& ec)
01027   {
01028     if (!is_open(impl))
01029     {
01030       ec = asio::error::bad_descriptor;
01031       return 0;
01032     }
01033 
01034     // Wait for socket to become ready.
01035     socket_ops::poll_read(impl.socket_, ec);
01036 
01037     return 0;
01038   }
01039 
01040   template <typename MutableBufferSequence, typename Handler>
01041   class receive_operation :
01042     public handler_base_from_member<Handler>
01043   {
01044   public:
01045     receive_operation(socket_type socket, int protocol_type,
01046         asio::io_service& io_service,
01047         const MutableBufferSequence& buffers,
01048         socket_base::message_flags flags, Handler handler)
01049       : handler_base_from_member<Handler>(handler),
01050         socket_(socket),
01051         protocol_type_(protocol_type),
01052         io_service_(io_service),
01053         work_(io_service),
01054         buffers_(buffers),
01055         flags_(flags)
01056     {
01057     }
01058 
01059     bool perform(asio::error_code& ec,
01060         std::size_t& bytes_transferred)
01061     {
01062       // Check whether the operation was successful.
01063       if (ec)
01064       {
01065         bytes_transferred = 0;
01066         return true;
01067       }
01068 
01069       // Copy buffers into array.
01070       socket_ops::buf bufs[max_buffers];
01071       typename MutableBufferSequence::const_iterator iter = buffers_.begin();
01072       typename MutableBufferSequence::const_iterator end = buffers_.end();
01073       size_t i = 0;
01074       for (; iter != end && i < max_buffers; ++iter, ++i)
01075       {
01076         asio::mutable_buffer buffer(*iter);
01077         socket_ops::init_buf(bufs[i],
01078             asio::buffer_cast<void*>(buffer),
01079             asio::buffer_size(buffer));
01080       }
01081 
01082       // Receive some data.
01083       int bytes = socket_ops::recv(socket_, bufs, i, flags_, ec);
01084       if (bytes == 0 && protocol_type_ == SOCK_STREAM)
01085         ec = asio::error::eof;
01086 
01087       // Check if we need to run the operation again.
01088       if (ec == asio::error::would_block
01089           || ec == asio::error::try_again)
01090         return false;
01091 
01092       bytes_transferred = (bytes < 0 ? 0 : bytes);
01093       return true;
01094     }
01095 
01096     void complete(const asio::error_code& ec,
01097         std::size_t bytes_transferred)
01098     {
01099       io_service_.post(bind_handler(this->handler_, ec, bytes_transferred));
01100     }
01101 
01102   private:
01103     socket_type socket_;
01104     int protocol_type_;
01105     asio::io_service& io_service_;
01106     asio::io_service::work work_;
01107     MutableBufferSequence buffers_;
01108     socket_base::message_flags flags_;
01109   };
01110 
01111   // Start an asynchronous receive. The buffer for the data being received
01112   // must be valid for the lifetime of the asynchronous operation.
01113   template <typename MutableBufferSequence, typename Handler>
01114   void async_receive(implementation_type& impl,
01115       const MutableBufferSequence& buffers,
01116       socket_base::message_flags flags, Handler handler)
01117   {
01118     if (!is_open(impl))
01119     {
01120       this->get_io_service().post(bind_handler(handler,
01121             asio::error::bad_descriptor, 0));
01122     }
01123     else
01124     {
01125       if (impl.protocol_.type() == SOCK_STREAM)
01126       {
01127         // Determine total size of buffers.
01128         typename MutableBufferSequence::const_iterator iter = buffers.begin();
01129         typename MutableBufferSequence::const_iterator end = buffers.end();
01130         size_t i = 0;
01131         size_t total_buffer_size = 0;
01132         for (; iter != end && i < max_buffers; ++iter, ++i)
01133         {
01134           asio::mutable_buffer buffer(*iter);
01135           total_buffer_size += asio::buffer_size(buffer);
01136         }
01137 
01138         // A request to receive 0 bytes on a stream socket is a no-op.
01139         if (total_buffer_size == 0)
01140         {
01141           this->get_io_service().post(bind_handler(handler,
01142                 asio::error_code(), 0));
01143           return;
01144         }
01145       }
01146 
01147       // Make socket non-blocking.
01148       if (!(impl.flags_ & implementation_type::internal_non_blocking))
01149       {
01150         ioctl_arg_type non_blocking = 1;
01151         asio::error_code ec;
01152         if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
01153         {
01154           this->get_io_service().post(bind_handler(handler, ec, 0));
01155           return;
01156         }
01157         impl.flags_ |= implementation_type::internal_non_blocking;
01158       }
01159 
01160       if (flags & socket_base::message_out_of_band)
01161       {
01162         reactor_.start_except_op(impl.socket_, impl.reactor_data_,
01163             receive_operation<MutableBufferSequence, Handler>(
01164               impl.socket_, impl.protocol_.type(),
01165               this->get_io_service(), buffers, flags, handler));
01166       }
01167       else
01168       {
01169         reactor_.start_read_op(impl.socket_, impl.reactor_data_,
01170             receive_operation<MutableBufferSequence, Handler>(
01171               impl.socket_, impl.protocol_.type(),
01172               this->get_io_service(), buffers, flags, handler));
01173       }
01174     }
01175   }
01176 
01177   // Wait until data can be received without blocking.
01178   template <typename Handler>
01179   void async_receive(implementation_type& impl, const null_buffers&,
01180       socket_base::message_flags flags, Handler handler)
01181   {
01182     if (!is_open(impl))
01183     {
01184       this->get_io_service().post(bind_handler(handler,
01185             asio::error::bad_descriptor, 0));
01186     }
01187     else if (flags & socket_base::message_out_of_band)
01188     {
01189       reactor_.start_except_op(impl.socket_, impl.reactor_data_,
01190           null_buffers_operation<Handler>(this->get_io_service(), handler));
01191     }
01192     else
01193     {
01194       reactor_.start_read_op(impl.socket_, impl.reactor_data_,
01195           null_buffers_operation<Handler>(this->get_io_service(), handler),
01196           false);
01197     }
01198   }
01199 
01200   // Receive a datagram with the endpoint of the sender. Returns the number of
01201   // bytes received.
01202   template <typename MutableBufferSequence>
01203   size_t receive_from(implementation_type& impl,
01204       const MutableBufferSequence& buffers,
01205       endpoint_type& sender_endpoint, socket_base::message_flags flags,
01206       asio::error_code& ec)
01207   {
01208     if (!is_open(impl))
01209     {
01210       ec = asio::error::bad_descriptor;
01211       return 0;
01212     }
01213 
01214     // Copy buffers into array.
01215     socket_ops::buf bufs[max_buffers];
01216     typename MutableBufferSequence::const_iterator iter = buffers.begin();
01217     typename MutableBufferSequence::const_iterator end = buffers.end();
01218     size_t i = 0;
01219     for (; iter != end && i < max_buffers; ++iter, ++i)
01220     {
01221       asio::mutable_buffer buffer(*iter);
01222       socket_ops::init_buf(bufs[i],
01223           asio::buffer_cast<void*>(buffer),
01224           asio::buffer_size(buffer));
01225     }
01226 
01227     // Make socket non-blocking if user wants non-blocking.
01228     if (impl.flags_ & implementation_type::user_set_non_blocking)
01229     {
01230       if (!(impl.flags_ & implementation_type::internal_non_blocking))
01231       {
01232         ioctl_arg_type non_blocking = 1;
01233         if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
01234           return 0;
01235         impl.flags_ |= implementation_type::internal_non_blocking;
01236       }
01237     }
01238 
01239     // Receive some data.
01240     for (;;)
01241     {
01242       // Try to complete the operation without blocking.
01243       std::size_t addr_len = sender_endpoint.capacity();
01244       int bytes_recvd = socket_ops::recvfrom(impl.socket_, bufs, i, flags,
01245           sender_endpoint.data(), &addr_len, ec);
01246 
01247       // Check if operation succeeded.
01248       if (bytes_recvd > 0)
01249       {
01250         sender_endpoint.resize(addr_len);
01251         return bytes_recvd;
01252       }
01253 
01254       // Check for EOF.
01255       if (bytes_recvd == 0 && impl.protocol_.type() == SOCK_STREAM)
01256       {
01257         ec = asio::error::eof;
01258         return 0;
01259       }
01260 
01261       // Operation failed.
01262       if ((impl.flags_ & implementation_type::user_set_non_blocking)
01263           || (ec != asio::error::would_block
01264             && ec != asio::error::try_again))
01265         return 0;
01266 
01267       // Wait for socket to become ready.
01268       if (socket_ops::poll_read(impl.socket_, ec) < 0)
01269         return 0;
01270     }
01271   }
01272 
01273   // Wait until data can be received without blocking.
01274   size_t receive_from(implementation_type& impl, const null_buffers&,
01275       endpoint_type& sender_endpoint, socket_base::message_flags,
01276       asio::error_code& ec)
01277   {
01278     if (!is_open(impl))
01279     {
01280       ec = asio::error::bad_descriptor;
01281       return 0;
01282     }
01283 
01284     // Wait for socket to become ready.
01285     socket_ops::poll_read(impl.socket_, ec);
01286 
01287     // Reset endpoint since it can be given no sensible value at this time.
01288     sender_endpoint = endpoint_type();
01289 
01290     return 0;
01291   }
01292 
01293   template <typename MutableBufferSequence, typename Handler>
01294   class receive_from_operation :
01295     public handler_base_from_member<Handler>
01296   {
01297   public:
01298     receive_from_operation(socket_type socket, int protocol_type,
01299         asio::io_service& io_service,
01300         const MutableBufferSequence& buffers, endpoint_type& endpoint,
01301         socket_base::message_flags flags, Handler handler)
01302       : handler_base_from_member<Handler>(handler),
01303         socket_(socket),
01304         protocol_type_(protocol_type),
01305         io_service_(io_service),
01306         work_(io_service),
01307         buffers_(buffers),
01308         sender_endpoint_(endpoint),
01309         flags_(flags)
01310     {
01311     }
01312 
01313     bool perform(asio::error_code& ec,
01314         std::size_t& bytes_transferred)
01315     {
01316       // Check whether the operation was successful.
01317       if (ec)
01318       {
01319         bytes_transferred = 0;
01320         return true;
01321       }
01322 
01323       // Copy buffers into array.
01324       socket_ops::buf bufs[max_buffers];
01325       typename MutableBufferSequence::const_iterator iter = buffers_.begin();
01326       typename MutableBufferSequence::const_iterator end = buffers_.end();
01327       size_t i = 0;
01328       for (; iter != end && i < max_buffers; ++iter, ++i)
01329       {
01330         asio::mutable_buffer buffer(*iter);
01331         socket_ops::init_buf(bufs[i],
01332             asio::buffer_cast<void*>(buffer),
01333             asio::buffer_size(buffer));
01334       }
01335 
01336       // Receive some data.
01337       std::size_t addr_len = sender_endpoint_.capacity();
01338       int bytes = socket_ops::recvfrom(socket_, bufs, i, flags_,
01339           sender_endpoint_.data(), &addr_len, ec);
01340       if (bytes == 0 && protocol_type_ == SOCK_STREAM)
01341         ec = asio::error::eof;
01342 
01343       // Check if we need to run the operation again.
01344       if (ec == asio::error::would_block
01345           || ec == asio::error::try_again)
01346         return false;
01347 
01348       sender_endpoint_.resize(addr_len);
01349       bytes_transferred = (bytes < 0 ? 0 : bytes);
01350       return true;
01351     }
01352 
01353     void complete(const asio::error_code& ec,
01354         std::size_t bytes_transferred)
01355     {
01356       io_service_.post(bind_handler(this->handler_, ec, bytes_transferred));
01357     }
01358 
01359   private:
01360     socket_type socket_;
01361     int protocol_type_;
01362     asio::io_service& io_service_;
01363     asio::io_service::work work_;
01364     MutableBufferSequence buffers_;
01365     endpoint_type& sender_endpoint_;
01366     socket_base::message_flags flags_;
01367   };
01368 
01369   // Start an asynchronous receive. The buffer for the data being received and
01370   // the sender_endpoint object must both be valid for the lifetime of the
01371   // asynchronous operation.
01372   template <typename MutableBufferSequence, typename Handler>
01373   void async_receive_from(implementation_type& impl,
01374       const MutableBufferSequence& buffers, endpoint_type& sender_endpoint,
01375       socket_base::message_flags flags, Handler handler)
01376   {
01377     if (!is_open(impl))
01378     {
01379       this->get_io_service().post(bind_handler(handler,
01380             asio::error::bad_descriptor, 0));
01381     }
01382     else
01383     {
01384       // Make socket non-blocking.
01385       if (!(impl.flags_ & implementation_type::internal_non_blocking))
01386       {
01387         ioctl_arg_type non_blocking = 1;
01388         asio::error_code ec;
01389         if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
01390         {
01391           this->get_io_service().post(bind_handler(handler, ec, 0));
01392           return;
01393         }
01394         impl.flags_ |= implementation_type::internal_non_blocking;
01395       }
01396 
01397       reactor_.start_read_op(impl.socket_, impl.reactor_data_,
01398           receive_from_operation<MutableBufferSequence, Handler>(
01399             impl.socket_, impl.protocol_.type(), this->get_io_service(),
01400             buffers, sender_endpoint, flags, handler));
01401     }
01402   }
01403 
01404   // Wait until data can be received without blocking.
01405   template <typename Handler>
01406   void async_receive_from(implementation_type& impl,
01407       const null_buffers&, endpoint_type& sender_endpoint,
01408       socket_base::message_flags flags, Handler handler)
01409   {
01410     if (!is_open(impl))
01411     {
01412       this->get_io_service().post(bind_handler(handler,
01413             asio::error::bad_descriptor, 0));
01414     }
01415     else
01416     {
01417       // Reset endpoint since it can be given no sensible value at this time.
01418       sender_endpoint = endpoint_type();
01419 
01420       if (flags & socket_base::message_out_of_band)
01421       {
01422         reactor_.start_except_op(impl.socket_, impl.reactor_data_,
01423             null_buffers_operation<Handler>(this->get_io_service(), handler));
01424       }
01425       else
01426       {
01427         reactor_.start_read_op(impl.socket_, impl.reactor_data_,
01428             null_buffers_operation<Handler>(this->get_io_service(), handler),
01429             false);
01430       }
01431     }
01432   }
01433 
01434   // Accept a new connection.
01435   template <typename Socket>
01436   asio::error_code accept(implementation_type& impl,
01437       Socket& peer, endpoint_type* peer_endpoint, asio::error_code& ec)
01438   {
01439     if (!is_open(impl))
01440     {
01441       ec = asio::error::bad_descriptor;
01442       return ec;
01443     }
01444 
01445     // We cannot accept a socket that is already open.
01446     if (peer.is_open())
01447     {
01448       ec = asio::error::already_open;
01449       return ec;
01450     }
01451 
01452     // Make socket non-blocking if user wants non-blocking.
01453     if (impl.flags_ & implementation_type::user_set_non_blocking)
01454     {
01455       if (!(impl.flags_ & implementation_type::internal_non_blocking))
01456       {
01457         ioctl_arg_type non_blocking = 1;
01458         if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
01459           return ec;
01460         impl.flags_ |= implementation_type::internal_non_blocking;
01461       }
01462     }
01463 
01464     // Accept a socket.
01465     for (;;)
01466     {
01467       // Try to complete the operation without blocking.
01468       asio::error_code ec;
01469       socket_holder new_socket;
01470       std::size_t addr_len = 0;
01471       if (peer_endpoint)
01472       {
01473         addr_len = peer_endpoint->capacity();
01474         new_socket.reset(socket_ops::accept(impl.socket_,
01475               peer_endpoint->data(), &addr_len, ec));
01476       }
01477       else
01478       {
01479         new_socket.reset(socket_ops::accept(impl.socket_, 0, 0, ec));
01480       }
01481 
01482       // Check if operation succeeded.
01483       if (new_socket.get() >= 0)
01484       {
01485         if (peer_endpoint)
01486           peer_endpoint->resize(addr_len);
01487         peer.assign(impl.protocol_, new_socket.get(), ec);
01488         if (!ec)
01489           new_socket.release();
01490         return ec;
01491       }
01492 
01493       // Operation failed.
01494       if (ec == asio::error::would_block
01495           || ec == asio::error::try_again)
01496       {
01497         if (impl.flags_ & implementation_type::user_set_non_blocking)
01498           return ec;
01499         // Fall through to retry operation.
01500       }
01501       else if (ec == asio::error::connection_aborted)
01502       {
01503         if (impl.flags_ & implementation_type::enable_connection_aborted)
01504           return ec;
01505         // Fall through to retry operation.
01506       }
01507 #if defined(EPROTO)
01508       else if (ec.value() == EPROTO)
01509       {
01510         if (impl.flags_ & implementation_type::enable_connection_aborted)
01511           return ec;
01512         // Fall through to retry operation.
01513       }
01514 #endif // defined(EPROTO)
01515       else
01516         return ec;
01517 
01518       // Wait for socket to become ready.
01519       if (socket_ops::poll_read(impl.socket_, ec) < 0)
01520         return ec;
01521     }
01522   }
01523 
01524   template <typename Socket, typename Handler>
01525   class accept_operation :
01526     public handler_base_from_member<Handler>
01527   {
01528   public:
01529     accept_operation(socket_type socket, asio::io_service& io_service,
01530         Socket& peer, const protocol_type& protocol,
01531         endpoint_type* peer_endpoint, bool enable_connection_aborted,
01532         Handler handler)
01533       : handler_base_from_member<Handler>(handler),
01534         socket_(socket),
01535         io_service_(io_service),
01536         work_(io_service),
01537         peer_(peer),
01538         protocol_(protocol),
01539         peer_endpoint_(peer_endpoint),
01540         enable_connection_aborted_(enable_connection_aborted)
01541     {
01542     }
01543 
01544     bool perform(asio::error_code& ec, std::size_t&)
01545     {
01546       // Check whether the operation was successful.
01547       if (ec)
01548         return true;
01549 
01550       // Accept the waiting connection.
01551       socket_holder new_socket;
01552       std::size_t addr_len = 0;
01553       if (peer_endpoint_)
01554       {
01555         addr_len = peer_endpoint_->capacity();
01556         new_socket.reset(socket_ops::accept(socket_,
01557               peer_endpoint_->data(), &addr_len, ec));
01558       }
01559       else
01560       {
01561         new_socket.reset(socket_ops::accept(socket_, 0, 0, ec));
01562       }
01563 
01564       // Check if we need to run the operation again.
01565       if (ec == asio::error::would_block
01566           || ec == asio::error::try_again)
01567         return false;
01568       if (ec == asio::error::connection_aborted
01569           && !enable_connection_aborted_)
01570         return false;
01571 #if defined(EPROTO)
01572       if (ec.value() == EPROTO && !enable_connection_aborted_)
01573         return false;
01574 #endif // defined(EPROTO)
01575 
01576       // Transfer ownership of the new socket to the peer object.
01577       if (!ec)
01578       {
01579         if (peer_endpoint_)
01580           peer_endpoint_->resize(addr_len);
01581         peer_.assign(protocol_, new_socket.get(), ec);
01582         if (!ec)
01583           new_socket.release();
01584       }
01585 
01586       return true;
01587     }
01588 
01589     void complete(const asio::error_code& ec, std::size_t)
01590     {
01591       io_service_.post(bind_handler(this->handler_, ec));
01592     }
01593 
01594   private:
01595     socket_type socket_;
01596     asio::io_service& io_service_;
01597     asio::io_service::work work_;
01598     Socket& peer_;
01599     protocol_type protocol_;
01600     endpoint_type* peer_endpoint_;
01601     bool enable_connection_aborted_;
01602   };
01603 
01604   // Start an asynchronous accept. The peer and peer_endpoint objects
01605   // must be valid until the accept's handler is invoked.
01606   template <typename Socket, typename Handler>
01607   void async_accept(implementation_type& impl, Socket& peer,
01608       endpoint_type* peer_endpoint, Handler handler)
01609   {
01610     if (!is_open(impl))
01611     {
01612       this->get_io_service().post(bind_handler(handler,
01613             asio::error::bad_descriptor));
01614     }
01615     else if (peer.is_open())
01616     {
01617       this->get_io_service().post(bind_handler(handler,
01618             asio::error::already_open));
01619     }
01620     else
01621     {
01622       // Make socket non-blocking.
01623       if (!(impl.flags_ & implementation_type::internal_non_blocking))
01624       {
01625         ioctl_arg_type non_blocking = 1;
01626         asio::error_code ec;
01627         if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
01628         {
01629           this->get_io_service().post(bind_handler(handler, ec));
01630           return;
01631         }
01632         impl.flags_ |= implementation_type::internal_non_blocking;
01633       }
01634 
01635       reactor_.start_read_op(impl.socket_, impl.reactor_data_,
01636           accept_operation<Socket, Handler>(
01637             impl.socket_, this->get_io_service(),
01638             peer, impl.protocol_, peer_endpoint,
01639             (impl.flags_ & implementation_type::enable_connection_aborted) != 0,
01640             handler));
01641     }
01642   }
01643 
01644   // Connect the socket to the specified endpoint.
01645   asio::error_code connect(implementation_type& impl,
01646       const endpoint_type& peer_endpoint, asio::error_code& ec)
01647   {
01648     if (!is_open(impl))
01649     {
01650       ec = asio::error::bad_descriptor;
01651       return ec;
01652     }
01653 
01654     if (impl.flags_ & implementation_type::internal_non_blocking)
01655     {
01656       // Mark the socket as blocking while we perform the connect.
01657       ioctl_arg_type non_blocking = 0;
01658       if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
01659         return ec;
01660       impl.flags_ &= ~implementation_type::internal_non_blocking;
01661     }
01662 
01663     // Perform the connect operation.
01664     socket_ops::connect(impl.socket_,
01665         peer_endpoint.data(), peer_endpoint.size(), ec);
01666     return ec;
01667   }
01668 
01669   template <typename Handler>
01670   class connect_operation :
01671     public handler_base_from_member<Handler>
01672   {
01673   public:
01674     connect_operation(socket_type socket,
01675         asio::io_service& io_service, Handler handler)
01676       : handler_base_from_member<Handler>(handler),
01677         socket_(socket),
01678         io_service_(io_service),
01679         work_(io_service)
01680     {
01681     }
01682 
01683     bool perform(asio::error_code& ec, std::size_t&)
01684     {
01685       // Check whether the operation was successful.
01686       if (ec)
01687         return true;
01688 
01689       // Get the error code from the connect operation.
01690       int connect_error = 0;
01691       size_t connect_error_len = sizeof(connect_error);
01692       if (socket_ops::getsockopt(socket_, SOL_SOCKET, SO_ERROR,
01693             &connect_error, &connect_error_len, ec) == socket_error_retval)
01694         return true;
01695 
01696       // The connection failed so the handler will be posted with an error code.
01697       if (connect_error)
01698       {
01699         ec = asio::error_code(connect_error,
01700             asio::error::get_system_category());
01701         return true;
01702       }
01703 
01704       return true;
01705     }
01706 
01707     void complete(const asio::error_code& ec, std::size_t)
01708     {
01709       io_service_.post(bind_handler(this->handler_, ec));
01710     }
01711 
01712   private:
01713     socket_type socket_;
01714     asio::io_service& io_service_;
01715     asio::io_service::work work_;
01716   };
01717 
01718   // Start an asynchronous connect.
01719   template <typename Handler>
01720   void async_connect(implementation_type& impl,
01721       const endpoint_type& peer_endpoint, Handler handler)
01722   {
01723     if (!is_open(impl))
01724     {
01725       this->get_io_service().post(bind_handler(handler,
01726             asio::error::bad_descriptor));
01727       return;
01728     }
01729 
01730     // Make socket non-blocking.
01731     if (!(impl.flags_ & implementation_type::internal_non_blocking))
01732     {
01733       ioctl_arg_type non_blocking = 1;
01734       asio::error_code ec;
01735       if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
01736       {
01737         this->get_io_service().post(bind_handler(handler, ec));
01738         return;
01739       }
01740       impl.flags_ |= implementation_type::internal_non_blocking;
01741     }
01742 
01743     // Start the connect operation. The socket is already marked as non-blocking
01744     // so the connection will take place asynchronously.
01745     asio::error_code ec;
01746     if (socket_ops::connect(impl.socket_, peer_endpoint.data(),
01747           peer_endpoint.size(), ec) == 0)
01748     {
01749       // The connect operation has finished successfully so we need to post the
01750       // handler immediately.
01751       this->get_io_service().post(bind_handler(handler,
01752             asio::error_code()));
01753     }
01754     else if (ec == asio::error::in_progress
01755         || ec == asio::error::would_block)
01756     {
01757       // The connection is happening in the background, and we need to wait
01758       // until the socket becomes writeable.
01759       reactor_.start_connect_op(impl.socket_, impl.reactor_data_,
01760           connect_operation<Handler>(impl.socket_,
01761             this->get_io_service(), handler));
01762     }
01763     else
01764     {
01765       // The connect operation has failed, so post the handler immediately.
01766       this->get_io_service().post(bind_handler(handler, ec));
01767     }
01768   }
01769 
01770 private:
01771   // The selector that performs event demultiplexing for the service.
01772   Reactor& reactor_;
01773 };
01774 
01775 } // namespace detail
01776 } // namespace asio
01777 
01778 #include "asio/detail/pop_options.hpp"
01779 
01780 #endif // ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines


Castor
Author(s): Carpe Noctem
autogenerated on Fri Nov 8 2013 11:05:39